Class: Fluent::Plugin::OpenSearchInput
- Inherits:
-
Input
- Object
- Input
- Fluent::Plugin::OpenSearchInput
show all
- Includes:
- OpenSearchConstants
- Defined in:
- lib/fluent/plugin/in_opensearch.rb
Defined Under Namespace
Classes: UnrecoverableRequestFailure
Constant Summary
collapse
- DEFAULT_RELOAD_AFTER =
-1
- DEFAULT_STORAGE_TYPE =
'local'
- METADATA =
"@metadata".freeze
Fluent::Plugin::OpenSearchConstants::BODY_DELIMITER, Fluent::Plugin::OpenSearchConstants::CREATE_OP, Fluent::Plugin::OpenSearchConstants::ID_FIELD, Fluent::Plugin::OpenSearchConstants::INDEX_OP, Fluent::Plugin::OpenSearchConstants::TIMESTAMP_FIELD, Fluent::Plugin::OpenSearchConstants::UPDATE_OP, Fluent::Plugin::OpenSearchConstants::UPSERT_OP
Instance Method Summary
collapse
Constructor Details
Returns a new instance of OpenSearchInput.
104
105
106
|
# File 'lib/fluent/plugin/in_opensearch.rb', line 104
# Creates the input plugin instance. No state is set up here; construction is
# handed entirely to the parent class via +super+.
def initialize
super
end
|
Instance Method Details
#backend_options ⇒ Object
147
148
149
150
151
152
153
154
155
156
157
158
|
# File 'lib/fluent/plugin/in_opensearch.rb', line 147
# Maps the configured client key / certificate / key passphrase onto the
# option names expected by the selected HTTP adapter.
#
# @return [Hash, nil] options for the :excon or :typhoeus backend; nil when
#   @http_backend is anything else
# @raise [Fluent::ConfigError] when a backend gem required here cannot be loaded
def backend_options
  key = @client_key
  cert = @client_cert
  passphrase = @client_key_pass

  if @http_backend == :excon
    { client_key: key, client_cert: cert, client_key_pass: passphrase }
  elsif @http_backend == :typhoeus
    # Loaded lazily so the gem is only needed when this backend is selected.
    require 'typhoeus'
    { sslkey: key, sslcert: cert, keypasswd: passphrase }
  end
rescue LoadError => load_error
  log.error_backtrace(load_error.backtrace)
  raise Fluent::ConfigError, "You must install #{@http_backend} gem. Exception: #{load_error}"
end
|
399
400
401
402
403
404
|
# File 'lib/fluent/plugin/in_opensearch.rb', line 399
# Asks the server to release a scroll context. This is best-effort: any
# error raised by the request is logged as a warning and swallowed.
#
# @param scroll_id [String, nil] scroll context to release; nothing is sent when falsy
# @return [Object, nil] the client's response, or nil when scroll_id is falsy
def clear_scroll(scroll_id)
  return unless scroll_id

  client.clear_scroll(scroll_id: scroll_id)
rescue => clear_error
  log.warn("Ignoring clear_scroll exception", message: clear_error.message, exception: clear_error.class)
end
|
#client(host = nil) ⇒ Object
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
|
# File 'lib/fluent/plugin/in_opensearch.rb', line 287
# Returns the memoized OpenSearch client, rebuilding it whenever the resolved
# host list no longer matches the one the cached client was created with.
#
# @param host [String, nil] optional host list forwarded to get_connection_options
# @return [OpenSearch::Client] client backed by a Faraday transport
def client(host = nil)
  connection_options = get_connection_options(host)
  # Drop the cached client when the target hosts changed.
  @_os = nil unless is_existing_connection(connection_options[:hosts])
  @_os ||= begin
    @current_config = connection_options[:hosts].clone
    adapter_conf = lambda { |f| f.adapter @http_backend, @backend_options }

    # When reloading is enabled and @reload_after is above the default
    # sentinel, pass @reload_after itself as the reload_connections value.
    local_reload_connections = @reload_connections
    if local_reload_connections && @reload_after > DEFAULT_RELOAD_AFTER
      local_reload_connections = @reload_after
    end

    # Custom headers are merged last, so they may override Content-Type.
    headers = { 'Content-Type' => "application/json" }.merge(@custom_headers)

    transport = OpenSearch::Transport::Transport::HTTP::Faraday.new(
      connection_options.merge(
        options: {
          reload_connections: local_reload_connections,
          reload_on_failure: @reload_on_failure,
          resurrect_after: @resurrect_after,
          logger: @transport_logger,
          transport_options: {
            headers: headers,
            request: { timeout: @request_timeout },
            ssl: { verify: @ssl_verify, ca_file: @ca_file, version: @ssl_version }
          },
          http: {
            user: @user,
            password: @password
          },
          sniffer_class: @sniffer_class,
        }), &adapter_conf)
    OpenSearch::Client.new transport: transport
  end
end
|
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
|
# File 'lib/fluent/plugin/in_opensearch.rb', line 108
# Validates the configuration and derives the runtime state used by the
# other methods: time parser, adapter options, credentials, transport
# logger, sniffer class and the base search request.
#
# @param conf [#[]] plugin configuration; read here with the string keys
#   '@log_level' and 'log_level'
# @raise [Fluent::ConfigError] when `user` is set without `password`, or the
#   sniffer class named by @sniffer_class_name cannot be resolved
def configure(conf)
  super

  @timestamp_parser = create_time_parser
  @backend_options = backend_options
  @retry = nil

  if @user && @password.nil?
    raise Fluent::ConfigError, "`password` must be present if `user` is present"
  end

  # Credentials wrapped as %{...} are unwrapped and form-encoded.
  user_placeholder = @user && @user.match(/%{(?<user>.*)}/)
  @user = URI.encode_www_form_component(user_placeholder["user"]) if user_placeholder

  password_placeholder = @password && @password.match(/%{(?<password>.*)}/)
  @password = URI.encode_www_form_component(password_placeholder["password"]) if password_placeholder

  # The transport only gets a logger when transporter logging is requested.
  @transport_logger = nil
  if @with_transporter_log
    @transport_logger = log
    unless conf['@log_level'] || conf['log_level']
      log.warn "Consider to specify log_level with @log_level."
    end
  end

  @current_config = nil
  @sniffer_class = nil
  if @sniffer_class_name
    begin
      @sniffer_class = Object.const_get(@sniffer_class_name)
    rescue Exception => lookup_error
      raise Fluent::ConfigError, "Could not load sniffer class #{@sniffer_class_name}: #{lookup_error}"
    end
  end

  # Base arguments for the search request; the query body is added per run.
  @options = { index: @index_name, scroll: @scroll, size: @size }
  @base_query = @query
end
|
#convert_numeric_time_into_string(numeric_time, timestamp_key_format = "%Y-%m-%dT%H:%M:%S.%N%z") ⇒ Object
273
274
275
276
|
# File 'lib/fluent/plugin/in_opensearch.rb', line 273
# Formats a numeric timestamp as a string so it can be fed to the
# string-based time parsers.
#
# @param numeric_time [Numeric] value handed to Fluent::NumericTimeParser (:float mode)
# @param timestamp_key_format [String] strftime pattern for the result
# @return [String] the formatted time
def convert_numeric_time_into_string(numeric_time, timestamp_key_format = "%Y-%m-%dT%H:%M:%S.%N%z")
  parsed = Fluent::NumericTimeParser.new(:float).parse(numeric_time)
  # Go through a Rational so Time.at receives the parsed value's #to_r.
  Time.at(parsed.to_r).strftime(timestamp_key_format)
end
|
#create_time_parser ⇒ Object
We might be able to use Fluent::Parser::TimeParser, but it doesn't quite do what we want: it gives [sec, nsec], whereas we want something we can call `strftime` on.
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
|
# File 'lib/fluent/plugin/in_opensearch.rb', line 246
# Builds the callable used to turn a timestamp field into a Time.
#
# Without @timestamp_key_format the value is parsed with DateTime.parse.
# With a format, a Strptime parser is used; if building it raises, the
# parser falls back to DateTime.strptime with the same format. Numeric
# values are converted to strings first in every case.
#
# @return [Proc] proc taking one value and returning a Time
def create_time_parser
  unless @timestamp_key_format
    free_form = proc do |value|
      value = convert_numeric_time_into_string(value) if value.is_a?(Numeric)
      DateTime.parse(value).to_time
    end
    return free_form
  end

  begin
    compiled = Strptime.new(@timestamp_key_format)
    proc do |value|
      value = convert_numeric_time_into_string(value, @timestamp_key_format) if value.is_a?(Numeric)
      compiled.exec(value).to_time
    end
  rescue
    # Strptime could not be set up for this format; use the stdlib parser.
    proc do |value|
      value = convert_numeric_time_into_string(value, @timestamp_key_format) if value.is_a?(Numeric)
      DateTime.strptime(value, @timestamp_key_format).to_time
    end
  end
end
|
#emit_error_label_event(&block) ⇒ Object
230
231
232
233
234
235
|
# File 'lib/fluent/plugin/in_opensearch.rb', line 230
# Runs the given block only when error-label events are enabled.
#
# The flag is read from @emit_error_label_event. Testing the bare name
# `emit_error_label_event` here would call this method again (without a
# block) and recurse until the stack overflows.
# NOTE(review): @emit_error_label_event is presumably populated from the
# plugin configuration — confirm against the config_param declaration.
#
# @yield emits the error event
# @return [Object, nil] the block's result, or nil when disabled or no block given
def emit_error_label_event(&block)
  block.call if @emit_error_label_event && block
end
|
#get_connection_options(con_host = nil) ⇒ Object
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
|
# File 'lib/fluent/plugin/in_opensearch.rb', line 181
# Resolves the list of hosts to connect to.
#
# Hosts come from con_host, else @hosts (both comma-separated), else the
# single @host/@port/@scheme triple. Each entry is either "host[:port]" or a
# full URL; URL entries may carry user, password and path. Missing
# credentials and path are filled in from @user/@password and @path. When
# @check_connection is set, hosts that fail reachable_host? are dropped.
#
# @param con_host [String, nil] comma-separated host list overriding @hosts
# @return [Hash] `{ hosts: [...] }` where each host is a Hash
def get_connection_options(con_host=nil)
  host_list = con_host || @hosts

  hosts =
    if host_list
      parsed = host_list.split(',').map do |host_str|
        if host_str.match(%r{^[^:]+(\:\d+)?$})
          # Plain "host" or "host:port"; the port falls back to @port.
          name, port = host_str.split(':')
          { host: name, port: (port || @port).to_i, scheme: @scheme.to_s }
        else
          uri = URI(get_escaped_userinfo(host_str))
          entry = { host: uri.host, port: uri.port, scheme: uri.scheme }
          # Only copy URL components that are actually present.
          %w(user password path).each do |key|
            component = uri.public_send(key)
            entry[key.to_sym] = component unless component.nil? || component == ''
          end
          entry
        end
      end
      parsed.compact
    else
      [{ host: @host, port: @port, scheme: @scheme.to_s }]
    end

  hosts.each do |host|
    host.merge!(user: @user, password: @password) if !host[:user] && @user
    host.merge!(path: @path) if !host[:path] && @path
  end

  live_hosts = @check_connection ? hosts.select { |host| reachable_host?(host) } : hosts
  { hosts: live_hosts }
end
|
#get_escaped_userinfo(host_str) ⇒ Object
169
170
171
172
173
174
175
176
177
178
179
|
# File 'lib/fluent/plugin/in_opensearch.rb', line 169
# Form-encodes credentials written as `%{user}:%{password}` inside a host
# URL so the result can be parsed as a URI.
#
# @param host_str [String] host URL, possibly containing %{...} placeholders
# @return [String] the URL with encoded credentials, or host_str unchanged
#   when it does not contain the placeholder pattern
def get_escaped_userinfo(host_str)
  parts = host_str.match(/(?<scheme>.*)%{(?<user>.*)}:%{(?<password>.*)}(?<path>@.*)/)
  return host_str unless parts

  escaped_user = URI.encode_www_form_component(parts["user"])
  escaped_password = URI.encode_www_form_component(parts["password"])
  parts["scheme"] + escaped_user + ':' + escaped_password + parts["path"]
end
|
#is_existing_connection(host) ⇒ Object
325
326
327
328
329
330
331
332
333
334
335
336
337
338
|
# File 'lib/fluent/plugin/in_opensearch.rb', line 325
# Tells whether the cached client was built for exactly this host list.
#
# Hosts are compared position by position on both :host and :port. The
# comparisons are explicitly parenthesised: written as
# `!a.eql? b || c != d`, Ruby passes `b || (c != d)` as the argument to
# eql?, so a changed port would go unnoticed.
#
# @param host [Array<Hash>] candidate hosts, each with :host and :port
# @return [Boolean] true when a client exists and every host/port matches
def is_existing_connection(host)
  return false if @_os.nil? || @current_config.nil?
  return false if host.length != @current_config.length

  host.zip(@current_config).all? do |candidate, current|
    candidate[:host].eql?(current[:host]) && candidate[:port] == current[:port]
  end
end
|
#parse_time(value, event_time, tag) ⇒ Object
278
279
280
281
282
283
284
285
|
# File 'lib/fluent/plugin/in_opensearch.rb', line 278
def parse_time(value, event_time, tag)
@timestamp_parser.call(value)
rescue => e
emit_error_label_event do
router.emit_error_event(@timestamp_parse_error_tag, Fluent::Engine.now, {'tag' => tag, 'time' => event_time, 'format' => @timestamp_key_format, 'value' => value}, e)
end
return Time.at(event_time).to_time
end
|
#process_events(hit, es) ⇒ Object
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
|
# File 'lib/fluent/plugin/in_opensearch.rb', line 416
# Converts one search hit into an event and appends it to the event stream.
#
# The event body is the hit's "_source". Its time is the current engine time
# unless @parse_timestamp is set and the document has TIMESTAMP_FIELD, in
# which case that field is parsed. With @docinfo enabled, the hit fields
# named in @docinfo_fields are copied into the event under @docinfo_target.
#
# @param hit [Hash] a single search hit
# @param es [#add] event stream receiving (time, event)
# @raise [UnrecoverableError] when the document already has a non-Hash value
#   under @docinfo_target
def process_events(hit, es)
  event = hit["_source"]
  time = Fluent::Engine.now

  if @parse_timestamp && event.has_key?(TIMESTAMP_FIELD)
    time = parse_time(event[TIMESTAMP_FIELD], time, @tag)
  end

  if @docinfo
    docinfo_target = event[@docinfo_target] || {}
    unless docinfo_target.is_a?(Hash)
      # Kernel#raise treats a third positional argument as the backtrace, so
      # the details go into the message rather than a trailing options hash.
      # NOTE(review): UnrecoverableError is not defined in the visible part of
      # this file (only UnrecoverableRequestFailure is listed) — confirm it resolves.
      raise UnrecoverableError, "incompatible type for the docinfo_target=#{@docinfo_target} field in the `_source` document, expected a hash got: #{docinfo_target.class} (event: #{event.inspect})"
    end
    @docinfo_fields.each do |field|
      docinfo_target[field] = hit[field]
    end
    event[@docinfo_target] = docinfo_target
  end

  es.add(time, event)
end
|
410
411
412
413
414
|
# File 'lib/fluent/plugin/in_opensearch.rb', line 410
# Fetches the next scroll page, feeds each hit to process_events, and reports
# whether scrolling should continue.
#
# @param es [Object] event stream passed through to process_events
# @param scroll_id [String] scroll context to continue
# @return [Hash] 'has_hits' (Boolean) and '_scroll_id' taken from the response
def process_next_scroll_request(es, scroll_id)
  response = process_scroll_request(scroll_id)
  page_hits = response['hits']['hits']
  page_hits.each { |hit| process_events(hit, es) }
  { 'has_hits' => page_hits.any?, '_scroll_id' => response['_scroll_id'] }
end
|
406
407
408
|
# File 'lib/fluent/plugin/in_opensearch.rb', line 406
# Requests the next page of an open scroll, keeping the context alive for
# the configured @scroll duration.
#
# @param scroll_id [String] scroll context to continue
# @return [Object] the client's scroll response
def process_scroll_request(scroll_id)
  scroll_body = { :scroll_id => scroll_id }
  client.scroll(:body => scroll_body, :scroll => @scroll)
end
|
#reachable_host?(host) ⇒ Boolean
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
|
# File 'lib/fluent/plugin/in_opensearch.rb', line 213
# Pings a single host with a throwaway client to see whether it answers.
#
# @param host [Hash] host entry with :scheme, :host, :port and optional
#   :user / :password
# @return [Object, false] the result of the ping; false (after logging a
#   warning) when building the client or pinging raises
def reachable_host?(host)
  probe_options = {
    host: ["#{host[:scheme]}://#{host[:host]}:#{host[:port]}"],
    user: host[:user],
    password: host[:password],
    request_timeout: @request_timeout,
    resurrect_after: @resurrect_after,
    reload_on_failure: @reload_on_failure,
    transport_options: { ssl: { verify: @ssl_verify, ca_file: @ca_file, version: @ssl_version } }
  }
  probe = OpenSearch::Client.new(**probe_options)
  probe.ping
rescue => ping_error
  log.warn "Failed to connect to #{host[:scheme]}://#{host[:host]}:#{host[:port]}: #{ping_error.message}"
  false
end
|
#retry_state(randomize) ⇒ Object
160
161
162
163
164
165
166
167
|
# File 'lib/fluent/plugin/in_opensearch.rb', line 160
# Creates a retry-state object for the :input_retries title from the
# plugin's retry settings.
#
# @param randomize [Boolean] forwarded as the randomize: option
# @return [Object] whatever retry_state_create returns
def retry_state(randomize)
  limits = {
    forever: @retry_forever,
    max_steps: @retry_max_times,
    max_interval: @retry_max_interval,
    backoff_base: @retry_exponential_backoff_base,
    randomize: randomize
  }
  retry_state_create(:input_retries, @retry_type, @retry_wait, @retry_timeout, **limits)
end
|
#run ⇒ Object
363
364
365
366
367
368
369
370
371
372
373
374
375
376
|
# File 'lib/fluent/plugin/in_opensearch.rb', line 363
# Runs one collection cycle. With @num_slices <= 1 the search runs inline;
# otherwise one thread per slice is started through thread_create.
#
# Connection and transport errors raised here are handed to
# update_retry_state (which waits, or raises once the retry limit is hit)
# and the whole cycle is then retried.
#
# @return [Object] run_slice's result, or the array returned by mapping
#   thread_create over the slice ids
def run
  if @num_slices <= 1
    run_slice
  else
    log.warn("Large slice number is specified:(#{@num_slices}). Consider reducing num_slices") if @num_slices > 8
    (0...@num_slices).map do |slice_id|
      thread_create(:"in_opensearch_thread_#{slice_id}") { run_slice(slice_id) }
    end
  end
rescue Faraday::ConnectionFailed, OpenSearch::Transport::Transport::Error => error
  update_retry_state(error)
  retry
end
|
#run_slice(slice_id = nil) ⇒ Object
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
|
# File 'lib/fluent/plugin/in_opensearch.rb', line 378
# Executes the search (optionally restricted to one slice), follows the
# scroll until a page comes back empty or without a scroll id, emits all
# collected events as one stream, then releases the scroll and resets the
# retry state.
#
# @param slice_id [Integer, nil] slice to query; nil runs the unsliced query
# @return [Object] the result of update_retry_state
def run_slice(slice_id=nil)
  slice_query =
    if slice_id.nil?
      @base_query
    else
      @base_query.merge('slice' => { 'id' => slice_id, 'max' => @num_slices})
    end

  connection = client
  search_request = @options.merge(:body => Yajl.dump(slice_query))
  response = connection.search(search_request)

  es = Fluent::MultiEventStream.new
  first_page = response["hits"]["hits"]
  first_page.each { |hit| process_events(hit, es) }

  has_hits = first_page.any?
  scroll_id = response['_scroll_id']
  while has_hits && scroll_id
    page = process_next_scroll_request(es, scroll_id)
    has_hits = page['has_hits']
    scroll_id = page['_scroll_id']
  end

  router.emit_stream(@tag, es)
  clear_scroll(scroll_id)
  update_retry_state
end
|
#start ⇒ Object
237
238
239
240
241
|
# File 'lib/fluent/plugin/in_opensearch.rb', line 237
# Starts the plugin: after the parent's start logic, registers a timer that
# invokes #run every @interval, repeating according to @repeat.
def start
  super
  run_callback = method(:run)
  timer_execute(:in_opensearch_timer, @interval, repeat: @repeat, &run_callback)
end
|
#update_retry_state(error = nil) ⇒ Object
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
|
# File 'lib/fluent/plugin/in_opensearch.rb', line 340
# Tracks retry progress for the search loop.
#
# Without an error, any in-progress retry state is cleared (logging that the
# retry succeeded). With an error, the retry state is created if needed and
# stepped; the process exits on an EOFError message, raises once the retry
# limit is reached, and otherwise logs a warning and sleeps until the next
# retry time.
#
# @param error [Exception, nil] the failure that triggered a retry, if any
# @raise [RuntimeError] when the retry limit has been hit
def update_retry_state(error=nil)
  unless error
    unless @retry.nil?
      log.info("retry succeeded.")
      @retry = nil
    end
    return nil
  end

  @retry ||= retry_state(@retry_randomize)
  @retry.step

  if error.message.include?('EOFError (EOFError)')
    log.error("Restart plugin #{error.message}")
    exit(1)
  end

  if @retry.limit?
    raise "Hit limit for retries. retry_times: #{@retry.steps}, error: #{error.message}"
  end

  log.warn("failed to connect or search.", retry_times: @retry.steps, next_retry_time: @retry.next_time.round, error: error.message)
  sleep(@retry.next_time - Time.now)
end
|