Class: Fluent::Plugin::ElasticsearchInput
- Inherits:
-
Input
- Object
- Input
- Fluent::Plugin::ElasticsearchInput
show all
- Includes:
- ElasticsearchConstants
- Defined in:
- lib/fluent/plugin/in_elasticsearch.rb
Defined Under Namespace
Classes: UnrecoverableRequestFailure
Constant Summary
collapse
- DEFAULT_RELOAD_AFTER =
-1
- DEFAULT_STORAGE_TYPE =
'local'
- METADATA =
"@metadata".freeze
Fluent::Plugin::ElasticsearchConstants::BODY_DELIMITER, Fluent::Plugin::ElasticsearchConstants::CREATE_OP, Fluent::Plugin::ElasticsearchConstants::ID_FIELD, Fluent::Plugin::ElasticsearchConstants::INDEX_OP, Fluent::Plugin::ElasticsearchConstants::TIMESTAMP_FIELD, Fluent::Plugin::ElasticsearchConstants::UPDATE_OP, Fluent::Plugin::ElasticsearchConstants::UPSERT_OP
Instance Method Summary
collapse
Constructor Details
Returns a new instance of ElasticsearchInput.
60
61
62
|
# File 'lib/fluent/plugin/in_elasticsearch.rb', line 60
def initialize
super
end
|
Instance Method Details
#backend_options ⇒ Object
102
103
104
105
106
107
108
109
110
111
112
113
|
# File 'lib/fluent/plugin/in_elasticsearch.rb', line 102
def backend_options
case @http_backend
when :excon
{ client_key: @client_key, client_cert: @client_cert, client_key_pass: @client_key_pass }
when :typhoeus
require 'typhoeus'
{ sslkey: @client_key, sslcert: @client_cert, keypasswd: @client_key_pass }
end
rescue LoadError => ex
log.error_backtrace(ex.backtrace)
raise Fluent::ConfigError, "You must install #{@http_backend} gem. Exception: #{ex}"
end
|
#client(host = nil) ⇒ Object
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
|
# File 'lib/fluent/plugin/in_elasticsearch.rb', line 207
def client(host = nil)
connection_options = get_connection_options(host)
@_es = nil unless is_existing_connection(connection_options[:hosts])
@_es ||= begin
@current_config = connection_options[:hosts].clone
adapter_conf = lambda {|f| f.adapter @http_backend, @backend_options }
local_reload_connections = @reload_connections
if local_reload_connections && @reload_after > DEFAULT_RELOAD_AFTER
local_reload_connections = @reload_after
end
= { 'Content-Type' => "application/json" }.merge(@custom_headers)
transport = TRANSPORT_CLASS::Transport::HTTP::Faraday.new(
connection_options.merge(
options: {
reload_connections: local_reload_connections,
reload_on_failure: @reload_on_failure,
resurrect_after: @resurrect_after,
logger: @transport_logger,
transport_options: {
headers: ,
request: { timeout: @request_timeout },
ssl: { verify: @ssl_verify, ca_file: @ca_file, version: @ssl_version }
},
http: {
user: @user,
password: @password
},
sniffer_class: @sniffer_class,
}), &adapter_conf)
Elasticsearch::Client.new transport: transport
end
end
|
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
|
# File 'lib/fluent/plugin/in_elasticsearch.rb', line 64
def configure(conf)
super
@timestamp_parser = create_time_parser
@backend_options = backend_options
raise Fluent::ConfigError, "`password` must be present if `user` is present" if @user && @password.nil?
if @user && m = @user.match(/%{(?<user>.*)}/)
@user = URI.encode_www_form_component(m["user"])
end
if @password && m = @password.match(/%{(?<password>.*)}/)
@password = URI.encode_www_form_component(m["password"])
end
@transport_logger = nil
if @with_transporter_log
@transport_logger = log
log_level = conf['@log_level'] || conf['log_level']
log.warn "Consider to specify log_level with @log_level." unless log_level
end
@current_config = nil
@sniffer_class = nil
begin
@sniffer_class = Object.const_get(@sniffer_class_name) if @sniffer_class_name
rescue Exception => ex
raise Fluent::ConfigError, "Could not load sniffer class #{@sniffer_class_name}: #{ex}"
end
@options = {
:index => @index_name,
:scroll => @scroll,
:size => @size
}
@base_query = @query
end
|
#convert_numeric_time_into_string(numeric_time, timestamp_key_format = "%Y-%m-%dT%H:%M:%S.%N%z") ⇒ Object
195
196
197
198
|
# File 'lib/fluent/plugin/in_elasticsearch.rb', line 195
def convert_numeric_time_into_string(numeric_time, timestamp_key_format = "%Y-%m-%dT%H:%M:%S.%N%z")
numeric_time_parser = Fluent::NumericTimeParser.new(:float)
Time.at(numeric_time_parser.parse(numeric_time).to_r).strftime(timestamp_key_format)
end
|
#create_time_parser ⇒ Object
once fluent v0.14 is released we might be able to use Fluent::Parser::TimeParser, but it doesn’t quite do what we want - if gives
- sec,nsec
-
where as we want something we can call ‘strftime` on…
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
|
# File 'lib/fluent/plugin/in_elasticsearch.rb', line 168
def create_time_parser
if @timestamp_key_format
begin
strptime = Strptime.new(@timestamp_key_format)
Proc.new { |value|
value = convert_numeric_time_into_string(value, @timestamp_key_format) if value.is_a?(Numeric)
strptime.exec(value).to_time
}
rescue
Proc.new { |value|
value = convert_numeric_time_into_string(value, @timestamp_key_format) if value.is_a?(Numeric)
DateTime.strptime(value, @timestamp_key_format).to_time
}
end
else
Proc.new { |value|
value = convert_numeric_time_into_string(value) if value.is_a?(Numeric)
DateTime.parse(value).to_time
}
end
end
|
#get_connection_options(con_host = nil) ⇒ Object
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
|
# File 'lib/fluent/plugin/in_elasticsearch.rb', line 127
def get_connection_options(con_host=nil)
hosts = if con_host || @hosts
(con_host || @hosts).split(',').map do |host_str|
if host_str.match(%r{^[^:]+(\:\d+)?$})
{
host: host_str.split(':')[0],
port: (host_str.split(':')[1] || @port).to_i,
scheme: @scheme.to_s
}
else
uri = URI(get_escaped_userinfo(host_str))
%w(user password path).inject(host: uri.host, port: uri.port, scheme: uri.scheme) do |hash, key|
hash[key.to_sym] = uri.public_send(key) unless uri.public_send(key).nil? || uri.public_send(key) == ''
hash
end
end
end.compact
else
[{host: @host, port: @port, scheme: @scheme.to_s}]
end.each do |host|
host.merge!(user: @user, password: @password) if !host[:user] && @user
host.merge!(path: @path) if !host[:path] && @path
end
{
hosts: hosts
}
end
|
#get_escaped_userinfo(host_str) ⇒ Object
115
116
117
118
119
120
121
122
123
124
125
|
# File 'lib/fluent/plugin/in_elasticsearch.rb', line 115
def get_escaped_userinfo(host_str)
if m = host_str.match(/(?<scheme>.*)%{(?<user>.*)}:%{(?<password>.*)}(?<path>@.*)/)
m["scheme"] +
URI.encode_www_form_component(m["user"]) +
':' +
URI.encode_www_form_component(m["password"]) +
m["path"]
else
host_str
end
end
|
#is_existing_connection(host) ⇒ Object
245
246
247
248
249
250
251
252
253
254
255
256
257
258
|
# File 'lib/fluent/plugin/in_elasticsearch.rb', line 245
def is_existing_connection(host)
return false if @_es.nil?
return false if @current_config.nil?
return false if host.length != @current_config.length
for i in 0...host.length
if !host[i][:host].eql? @current_config[i][:host] || host[i][:port] != @current_config[i][:port]
return false
end
end
return true
end
|
#parse_time(value, event_time, tag) ⇒ Object
200
201
202
203
204
205
|
# File 'lib/fluent/plugin/in_elasticsearch.rb', line 200
def parse_time(value, event_time, tag)
@timestamp_parser.call(value)
rescue => e
router.emit_error_event(@timestamp_parse_error_tag, Fluent::Engine.now, {'tag' => tag, 'time' => event_time, 'format' => @timestamp_key_format, 'value' => value}, e)
return Time.at(event_time).to_time
end
|
#process_events(hit, es) ⇒ Object
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
|
# File 'lib/fluent/plugin/in_elasticsearch.rb', line 306
def process_events(hit, es)
event = hit["_source"]
time = Fluent::Engine.now
if @parse_timestamp
if event.has_key?(TIMESTAMP_FIELD)
rts = event[TIMESTAMP_FIELD]
time = parse_time(rts, time, @tag)
end
end
if @docinfo
docinfo_target = event[@docinfo_target] || {}
unless docinfo_target.is_a?(Hash)
raise UnrecoverableError, "incompatible type for the docinfo_target=#{@docinfo_target} field in the `_source` document, expected a hash got:", :type => docinfo_target.class, :event => event
end
@docinfo_fields.each do |field|
docinfo_target[field] = hit[field]
end
event[@docinfo_target] = docinfo_target
end
es.add(time, event)
end
|
300
301
302
303
304
|
# File 'lib/fluent/plugin/in_elasticsearch.rb', line 300
def process_next_scroll_request(es, scroll_id)
result = process_scroll_request(scroll_id)
result['hits']['hits'].each { |hit| process_events(hit, es) }
{'has_hits' => result['hits']['hits'].any?, '_scroll_id' => result['_scroll_id']}
end
|
296
297
298
|
# File 'lib/fluent/plugin/in_elasticsearch.rb', line 296
def process_scroll_request(scroll_id)
client.scroll(:body => { :scroll_id => scroll_id }, :scroll => @scroll)
end
|
#run ⇒ Object
260
261
262
263
264
265
266
267
268
269
270
|
# File 'lib/fluent/plugin/in_elasticsearch.rb', line 260
def run
return run_slice if @num_slices <= 1
log.warn("Large slice number is specified:(#{@num_slices}). Consider reducing num_slices") if @num_slices > 8
@num_slices.times.map do |slice_id|
thread_create(:"in_elasticsearch_thread_#{slice_id}") do
run_slice(slice_id)
end
end
end
|
#run_slice(slice_id = nil) ⇒ Object
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
|
# File 'lib/fluent/plugin/in_elasticsearch.rb', line 272
def run_slice(slice_id=nil)
slice_query = @base_query
slice_query = slice_query.merge('slice' => { 'id' => slice_id, 'max' => @num_slices}) unless slice_id.nil?
result = client.search(@options.merge(:body => Yajl.dump(slice_query) ))
es = Fluent::MultiEventStream.new
result["hits"]["hits"].each {|hit| process_events(hit, es)}
has_hits = result['hits']['hits'].any?
scroll_id = result['_scroll_id']
while has_hits && scroll_id
result = process_next_scroll_request(es, scroll_id)
has_hits = result['has_hits']
scroll_id = result['_scroll_id']
end
router.emit_stream(@tag, es)
if Gem::Version.new(Elasticsearch::VERSION) >= Gem::Version.new("7.0.0")
client.clear_scroll(body: {scroll_id: scroll_id}) if scroll_id
else
client.clear_scroll(scroll_id: scroll_id) if scroll_id
end
end
|
#start ⇒ Object
159
160
161
162
163
|
# File 'lib/fluent/plugin/in_elasticsearch.rb', line 159
def start
super
timer_execute(:in_elasticsearch_timer, @interval, repeat: @repeat, &method(:run))
end
|