Class: Fluent::Plugin::ElasticsearchInput
- Inherits:
-
Input
- Object
- Input
- Fluent::Plugin::ElasticsearchInput
show all
- Includes:
- ElasticsearchConstants
- Defined in:
- lib/fluent/plugin/in_elasticsearch.rb
Defined Under Namespace
Classes: UnrecoverableRequestFailure
Constant Summary
collapse
- DEFAULT_RELOAD_AFTER =
-1
- DEFAULT_STORAGE_TYPE =
'local'
- METADATA =
"@metadata".freeze
Fluent::Plugin::ElasticsearchConstants::BODY_DELIMITER, Fluent::Plugin::ElasticsearchConstants::CREATE_OP, Fluent::Plugin::ElasticsearchConstants::ID_FIELD, Fluent::Plugin::ElasticsearchConstants::INDEX_OP, Fluent::Plugin::ElasticsearchConstants::TIMESTAMP_FIELD, Fluent::Plugin::ElasticsearchConstants::UPDATE_OP, Fluent::Plugin::ElasticsearchConstants::UPSERT_OP
Instance Method Summary
collapse
Constructor Details
Returns a new instance of ElasticsearchInput.
58
59
60
|
# File 'lib/fluent/plugin/in_elasticsearch.rb', line 58
# Builds a new ElasticsearchInput. No plugin-specific state is set up here;
# construction is delegated entirely to the parent class initializer.
def initialize
super
end
|
Instance Method Details
#backend_options ⇒ Object
100
101
102
103
104
105
106
107
108
109
110
111
|
# File 'lib/fluent/plugin/in_elasticsearch.rb', line 100
# Builds the adapter-specific option hash that carries the client key,
# client certificate and key passphrase for the configured HTTP backend.
#
# Returns a Hash for the :excon and :typhoeus backends and nil for any other
# backend. Raises Fluent::ConfigError when the backend gem cannot be loaded.
def backend_options
  if @http_backend == :excon
    {
      client_key: @client_key,
      client_cert: @client_cert,
      client_key_pass: @client_key_pass
    }
  elsif @http_backend == :typhoeus
    # Loaded lazily so the gem is only needed when this backend is selected.
    require 'typhoeus'
    {
      sslkey: @client_key,
      sslcert: @client_cert,
      keypasswd: @client_key_pass
    }
  end
rescue LoadError => load_error
  log.error_backtrace(load_error.backtrace)
  raise Fluent::ConfigError, "You must install #{@http_backend} gem. Exception: #{load_error}"
end
|
#client(host = nil) ⇒ Object
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
|
# File 'lib/fluent/plugin/in_elasticsearch.rb', line 205
# Returns the Elasticsearch client for the given host specification, building
# it on first use and caching it in @_es.
#
# The cached client is discarded and rebuilt whenever the resolved host list
# no longer matches the one the cached client was created with.
#
# host - optional host string forwarded to get_connection_options; when nil
#        the plugin's configured hosts are used.
#
# Returns an Elasticsearch::Client.
def client(host = nil)
  connection_options = get_connection_options(host)
  @_es = nil unless is_existing_connection(connection_options[:hosts])
  @_es ||= begin
    # Remember which hosts this client was built for so later calls can
    # detect a change.
    @current_config = connection_options[:hosts].clone
    adapter_conf = lambda { |f| f.adapter @http_backend, @backend_options }

    # When reloading is enabled and reload_after is above its default (-1),
    # the reload_after value itself is passed as the reload_connections option.
    local_reload_connections = @reload_connections
    if local_reload_connections && @reload_after > DEFAULT_RELOAD_AFTER
      local_reload_connections = @reload_after
    end

    # Default content type first, so user supplied headers can override it.
    headers = { 'Content-Type' => "application/json" }.merge(@custom_headers)

    transport = Elasticsearch::Transport::Transport::HTTP::Faraday.new(
      connection_options.merge(
        options: {
          reload_connections: local_reload_connections,
          reload_on_failure: @reload_on_failure,
          resurrect_after: @resurrect_after,
          logger: @transport_logger,
          transport_options: {
            headers: headers,
            request: { timeout: @request_timeout },
            ssl: { verify: @ssl_verify, ca_file: @ca_file, version: @ssl_version }
          },
          http: {
            user: @user,
            password: @password
          },
          sniffer_class: @sniffer_class,
        }), &adapter_conf)
    Elasticsearch::Client.new transport: transport
  end
end
|
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
|
# File 'lib/fluent/plugin/in_elasticsearch.rb', line 62
# Validates the plugin configuration and derives the state used at runtime:
# the timestamp parser, backend options, escaped credentials, transport
# logger, sniffer class, search options and base query.
#
# conf - the configuration element handed over by Fluentd.
#
# Raises Fluent::ConfigError when `user` is given without `password`, or when
# the configured sniffer class cannot be resolved.
def configure(conf)
  super

  @timestamp_parser = create_time_parser
  @backend_options = backend_options

  raise Fluent::ConfigError, "`password` must be present if `user` is present" if @user && @password.nil?

  # Credentials wrapped in %{...} are percent-encoded so that special
  # characters survive being used in a URL.
  if @user && m = @user.match(/%{(?<user>.*)}/)
    @user = URI.encode_www_form_component(m["user"])
  end
  if @password && m = @password.match(/%{(?<password>.*)}/)
    @password = URI.encode_www_form_component(m["password"])
  end

  @transport_logger = nil
  if @with_transporter_log
    @transport_logger = log
    log_level = conf['@log_level'] || conf['log_level']
    log.warn "Consider to specify log_level with @log_level." unless log_level
  end

  @current_config = nil

  @sniffer_class = nil
  begin
    @sniffer_class = Object.const_get(@sniffer_class_name) if @sniffer_class_name
  rescue StandardError, ScriptError => ex
    # StandardError covers NameError for unknown constants; ScriptError covers
    # LoadError raised while loading the constant. Signals and SystemExit are
    # deliberately not intercepted.
    raise Fluent::ConfigError, "Could not load sniffer class #{@sniffer_class_name}: #{ex}"
  end

  @options = {
    :index => @index_name,
    :scroll => @scroll,
    :size => @size
  }
  @base_query = @query
end
|
#convert_numeric_time_into_string(numeric_time, timestamp_key_format = "%Y-%m-%dT%H:%M:%S.%N%z") ⇒ Object
193
194
195
196
|
# File 'lib/fluent/plugin/in_elasticsearch.rb', line 193
# Formats a numeric timestamp as a string.
#
# numeric_time         - the numeric value, parsed with a float
#                        Fluent::NumericTimeParser.
# timestamp_key_format - strftime pattern used for the resulting string.
#
# Returns the formatted String.
def convert_numeric_time_into_string(numeric_time, timestamp_key_format = "%Y-%m-%dT%H:%M:%S.%N%z")
  parser = Fluent::NumericTimeParser.new(:float)
  parsed = parser.parse(numeric_time)
  # Converted through a Rational before building the Time.
  moment = Time.at(parsed.to_r)
  moment.strftime(timestamp_key_format)
end
|
#create_time_parser ⇒ Object
Once fluent v0.14 is released we might be able to use Fluent::Parser::TimeParser, but it doesn't quite do what we want: it gives [sec, nsec], whereas we want something we can call `strftime` on.
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
|
# File 'lib/fluent/plugin/in_elasticsearch.rb', line 166
# Builds the callable used to turn a document's timestamp value into a Time.
#
# With @timestamp_key_format set, a Strptime based parser is preferred and a
# DateTime.strptime based parser is used when Strptime cannot be set up.
# Without a format, DateTime.parse is used. In every case numeric input is
# first converted to a string via convert_numeric_time_into_string.
#
# Returns a Proc taking the raw value and returning a Time.
def create_time_parser
  format = @timestamp_key_format

  if format.nil? || format == false
    return proc { |value|
      value = convert_numeric_time_into_string(value) if value.is_a?(Numeric)
      DateTime.parse(value).to_time
    }
  end

  begin
    strptime = Strptime.new(@timestamp_key_format)
    proc { |value|
      value = convert_numeric_time_into_string(value, @timestamp_key_format) if value.is_a?(Numeric)
      strptime.exec(value).to_time
    }
  rescue
    # Only failures while constructing the Strptime parser end up here;
    # errors raised later, when the proc is called, are not covered.
    proc { |value|
      value = convert_numeric_time_into_string(value, @timestamp_key_format) if value.is_a?(Numeric)
      DateTime.strptime(value, @timestamp_key_format).to_time
    }
  end
end
|
#get_connection_options(con_host = nil) ⇒ Object
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
|
# File 'lib/fluent/plugin/in_elasticsearch.rb', line 125
# Resolves the list of Elasticsearch hosts to connect to.
#
# con_host - optional comma separated host string; falls back to @hosts and,
#            when neither is set, to the single @host/@port/@scheme triple.
#
# Each entry is either "host" / "host:port" or a full URL. Entries without
# their own user or path inherit @user/@password and @path.
#
# Returns a Hash of the form { hosts: [ {host:, port:, scheme:, ...}, ... ] }.
def get_connection_options(con_host=nil)
  host_spec = con_host || @hosts

  hosts =
    if host_spec
      parsed_hosts = host_spec.split(',').map do |host_str|
        if host_str.match(%r{^[^:]+(\:\d+)?$})
          # Plain "host" or "host:port" entry.
          name, port = host_str.split(':')
          {
            host: name,
            port: (port || @port).to_i,
            scheme: @scheme.to_s
          }
        else
          # Anything else is treated as a URL.
          uri = URI(get_escaped_userinfo(host_str))
          entry = { host: uri.host, port: uri.port, scheme: uri.scheme }
          %w(user password path).each do |key|
            value = uri.public_send(key)
            entry[key.to_sym] = value unless value.nil? || value == ''
          end
          entry
        end
      end
      parsed_hosts.compact
    else
      [{host: @host, port: @port, scheme: @scheme.to_s}]
    end

  hosts.each do |host|
    host.merge!(user: @user, password: @password) if !host[:user] && @user
    host.merge!(path: @path) if !host[:path] && @path
  end

  { hosts: hosts }
end
|
#get_escaped_userinfo(host_str) ⇒ Object
113
114
115
116
117
118
119
120
121
122
123
|
# File 'lib/fluent/plugin/in_elasticsearch.rb', line 113
# Percent-encodes credentials written as %{user}:%{password} inside a host URL.
#
# host_str - a host string, e.g. "https://%{user}:%{password}@host:9200".
#
# Returns the string with both placeholders replaced by their form-encoded
# contents, or host_str unchanged when the placeholder pattern is absent.
def get_escaped_userinfo(host_str)
  m = host_str.match(/(?<scheme>.*)%{(?<user>.*)}:%{(?<password>.*)}(?<path>@.*)/)
  return host_str unless m

  escaped_user = URI.encode_www_form_component(m["user"])
  escaped_password = URI.encode_www_form_component(m["password"])
  m["scheme"] + escaped_user + ':' + escaped_password + m["path"]
end
|
#is_existing_connection(host) ⇒ Object
243
244
245
246
247
248
249
250
251
252
253
254
255
256
|
# File 'lib/fluent/plugin/in_elasticsearch.rb', line 243
# Tells whether the cached client (@_es) was built for exactly the given hosts.
#
# host - Array of host hashes as produced by get_connection_options[:hosts].
#
# Returns true only when a client is cached, a host list was recorded in
# @current_config, both lists have the same length, and every entry matches
# the recorded entry at the same position in both :host and :port.
def is_existing_connection(host)
  return false if @_es.nil?
  return false if @current_config.nil?
  return false if host.length != @current_config.length

  host.zip(@current_config).all? do |candidate, current|
    # Explicit parentheses matter: without them `eql?` receives the whole
    # `a || b` expression as its argument and the port is never compared.
    candidate[:host].eql?(current[:host]) && candidate[:port] == current[:port]
  end
end
|
#parse_time(value, event_time, tag) ⇒ Object
198
199
200
201
202
203
|
# File 'lib/fluent/plugin/in_elasticsearch.rb', line 198
def parse_time(value, event_time, tag)
@timestamp_parser.call(value)
rescue => e
router.emit_error_event(@timestamp_parse_error_tag, Fluent::Engine.now, {'tag' => tag, 'time' => event_time, 'format' => @timestamp_key_format, 'value' => value}, e)
return Time.at(event_time).to_time
end
|
#process_events(hit, es) ⇒ Object
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
|
# File 'lib/fluent/plugin/in_elasticsearch.rb', line 300
# Converts one search hit into an event and appends it to the event stream.
#
# hit - a single entry of the search response's hits; its "_source" becomes
#       the event record.
# es  - the event stream the (time, record) pair is added to.
#
# The event time is the current engine time unless @parse_timestamp is set
# and the record carries TIMESTAMP_FIELD, in which case that value is parsed.
# With @docinfo enabled, the hit fields listed in @docinfo_fields are copied
# into the record under @docinfo_target.
#
# Raises UnrecoverableError when the record already holds a non-Hash value
# under @docinfo_target.
def process_events(hit, es)
  event = hit["_source"]
  time = Fluent::Engine.now

  if @parse_timestamp && event.has_key?(TIMESTAMP_FIELD)
    rts = event[TIMESTAMP_FIELD]
    time = parse_time(rts, time, @tag)
  end

  if @docinfo
    docinfo_target = event[@docinfo_target] || {}

    unless docinfo_target.is_a?(Hash)
      # The offending type and event go into the message: extra keyword
      # arguments to `raise` are taken as the backtrace argument and would
      # trigger a TypeError instead of this error.
      raise UnrecoverableError, "incompatible type for the docinfo_target=#{@docinfo_target} field in the `_source` document, expected a hash got: #{docinfo_target.class} (event: #{event.inspect})"
    end

    @docinfo_fields.each do |field|
      docinfo_target[field] = hit[field]
    end

    event[@docinfo_target] = docinfo_target
  end

  es.add(time, event)
end
|
294
295
296
297
298
|
# File 'lib/fluent/plugin/in_elasticsearch.rb', line 294
# Fetches the next page of a scroll and feeds every hit into the event stream.
#
# es        - event stream passed through to process_events.
# scroll_id - identifier of the scroll to continue.
#
# Returns a Hash with 'has_hits' (whether the page contained any hit) and
# '_scroll_id' (the scroll id reported by the response).
def process_next_scroll_request(es, scroll_id)
  response = process_scroll_request(scroll_id)
  page_hits = response['hits']['hits']

  page_hits.each do |hit|
    process_events(hit, es)
  end

  {
    'has_hits' => page_hits.any?,
    '_scroll_id' => response['_scroll_id']
  }
end
|
290
291
292
|
# File 'lib/fluent/plugin/in_elasticsearch.rb', line 290
# Issues a scroll request for the given scroll id, using the configured
# @scroll value, and returns the client's response.
def process_scroll_request(scroll_id)
  request_body = { scroll_id: scroll_id }
  client.scroll(body: request_body, scroll: @scroll)
end
|
#run ⇒ Object
258
259
260
261
262
263
264
265
266
267
268
|
# File 'lib/fluent/plugin/in_elasticsearch.rb', line 258
# Runs one fetch cycle.
#
# With @num_slices of 1 or less the query is executed directly through
# run_slice. Otherwise one thread is created per slice, each running
# run_slice with its slice id; a warning is logged for more than 8 slices.
#
# Returns run_slice's result in the single-slice case, otherwise the array of
# values returned by thread_create.
def run
  return run_slice if @num_slices <= 1

  if @num_slices > 8
    log.warn("Large slice number is specified:(#{@num_slices}). Consider reducing num_slices")
  end

  (0...@num_slices).map do |slice_id|
    thread_create(:"in_elasticsearch_thread_#{slice_id}") { run_slice(slice_id) }
  end
end
|
#run_slice(slice_id = nil) ⇒ Object
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
|
# File 'lib/fluent/plugin/in_elasticsearch.rb', line 270
# Executes the configured query (optionally restricted to one slice), follows
# the scroll until a page comes back without hits, emits all collected events
# under @tag and finally clears the scroll.
#
# slice_id - slice number to query, or nil to run the query unsliced.
def run_slice(slice_id=nil)
  query =
    if slice_id.nil?
      @base_query
    else
      @base_query.merge('slice' => { 'id' => slice_id, 'max' => @num_slices})
    end

  response = client.search(@options.merge(:body => Yajl.dump(query)))
  stream = Fluent::MultiEventStream.new

  first_page = response["hits"]["hits"]
  first_page.each { |hit| process_events(hit, stream) }

  more_hits = first_page.any?
  scroll_id = response['_scroll_id']

  # Keep paging while the previous page had hits and a scroll id is known.
  while more_hits && scroll_id
    page = process_next_scroll_request(stream, scroll_id)
    more_hits = page['has_hits']
    scroll_id = page['_scroll_id']
  end

  router.emit_stream(@tag, stream)

  client.clear_scroll(scroll_id: scroll_id) if scroll_id
end
|
#start ⇒ Object
157
158
159
160
161
|
# File 'lib/fluent/plugin/in_elasticsearch.rb', line 157
# Starts the plugin: after the parent's start logic, registers a timer named
# :in_elasticsearch_timer that invokes #run every @interval, with the repeat
# behaviour taken from @repeat.
def start
  super

  run_callback = method(:run)
  timer_execute(:in_elasticsearch_timer, @interval, repeat: @repeat, &run_callback)
end
|