Module: Fluent::PluginHelper::Inject

Defined in:
lib/fluent/plugin_helper/inject.rb

Defined Under Namespace

Modules: InjectParams

Class Method Summary collapse

Instance Method Summary collapse

Class Method Details

.included(mod) ⇒ Object



87
88
89
# File 'lib/fluent/plugin_helper/inject.rb', line 87

def self.included(mod)
  mod.include InjectParams
end

Instance Method Details

#configure(conf) ⇒ Object



103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
# File 'lib/fluent/plugin_helper/inject.rb', line 103

def configure(conf)
  super

  if @inject_config
    @_inject_hostname_key = @inject_config.hostname_key
    if @_inject_hostname_key
      if self.respond_to?(:buffer_config)
        # Output plugin cannot use "hostname"(specified by @hostname_key),
        # injected by this plugin helper, in chunk keys.
        # This plugin helper works in `#format` (in many cases), but modified record
        # don't have any side effect in chunking of output plugin.
        if self.buffer_config.chunk_keys.include?(@_inject_hostname_key)
          log.error "Use filters to inject hostname to use it in buffer chunking."
          raise Fluent::ConfigError, "the key specified by 'hostname_key' in <inject> cannot be used in buffering chunk key."
        end
      end

      @_inject_hostname =  @inject_config.hostname
      unless @_inject_hostname
        @_inject_hostname = ::Socket.gethostname
        log.info "using hostname for specified field", host_key: @_inject_hostname_key, host_name: @_inject_hostname
      end
    end
    @_inject_worker_id_key = @inject_config.worker_id_key
    if @_inject_worker_id_key
      @_inject_worker_id = fluentd_worker_id # get id here, because #with_worker_config method may be used only for #configure in tests
    end
    @_inject_tag_key = @inject_config.tag_key
    @_inject_time_key = @inject_config.time_key
    if @_inject_time_key
      @_inject_time_formatter = case @inject_config.time_type
                                when :float then ->(time){ time.to_r.truncate(+6).to_f } # microsecond floating point value
                                when :unixtime_millis then ->(time) { time.respond_to?(:nsec) ? time.to_i * 1_000 + time.nsec / 1_000_000 : (time * 1_000).floor }
                                when :unixtime_micros then ->(time) { time.respond_to?(:nsec) ? time.to_i * 1_000_000 + time.nsec / 1_000 : (time * 1_000_000).floor }
                                when :unixtime_nanos then ->(time) { time.respond_to?(:nsec) ? time.to_i * 1_000_000_000 + time.nsec : (time * 1_000_000_000).floor }
                                when :unixtime then ->(time){ time.to_i }
                                else
                                  localtime = @inject_config.localtime && !@inject_config.utc
                                  Fluent::TimeFormatter.new(@inject_config.time_format, localtime, @inject_config.timezone)
                                end
    else
      if @inject_config.time_format
        log.warn "'time_format' specified without 'time_key', will be ignored"
      end
    end

    @_inject_enabled = @_inject_hostname_key || @_inject_worker_id_key || @_inject_tag_key || @_inject_time_key
  end
end

#initializeObject



91
92
93
94
95
96
97
98
99
100
101
# File 'lib/fluent/plugin_helper/inject.rb', line 91

def initialize
  super
  @_inject_enabled = false
  @_inject_hostname_key = nil
  @_inject_hostname = nil
  @_inject_worker_id_key = nil
  @_inject_worker_id = nil
  @_inject_tag_key = nil
  @_inject_time_key = nil
  @_inject_time_formatter = nil
end

#inject_values_to_event_stream(tag, es) ⇒ Object



45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
# File 'lib/fluent/plugin_helper/inject.rb', line 45

def inject_values_to_event_stream(tag, es)
  return es unless @_inject_enabled

  new_es = Fluent::MultiEventStream.new
  es.each do |time, record|
    r = record.dup
    if @_inject_hostname_key
      r[@_inject_hostname_key] = @_inject_hostname
    end
    if @_inject_worker_id_key
      r[@_inject_worker_id_key] = @_inject_worker_id
    end
    if @_inject_tag_key
      r[@_inject_tag_key] = tag
    end
    if @_inject_time_key
      r[@_inject_time_key] = @_inject_time_formatter.call(time)
    end
    new_es.add(time, r)
  end

  new_es
end

#inject_values_to_record(tag, time, record) ⇒ Object



25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
# File 'lib/fluent/plugin_helper/inject.rb', line 25

def inject_values_to_record(tag, time, record)
  return record unless @_inject_enabled

  r = record.dup
  if @_inject_hostname_key
    r[@_inject_hostname_key] = @_inject_hostname
  end
  if @_inject_worker_id_key
    r[@_inject_worker_id_key] = @_inject_worker_id
  end
  if @_inject_tag_key
    r[@_inject_tag_key] = tag
  end
  if @_inject_time_key
    r[@_inject_time_key] = @_inject_time_formatter.call(time)
  end

  r
end