Class: Fluent::Plugin::ElasticsearchInput

Inherits: Input
Includes: ElasticsearchConstants
Defined in: lib/fluent/plugin/in_elasticsearch.rb

Defined Under Namespace

Classes: UnrecoverableRequestFailure

Constant Summary

DEFAULT_RELOAD_AFTER = -1
DEFAULT_STORAGE_TYPE = 'local'
METADATA = "@metadata".freeze

Constants included from ElasticsearchConstants

Fluent::Plugin::ElasticsearchConstants::BODY_DELIMITER, Fluent::Plugin::ElasticsearchConstants::CREATE_OP, Fluent::Plugin::ElasticsearchConstants::ID_FIELD, Fluent::Plugin::ElasticsearchConstants::INDEX_OP, Fluent::Plugin::ElasticsearchConstants::TIMESTAMP_FIELD, Fluent::Plugin::ElasticsearchConstants::UPDATE_OP, Fluent::Plugin::ElasticsearchConstants::UPSERT_OP

Instance Method Summary

Constructor Details

#initialize ⇒ ElasticsearchInput

Returns a new instance of ElasticsearchInput.



# File 'lib/fluent/plugin/in_elasticsearch.rb', line 58

def initialize
  super
end

Instance Method Details

#backend_options ⇒ Object



# File 'lib/fluent/plugin/in_elasticsearch.rb', line 100

def backend_options
  case @http_backend
  when :excon
    { client_key: @client_key, client_cert: @client_cert, client_key_pass: @client_key_pass }
  when :typhoeus
    require 'typhoeus'
    { sslkey: @client_key, sslcert: @client_cert, keypasswd: @client_key_pass }
  end
rescue LoadError => ex
  log.error_backtrace(ex.backtrace)
  raise Fluent::ConfigError, "You must install #{@http_backend} gem. Exception: #{ex}"
end
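
For reference, a minimal sketch of the option hashes the two supported backends receive; the key names mirror the case branches above and the values are hypothetical:

client_key, client_cert, client_key_pass = "/path/client.key", "/path/client.crt", nil # hypothetical values

# :excon expects Excon-style client certificate keys...
excon_options    = { client_key: client_key, client_cert: client_cert, client_key_pass: client_key_pass }
# ...while :typhoeus expects libcurl-style keys.
typhoeus_options = { sslkey: client_key, sslcert: client_cert, keypasswd: client_key_pass }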

#client(host = nil) ⇒ Object



# File 'lib/fluent/plugin/in_elasticsearch.rb', line 205

def client(host = nil)
  # check here to see if we already have a client connection for the given host
  connection_options = get_connection_options(host)

  @_es = nil unless is_existing_connection(connection_options[:hosts])

  @_es ||= begin
    @current_config = connection_options[:hosts].clone
    adapter_conf = lambda {|f| f.adapter @http_backend, @backend_options }
    local_reload_connections = @reload_connections
    if local_reload_connections && @reload_after > DEFAULT_RELOAD_AFTER
      local_reload_connections = @reload_after
    end

    headers = { 'Content-Type' => "application/json" }.merge(@custom_headers)

    transport = Elasticsearch::Transport::Transport::HTTP::Faraday.new(
      connection_options.merge(
        options: {
          reload_connections: local_reload_connections,
          reload_on_failure: @reload_on_failure,
          resurrect_after: @resurrect_after,
          logger: @transport_logger,
          transport_options: {
            headers: headers,
            request: { timeout: @request_timeout },
            ssl: { verify: @ssl_verify, ca_file: @ca_file, version: @ssl_version }
          },
          http: {
            user: @user,
            password: @password
          },
          sniffer_class: @sniffer_class,
        }), &adapter_conf)
    Elasticsearch::Client.new transport: transport
  end
end
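
A standalone sketch of the reload_connections coercion above, using hypothetical settings: when reload_after is greater than DEFAULT_RELOAD_AFTER (-1), the boolean flag is replaced by that number so the transport reloads its host list every N requests.

DEFAULT_RELOAD_AFTER = -1
reload_connections, reload_after = true, 10_000 # hypothetical configuration values

local_reload_connections = reload_connections
if local_reload_connections && reload_after > DEFAULT_RELOAD_AFTER
  local_reload_connections = reload_after
end
local_reload_connections # => 10000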

#configure(conf) ⇒ Object

Raises:

  • (Fluent::ConfigError)


# File 'lib/fluent/plugin/in_elasticsearch.rb', line 62

def configure(conf)
  super

  @timestamp_parser = create_time_parser
  @backend_options = backend_options

  raise Fluent::ConfigError, "`password` must be present if `user` is present" if @user && @password.nil?

  if @user && m = @user.match(/%{(?<user>.*)}/)
    @user = URI.encode_www_form_component(m["user"])
  end
  if @password && m = @password.match(/%{(?<password>.*)}/)
    @password = URI.encode_www_form_component(m["password"])
  end

  @transport_logger = nil
  if @with_transporter_log
    @transport_logger = log
    log_level = conf['@log_level'] || conf['log_level']
    log.warn "Consider to specify log_level with @log_level." unless log_level
  end
  @current_config = nil
  # Specify @sniffer_class before calling #client.
  @sniffer_class = nil
  begin
    @sniffer_class = Object.const_get(@sniffer_class_name) if @sniffer_class_name
  rescue Exception => ex
    raise Fluent::ConfigError, "Could not load sniffer class #{@sniffer_class_name}: #{ex}"
  end

  @options = {
    :index => @index_name,
    :scroll => @scroll,
    :size => @size
  }
  @base_query = @query
end
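
A standalone sketch of the %{...} credential handling above, with a hypothetical user value: wrapping user (or password) in %{...} makes the plugin URL-encode the enclosed string.

require "uri"

user = "%{j+hn}" # hypothetical configured value
if m = user.match(/%{(?<user>.*)}/)
  user = URI.encode_www_form_component(m["user"])
end
user # => "j%2Bhn"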

#convert_numeric_time_into_string(numeric_time, timestamp_key_format = "%Y-%m-%dT%H:%M:%S.%N%z") ⇒ Object



# File 'lib/fluent/plugin/in_elasticsearch.rb', line 193

def convert_numeric_time_into_string(numeric_time, timestamp_key_format = "%Y-%m-%dT%H:%M:%S.%N%z")
  numeric_time_parser = Fluent::NumericTimeParser.new(:float)
  Time.at(numeric_time_parser.parse(numeric_time).to_r).strftime(timestamp_key_format)
end
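
A standalone approximation with a hypothetical epoch value; inside Fluentd the number goes through Fluent::NumericTimeParser first, but Time.at behaves equivalently for a plain float:

Time.at(1500000000.5.to_r).strftime("%Y-%m-%dT%H:%M:%S.%N%z")
# => "2017-07-14T02:40:00.500000000+0000" when run with TZ=UTC (the offset follows the local timezone)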

#create_time_parser ⇒ Object

Once Fluent v0.14 is released we might be able to use Fluent::Parser::TimeParser, but it doesn't quite do what we want: it gives sec,nsec, whereas we want something we can call `strftime` on…



# File 'lib/fluent/plugin/in_elasticsearch.rb', line 166

def create_time_parser
  if @timestamp_key_format
    begin
      # Strptime doesn't support all formats, but for those it does it's
      # blazingly fast.
      strptime = Strptime.new(@timestamp_key_format)
      Proc.new { |value|
        value = convert_numeric_time_into_string(value, @timestamp_key_format) if value.is_a?(Numeric)
        strptime.exec(value).to_time
      }
    rescue
      # Can happen if Strptime doesn't recognize the format; or
      # if strptime couldn't be required (because it's not installed -- it's
      # ruby 2 only)
      Proc.new { |value|
        value = convert_numeric_time_into_string(value, @timestamp_key_format) if value.is_a?(Numeric)
        DateTime.strptime(value, @timestamp_key_format).to_time
      }
    end
  else
    Proc.new { |value|
      value = convert_numeric_time_into_string(value) if value.is_a?(Numeric)
      DateTime.parse(value).to_time
    }
  end
end
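
A standalone sketch of the fallback branch above, using a hypothetical timestamp_key_format: when the Strptime gem is unavailable or rejects the format, DateTime.strptime from the standard library does the parsing.

require "date"

format = "%Y-%m-%dT%H:%M:%S.%N%z" # hypothetical timestamp_key_format
DateTime.strptime("2017-07-14T02:40:00.500000000+0000", format).to_time
# => 2017-07-14 02:40:00.5 +0000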

#get_connection_options(con_host = nil) ⇒ Object



# File 'lib/fluent/plugin/in_elasticsearch.rb', line 125

def get_connection_options(con_host=nil)

  hosts = if con_host || @hosts
    (con_host || @hosts).split(',').map do |host_str|
      # Support legacy hosts format host:port,host:port,host:port...
      if host_str.match(%r{^[^:]+(\:\d+)?$})
        {
          host:   host_str.split(':')[0],
          port:   (host_str.split(':')[1] || @port).to_i,
          scheme: @scheme.to_s
        }
      else
        # New hosts format expects URLs such as http://logs.foo.com,https://john:[email protected]/elastic
        uri = URI(get_escaped_userinfo(host_str))
        %w(user password path).inject(host: uri.host, port: uri.port, scheme: uri.scheme) do |hash, key|
          hash[key.to_sym] = uri.public_send(key) unless uri.public_send(key).nil? || uri.public_send(key) == ''
          hash
        end
      end
    end.compact
  else
    [{host: @host, port: @port, scheme: @scheme.to_s}]
  end.each do |host|
    host.merge!(user: @user, password: @password) if !host[:user] && @user
    host.merge!(path: @path) if !host[:path] && @path
  end

  {
    hosts: hosts
  }
end
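
A standalone sketch of the URL-style hosts parsing above, with a hypothetical host string; the legacy host:port form instead falls through to the simple split on ':' in the first branch.

require "uri"

uri = URI("https://john:secret@logs.example.com/elastic") # hypothetical hosts entry
%w(user password path).inject(host: uri.host, port: uri.port, scheme: uri.scheme) do |hash, key|
  value = uri.public_send(key)
  hash[key.to_sym] = value unless value.nil? || value == ''
  hash
end
# => {:host=>"logs.example.com", :port=>443, :scheme=>"https",
#     :user=>"john", :password=>"secret", :path=>"/elastic"}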

#get_escaped_userinfo(host_str) ⇒ Object



# File 'lib/fluent/plugin/in_elasticsearch.rb', line 113

def get_escaped_userinfo(host_str)
  if m = host_str.match(/(?<scheme>.*)%{(?<user>.*)}:%{(?<password>.*)}(?<path>@.*)/)
    m["scheme"] +
      URI.encode_www_form_component(m["user"]) +
      ':' +
      URI.encode_www_form_component(m["password"]) +
      m["path"]
  else
    host_str
  end
end
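
A hypothetical call on a configured plugin instance: %{...}-wrapped credentials in a hosts URL are form-encoded so the resulting string can be handed to URI() by get_connection_options.

get_escaped_userinfo("https://%{j+hn}:%{pass/word}@logs.example.com/elastic")
# => "https://j%2Bhn:pass%2Fword@logs.example.com/elastic"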

#is_existing_connection(host) ⇒ Object



# File 'lib/fluent/plugin/in_elasticsearch.rb', line 243

def is_existing_connection(host)
  # check if the host provided match the current connection
  return false if @_es.nil?
  return false if @current_config.nil?
  return false if host.length != @current_config.length

  for i in 0...host.length
    if !host[i][:host].eql?(@current_config[i][:host]) || host[i][:port] != @current_config[i][:port]
      return false
    end
  end

  return true
end
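
A hypothetical illustration, assuming a client has already been built (@_es is set) and @current_config holds the single-host list in the first comment: the cached client is reused only when the candidate list matches element by element on host and port.

# @current_config == [{ host: "es1.example.com", port: 9200, scheme: "http" }]
is_existing_connection([{ host: "es1.example.com", port: 9200, scheme: "http" }]) # => true, @_es is reused
is_existing_connection([{ host: "es2.example.com", port: 9200, scheme: "http" }]) # => false, a new client is built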

#parse_time(value, event_time, tag) ⇒ Object



# File 'lib/fluent/plugin/in_elasticsearch.rb', line 198

def parse_time(value, event_time, tag)
  @timestamp_parser.call(value)
rescue => e
  router.emit_error_event(@timestamp_parse_error_tag, Fluent::Engine.now, {'tag' => tag, 'time' => event_time, 'format' => @timestamp_key_format, 'value' => value}, e)
  return Time.at(event_time).to_time
end

#process_events(hit, es) ⇒ Object



# File 'lib/fluent/plugin/in_elasticsearch.rb', line 300

def process_events(hit, es)
  event = hit["_source"]
  time = Fluent::Engine.now
  if @parse_timestamp
    if event.has_key?(TIMESTAMP_FIELD)
      rts = event[TIMESTAMP_FIELD]
      time = parse_time(rts, time, @tag)
    end
  end
  if @docinfo
    docinfo_target = event[@docinfo_target] || {}

    unless docinfo_target.is_a?(Hash)
      raise UnrecoverableError, "incompatible type for the docinfo_target=#{@docinfo_target} field in the `_source` document, expected a hash, got: #{docinfo_target.class} (event: #{event})"
    end

    @docinfo_fields.each do |field|
      docinfo_target[field] = hit[field]
    end

    event[@docinfo_target] = docinfo_target
  end
  es.add(time, event)
end
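
A hypothetical hit with hypothetical docinfo settings (docinfo_target "@metadata", docinfo_fields ["_index", "_id"]), showing how the listed fields are copied from the hit into the emitted record:

hit = {
  "_index"  => "fluentd-2024.01.01",
  "_id"     => "abc123",
  "_source" => { "message" => "hello" }
}
# process_events(hit, es) would add this record to the event stream:
# { "message"   => "hello",
#   "@metadata" => { "_index" => "fluentd-2024.01.01", "_id" => "abc123" } }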

#process_next_scroll_request(es, scroll_id) ⇒ Object



# File 'lib/fluent/plugin/in_elasticsearch.rb', line 294

def process_next_scroll_request(es, scroll_id)
  result = process_scroll_request(scroll_id)
  result['hits']['hits'].each { |hit| process_events(hit, es) }
  {'has_hits' => result['hits']['hits'].any?, '_scroll_id' => result['_scroll_id']}
end

#process_scroll_request(scroll_id) ⇒ Object



# File 'lib/fluent/plugin/in_elasticsearch.rb', line 290

def process_scroll_request(scroll_id)
  client.scroll(:body => { :scroll_id => scroll_id }, :scroll => @scroll)
end

#run ⇒ Object



# File 'lib/fluent/plugin/in_elasticsearch.rb', line 258

def run
  return run_slice if @num_slices <= 1

  log.warn("Large slice number is specified:(#{@num_slices}). Consider reducing num_slices") if @num_slices > 8

  @num_slices.times.map do |slice_id|
    thread_create(:"in_elasticsearch_thread_#{slice_id}") do
      run_slice(slice_id)
    end
  end
end

#run_slice(slice_id = nil) ⇒ Object



# File 'lib/fluent/plugin/in_elasticsearch.rb', line 270

def run_slice(slice_id=nil)
  slice_query = @base_query
  slice_query = slice_query.merge('slice' => { 'id' => slice_id, 'max' => @num_slices}) unless slice_id.nil?
  result = client.search(@options.merge(:body => Yajl.dump(slice_query) ))
  es = Fluent::MultiEventStream.new

  result["hits"]["hits"].each {|hit| process_events(hit, es)}
  has_hits = result['hits']['hits'].any?
  scroll_id = result['_scroll_id']

  while has_hits && scroll_id
    result = process_next_scroll_request(es, scroll_id)
    has_hits = result['has_hits']
    scroll_id = result['_scroll_id']
  end

  router.emit_stream(@tag, es)
  client.clear_scroll(scroll_id: scroll_id) if scroll_id
end
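
A standalone sketch of the sliced-scroll query built above, with hypothetical values: each worker thread merges a "slice" clause into the base query before its initial search request.

base_query = { "query" => { "match_all" => {} } } # hypothetical base query
slice_id, num_slices = 0, 2

base_query.merge('slice' => { 'id' => slice_id, 'max' => num_slices })
# => { "query" => { "match_all" => {} }, "slice" => { "id" => 0, "max" => 2 } }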

#start ⇒ Object



# File 'lib/fluent/plugin/in_elasticsearch.rb', line 157

def start
  super

  timer_execute(:in_elasticsearch_timer, @interval, repeat: @repeat, &method(:run))
end