Class: LogStash::Outputs::ElasticSearch::HttpClient::Pool
- Inherits:
-
Object
- Object
- LogStash::Outputs::ElasticSearch::HttpClient::Pool
show all
- Defined in:
- lib/logstash/outputs/elasticsearch/http_client/pool.rb
Defined Under Namespace
Classes: BadResponseCodeError, HostUnreachableError, NoConnectionAvailableError
Constant Summary
collapse
- ROOT_URI_PATH =
'/'.freeze
- LICENSE_PATH =
'/_license'.freeze
- VERSION_6_TO_7 =
::Gem::Requirement.new([">= 6.0.0", "< 7.0.0"])
- VERSION_7_TO_7_14 =
::Gem::Requirement.new([">= 7.0.0", "< 7.14.0"])
- DEFAULT_OPTIONS =
{
:healthcheck_path => ROOT_URI_PATH,
:sniffing_path => "/_nodes/http",
:bulk_path => "/_bulk",
:scheme => 'http',
:resurrect_delay => 5,
:sniffing => false,
:sniffer_delay => 10,
}.freeze
- ES1_SNIFF_RE_URL =
/\[([^\/]*)?\/?([^:]*):([0-9]+)\]/
- ES2_AND_ABOVE_SNIFF_RE_URL =
/([^\/]*)?\/?([^:]*):([0-9]+)/
Instance Attribute Summary collapse
Instance Method Summary
collapse
-
#add_url(url) ⇒ Object
-
#address_str_to_uri(addr_str) ⇒ Object
-
#alive_urls_count ⇒ Object
-
#check_sniff ⇒ Object
Sniffs and returns the results.
-
#close ⇒ Object
-
#elasticsearch?(url) ⇒ Boolean
-
#empty_url_meta ⇒ Object
-
#get_connection ⇒ Object
-
#get_es_version(url) ⇒ Object
-
#get_license(url) ⇒ Hash
Retrieve ES node license information.
-
#health_check_request(url) ⇒ Object
-
#healthcheck!(register_phase = true) ⇒ Object
-
#in_use_connections ⇒ Object
-
#initialize(logger, adapter, initial_urls = [], options = {}) ⇒ Pool
constructor
-
#last_es_version ⇒ Object
-
#major_version(version_string) ⇒ Object
-
#mark_dead(url, error) ⇒ Object
-
#maximum_seen_major_version ⇒ Object
-
#normalize_url(uri) ⇒ Object
-
#perform_request(method, path, params = {}, body = nil) ⇒ Object
-
#perform_request_to_url(url, method, path, params = {}, body = nil) ⇒ Object
-
#remove_url(url) ⇒ Object
-
#resurrectionist_alive? ⇒ Boolean
-
#return_connection(url) ⇒ Object
-
#size ⇒ Object
-
#sniff(nodes) ⇒ Object
-
#sniff! ⇒ Object
Sniffs the cluster then updates the internal URLs.
-
#sniffer_alive? ⇒ Boolean
-
#start ⇒ Object
-
#start_resurrectionist ⇒ Object
-
#start_sniffer ⇒ Object
-
#stop_resurrectionist ⇒ Object
-
#stop_sniffer ⇒ Object
-
#until_stopped(task_name, delay) ⇒ Object
-
#update_initial_urls ⇒ Object
-
#update_urls(new_urls) ⇒ Object
-
#url_info ⇒ Object
-
#url_meta(url) ⇒ Object
-
#urls ⇒ Object
-
#valid_tagline?(version_info) ⇒ Boolean
-
#wait_for_in_use_connections ⇒ Object
-
#with_connection ⇒ Object
Constructor Details
#initialize(logger, adapter, initial_urls = [], options = {}) ⇒ Pool
Returns a new instance of Pool.
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
|
# File 'lib/logstash/outputs/elasticsearch/http_client/pool.rb', line 53
# Stores the collaborators and settings for the pool. Nothing is contacted and
# no background thread is started from here.
#
# @param logger [Object] kept in @logger
# @param adapter [Object] kept in @adapter
# @param initial_urls [Array] kept in @initial_urls
# @param options [Hash] must carry :url_normalizer; may carry :metric,
#   :license_checker and overrides for any DEFAULT_OPTIONS key
# @raise [ArgumentError] when options[:url_normalizer] is missing or falsy
def initialize(logger, adapter, initial_urls=[], options={})
  @logger = logger
  @adapter = adapter
  @metric = options[:metric]
  @initial_urls = initial_urls

  normalizer = options[:url_normalizer]
  raise ArgumentError, "No URL Normalizer specified!" unless normalizer
  @url_normalizer = normalizer

  # Values supplied by the caller win over DEFAULT_OPTIONS.
  settings = DEFAULT_OPTIONS.merge(options)
  @bulk_path = settings[:bulk_path]
  @sniffing_path = settings[:sniffing_path]
  @healthcheck_path = settings[:healthcheck_path]
  @resurrect_delay = settings[:resurrect_delay]
  @sniffing = settings[:sniffing]
  @sniffer_delay = settings[:sniffer_delay]

  @state_mutex = Mutex.new
  @url_info = {}
  @stopping = false
  @license_checker = options[:license_checker] || LogStash::PluginMixins::ElasticSearch::NoopLicenseChecker::INSTANCE
  @last_es_version = Concurrent::AtomicReference.new
end
|
Instance Attribute Details
#adapter ⇒ Object
Returns the value of attribute adapter.
34
35
36
|
# File 'lib/logstash/outputs/elasticsearch/http_client/pool.rb', line 34
# @return [Object] the value held in @adapter
def adapter; @adapter; end
|
#bulk_path ⇒ Object
Returns the value of attribute bulk_path.
34
35
36
|
# File 'lib/logstash/outputs/elasticsearch/http_client/pool.rb', line 34
# @return [Object] the value held in @bulk_path
def bulk_path; @bulk_path; end
|
#healthcheck_path ⇒ Object
Returns the value of attribute healthcheck_path.
34
35
36
|
# File 'lib/logstash/outputs/elasticsearch/http_client/pool.rb', line 34
# @return [Object] the value held in @healthcheck_path
def healthcheck_path; @healthcheck_path; end
|
#license_checker ⇒ Object
license_checker is used by the pool specs
35
36
37
|
# File 'lib/logstash/outputs/elasticsearch/http_client/pool.rb', line 35
# Exposed for the pool specs.
#
# @return [Object] the value held in @license_checker
def license_checker; @license_checker; end
|
#logger ⇒ Object
Returns the value of attribute logger.
34
35
36
|
# File 'lib/logstash/outputs/elasticsearch/http_client/pool.rb', line 34
# @return [Object] the value held in @logger
def logger; @logger; end
|
#resurrect_delay ⇒ Object
Returns the value of attribute resurrect_delay.
34
35
36
|
# File 'lib/logstash/outputs/elasticsearch/http_client/pool.rb', line 34
# @return [Object] the value held in @resurrect_delay
def resurrect_delay; @resurrect_delay; end
|
#sniffer_delay ⇒ Object
Returns the value of attribute sniffer_delay.
34
35
36
|
# File 'lib/logstash/outputs/elasticsearch/http_client/pool.rb', line 34
# @return [Object] the value held in @sniffer_delay
def sniffer_delay; @sniffer_delay; end
|
#sniffing ⇒ Object
Returns the value of attribute sniffing.
34
35
36
|
# File 'lib/logstash/outputs/elasticsearch/http_client/pool.rb', line 34
# @return [Object] the value held in @sniffing
def sniffing; @sniffing; end
|
#sniffing_path ⇒ Object
Returns the value of attribute sniffing_path.
34
35
36
|
# File 'lib/logstash/outputs/elasticsearch/http_client/pool.rb', line 34
# @return [Object] the value held in @sniffing_path
def sniffing_path; @sniffing_path; end
|
Instance Method Details
#add_url(url) ⇒ Object
381
382
383
|
# File 'lib/logstash/outputs/elasticsearch/http_client/pool.rb', line 381
# Registers +url+ in @url_info with a fresh metadata hash unless an entry is
# already present. Takes no lock itself.
#
# @param url [Object] key to register
# @return [Hash] the metadata stored for +url+ (existing or newly created)
def add_url(url)
  known = @url_info[url]
  return known if known

  @url_info[url] = empty_url_meta
end
|
#address_str_to_uri(addr_str) ⇒ Object
198
199
200
201
202
203
204
|
# File 'lib/logstash/outputs/elasticsearch/http_client/pool.rb', line 198
# Turns a sniffed publish address into a SafeURI built from "host:port".
# The bracketed ES1 pattern is tried first, then the ES2-and-above pattern.
# When the first capture (text before the slash) is empty, the second capture
# is used as the host.
#
# @param addr_str [String] address string to parse
# @return [::LogStash::Util::SafeURI, nil] nil when neither pattern matches
def address_str_to_uri(addr_str)
  parsed = addr_str.match(ES1_SNIFF_RE_URL) || addr_str.match(ES2_AND_ABOVE_SNIFF_RE_URL)
  return nil unless parsed

  name, ip, port = parsed[1], parsed[2], parsed[3]
  host = name.empty? ? ip : name
  ::LogStash::Util::SafeURI.new("#{host}:#{port}")
end
|
#alive_urls_count ⇒ Object
119
120
121
|
# File 'lib/logstash/outputs/elasticsearch/http_client/pool.rb', line 119
# Counts, under @state_mutex, the entries of @url_info whose :state is :alive.
#
# @return [Integer]
def alive_urls_count
  @state_mutex.synchronize do
    @url_info.count { |_url, meta| meta[:state] == :alive }
  end
end
|
#check_sniff ⇒ Object
Sniffs and returns the results. Does not update internal URLs!
173
174
175
176
177
178
179
180
181
182
183
184
|
# File 'lib/logstash/outputs/elasticsearch/http_client/pool.rb', line 173
# Sniffs and returns the results. Does not update internal URLs!
#
# Requests @sniffing_path through perform_request, increments the
# :sniff_requests metric, parses the response body and hands the 'nodes'
# entry to #sniff.
#
# @return [Object, nil] whatever #sniff returns, or nil (after a warning)
#   when the response has no nodes
def check_sniff
  _url, _meta, resp = perform_request(:get, @sniffing_path)
  @metric.increment(:sniff_requests)

  nodes = LogStash::Json.load(resp.body)['nodes']
  if nodes.nil? || nodes == false || nodes.empty?
    @logger.warn("Sniff returned no nodes! Will not update hosts.")
    return nil
  end

  sniff(nodes)
end
|
#close ⇒ Object
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
|
# File 'lib/logstash/outputs/elasticsearch/http_client/pool.rb', line 92
# Shuts the pool down: flags @stopping under the mutex, then stops the sniffer,
# stops the resurrectionist, waits for in-use connections and finally closes
# the adapter. Each step is announced at debug level before it runs.
#
# @return [Object] whatever @adapter.close returns
def close
  @state_mutex.synchronize { @stopping = true }

  # Order matters: background loops first, then in-flight requests.
  [
    ["Stopping sniffer", :stop_sniffer],
    ["Stopping resurrectionist", :stop_resurrectionist],
    ["Waiting for in use manticore connections", :wait_for_in_use_connections],
  ].each do |announcement, step|
    logger.debug(announcement)
    send(step)
  end

  logger.debug("Closing adapter #{@adapter}")
  @adapter.close
end
|
#elasticsearch?(url) ⇒ Boolean
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
|
# File 'lib/logstash/outputs/elasticsearch/http_client/pool.rb', line 267
# Decides whether the node behind +url+ looks like a supported Elasticsearch.
#
# Issues a GET on ROOT_URI_PATH and inspects the answer:
# * no 'version' entry, or a version below 6.0.0 -> false
# * 6.x -> the tagline must be valid
# * 7.0 up to (not including) 7.14 -> build_flavor must be 'default' and the
#   tagline must be valid
# * anything newer -> the 'x-elastic-product' response header (matched
#   case-insensitively) must equal 'Elasticsearch'
#
# Any StandardError raised on the way is logged and reported as false.
#
# @param url [Object] node address; must respond to #sanitized for logging
# @return [Boolean]
def elasticsearch?(url)
  begin
    response = perform_request_to_url(url, :get, ROOT_URI_PATH)
  rescue ::LogStash::Outputs::ElasticSearch::HttpClient::Pool::BadResponseCodeError => e
    # `response` is never assigned when the request raises, so the status must
    # come from the error itself.
    # NOTE(review): assumes the error exposes #response_code - confirm against
    # the BadResponseCodeError definition.
    status = e.respond_to?(:response_code) ? e.response_code : nil
    return false if status == 401 || status == 403
    raise e
  end

  version_info = LogStash::Json.load(response.body)
  return false if version_info['version'].nil?

  version = ::Gem::Version.new(version_info["version"]['number'])
  return false if version < ::Gem::Version.new('6.0.0')

  if VERSION_6_TO_7.satisfied_by?(version)
    return valid_tagline?(version_info)
  elsif VERSION_7_TO_7_14.satisfied_by?(version)
    build_flavor = version_info["version"]['build_flavor']
    return false if build_flavor != 'default' || !valid_tagline?(version_info)
  else
    # Header names are compared lower-cased so the lookup is case-insensitive.
    lower_headers = response.headers.transform_keys { |key| key.to_s.downcase }
    product_header = lower_headers['x-elastic-product']
    return false if product_header != 'Elasticsearch'
  end

  true
rescue => e
  logger.error("Unable to retrieve Elasticsearch version", url: url.sanitized.to_s, exception: e.class, message: e.message)
  false
end
|
#empty_url_meta ⇒ Object
389
390
391
392
393
394
|
# File 'lib/logstash/outputs/elasticsearch/http_client/pool.rb', line 389
# Builds the metadata hash a newly registered URL starts with.
#
# @return [Hash] a new hash on every call: no connections in use, state :unknown
def empty_url_meta
  { in_use: 0, state: :unknown }
end
|
#get_connection ⇒ Object
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
|
# File 'lib/logstash/outputs/elasticsearch/http_client/pool.rb', line 435
# Picks a URL to use and bumps its :in_use counter, all under @state_mutex.
#
# Entries whose :state is :dead are skipped. Among the rest, the ones with the
# lowest :in_use value are eligible and one of them is chosen at random.
#
# @return [Array(Object, Hash), nil] [url, meta] of the pick, or nil when no
#   non-dead entry exists
def get_connection
  @state_mutex.synchronize do
    usable = @url_info.reject { |_url, meta| meta[:state] == :dead }
    return nil if usable.empty?

    least_busy = usable.each_value.map { |meta| meta[:in_use] }.min
    candidates = usable.select { |_url, meta| meta[:in_use] == least_busy }.to_a

    chosen_url, chosen_meta = candidates.sample
    chosen_meta[:in_use] += 1
    [chosen_url, chosen_meta]
  end
end
|
#get_es_version(url) ⇒ Object
470
471
472
473
|
# File 'lib/logstash/outputs/elasticsearch/http_client/pool.rb', line 470
# Asks the node at +url+ (GET on ROOT_URI_PATH) for its version number.
#
# @param url [Object] node address
# @return [Object] the "number" entry under "version" of the parsed body
def get_es_version(url)
  response = perform_request_to_url(url, :get, ROOT_URI_PATH)
  root_document = LogStash::Json.load(response.body)
  root_document["version"]["number"]
end
|
#get_license(url) ⇒ Hash
Retrieve ES node license information
225
226
227
228
229
230
231
|
# File 'lib/logstash/outputs/elasticsearch/http_client/pool.rb', line 225
# Retrieve ES node license information.
#
# Performs a GET on LICENSE_PATH and parses the body. Any StandardError is
# logged and turned into an empty hash.
#
# @param url [Object] node address; must respond to #sanitized for logging
# @return [Hash] parsed license document, or {} on failure
def get_license(url)
  begin
    license_response = perform_request_to_url(url, :get, LICENSE_PATH)
    LogStash::Json.load(license_response.body)
  rescue => failure
    logger.error("Unable to get license information", url: url.sanitized.to_s, exception: failure.class, message: failure.message)
    {}
  end
end
|
#health_check_request(url) ⇒ Object
233
234
235
236
|
# File 'lib/logstash/outputs/elasticsearch/http_client/pool.rb', line 233
# Sends a HEAD request for @healthcheck_path to +url+, logging at debug level
# first.
#
# @param url [Object] node address; must respond to #sanitized for logging
# @return [Object] whatever perform_request_to_url returns
def health_check_request(url)
  path = @healthcheck_path
  logger.debug("Running health check to see if an ES connection is working", url: url.sanitized.to_s, path: path)
  perform_request_to_url(url, :head, path)
end
|
#healthcheck!(register_phase = true) ⇒ Object
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
|
# File 'lib/logstash/outputs/elasticsearch/http_client/pool.rb', line 238
# Probes every URL whose :state is not :alive and updates its metadata.
#
# For each such URL: run the health check request; during the register phase
# also require elasticsearch?(url); then record the node version, pass it to
# set_last_es_version and mark the URL :alive or :dead depending on the
# license checker. Host-unreachable and bad-response errors are logged and
# leave the entry untouched.
#
# @param register_phase [Boolean] when true, a node failing elasticsearch?
#   aborts with a configuration error
# @raise [LogStash::ConfigurationError] see +register_phase+
# @return [Hash] the url => meta pairs that were examined
def healthcheck!(register_phase = true)
  # The candidate list is taken under the lock; the requests run outside it.
  pending = @state_mutex.synchronize do
    @url_info.reject { |_url, meta| meta[:state] == :alive }
  end

  pending.each do |url, meta|
    begin
      health_check_request(url)

      if register_phase && !elasticsearch?(url)
        raise LogStash::ConfigurationError, "Could not connect to a compatible version of Elasticsearch"
      end

      logger.warn("Restored connection to ES instance", url: url.sanitized.to_s)
      es_version = get_es_version(url)

      @state_mutex.synchronize do
        meta[:version] = es_version
        set_last_es_version(es_version, url)
        licensed = @license_checker.appropriate_license?(self, url)
        meta[:state] = licensed ? :alive : :dead
      end
    rescue HostUnreachableError, BadResponseCodeError => e
      logger.warn("Attempted to resurrect connection to dead ES instance, but got an error", url: url.sanitized.to_s, exception: e.class, message: e.message)
    end
  end
end
|
#in_use_connections ⇒ Object
115
116
117
|
# File 'lib/logstash/outputs/elasticsearch/http_client/pool.rb', line 115
# Collects, under @state_mutex, the metadata hashes whose :in_use counter is
# above zero.
#
# @return [Array<Hash>]
def in_use_connections
  @state_mutex.synchronize do
    @url_info.each_value.select { |meta| meta[:in_use] > 0 }
  end
end
|
#last_es_version ⇒ Object
475
476
477
|
# File 'lib/logstash/outputs/elasticsearch/http_client/pool.rb', line 475
# @return [Object] the value currently held by the @last_es_version reference
def last_es_version; @last_es_version.get; end
|
#major_version(version_string) ⇒ Object
186
187
188
|
# File 'lib/logstash/outputs/elasticsearch/http_client/pool.rb', line 186
# Extracts the leading dot-separated component of a version string as an
# integer.
#
# @param version_string [String] e.g. "7.10.2"
# @return [Integer] 0 when there is no numeric leading component
def major_version(version_string)
  leading, = version_string.split('.')
  leading.to_i
end
|
#mark_dead(url, error) ⇒ Object
415
416
417
418
419
420
421
422
423
424
425
426
427
|
# File 'lib/logstash/outputs/elasticsearch/http_client/pool.rb', line 415
# Flags +url+ as dead under @state_mutex and remembers why and when.
# Does nothing when the URL is not present in @url_info.
#
# @param url [Object] key in @url_info
# @param error [Exception] the failure that triggered the change
# @return [Time, nil] the recorded timestamp, or nil when the URL is unknown
def mark_dead(url, error)
  @state_mutex.synchronize do
    entry = @url_info[url]
    next unless entry

    logger.warn("Marking url as dead. Last error: [#{error.class}] #{error.message}",
                :url => url, :error_message => error.message, :error_class => error.class.name)
    entry[:state] = :dead
    entry[:last_error] = error
    entry[:last_errored_at] = Time.now
  end
end
|
#maximum_seen_major_version ⇒ Object
479
480
481
|
# File 'lib/logstash/outputs/elasticsearch/http_client/pool.rb', line 479
# Reads @maximum_seen_major_version while holding @state_mutex.
#
# @return [Object] the stored value (nil when it was never assigned)
def maximum_seen_major_version
  @state_mutex.synchronize do
    @maximum_seen_major_version
  end
end
|
#normalize_url(uri) ⇒ Object
331
332
333
334
335
336
337
|
# File 'lib/logstash/outputs/elasticsearch/http_client/pool.rb', line 331
# Runs +uri+ through the configured @url_normalizer and insists that the
# result is a SafeURI.
#
# @param uri [Object] value handed to the normalizer
# @return [::LogStash::Util::SafeURI]
# @raise [RuntimeError] when the normalizer returns anything else
def normalize_url(uri)
  normalized = @url_normalizer.call(uri)
  return normalized if normalized.is_a?(::LogStash::Util::SafeURI)

  raise "URL Normalizer returned a '#{normalized.class}' rather than a SafeURI! This shouldn't happen!"
end
|
#perform_request(method, path, params = {}, body = nil) ⇒ Object
311
312
313
314
315
316
|
# File 'lib/logstash/outputs/elasticsearch/http_client/pool.rb', line 311
# Borrows a connection through with_connection and performs the request
# against the URL it yields.
#
# @param method [Symbol] HTTP method
# @param path [String] request path
# @param params [Hash] passed through to perform_request_to_url
# @param body [Object, nil] passed through to perform_request_to_url
# @return [Array] [url, url_meta, response]
def perform_request(method, path, params={}, body=nil)
  with_connection do |url, url_meta|
    [url, url_meta, perform_request_to_url(url, method, path, params, body)]
  end
end
|
#perform_request_to_url(url, method, path, params = {}, body = nil) ⇒ Object
325
326
327
328
329
|
# File 'lib/logstash/outputs/elasticsearch/http_client/pool.rb', line 325
# Sends a request to one specific URL through @adapter. Exceptions the adapter
# lists in host_unreachable_exceptions are re-raised as HostUnreachableError
# carrying the original error and the URL.
#
# @param url [Object] target node
# @param method [Symbol] HTTP method
# @param path [String] request path
# @param params [Hash] passed through to the adapter
# @param body [Object, nil] passed through to the adapter
# @return [Object] the adapter's response
# @raise [HostUnreachableError]
def perform_request_to_url(url, method, path, params={}, body=nil)
  begin
    @adapter.perform_request(url, method, path, params, body)
  rescue *@adapter.host_unreachable_exceptions => cause
    raise HostUnreachableError.new(cause, url), "Could not reach host #{cause.class}: #{cause.message}"
  end
end
|
#remove_url(url) ⇒ Object
385
386
387
|
# File 'lib/logstash/outputs/elasticsearch/http_client/pool.rb', line 385
# Drops +url+ from @url_info. Takes no lock itself.
#
# @param url [Object] key to remove
# @return [Hash, nil] the removed metadata, or nil when the URL was not present
def remove_url(url); @url_info.delete(url); end
|
#resurrectionist_alive? ⇒ Boolean
307
308
309
|
# File 'lib/logstash/outputs/elasticsearch/http_client/pool.rb', line 307
# Reports whether the resurrectionist thread is running.
#
# @return [Boolean, nil] nil when no resurrectionist has been started
def resurrectionist_alive?
  return nil unless @resurrectionist

  @resurrectionist.alive?
end
|
#return_connection(url) ⇒ Object
463
464
465
466
467
468
|
# File 'lib/logstash/outputs/elasticsearch/http_client/pool.rb', line 463
# Gives a borrowed connection back by decrementing the URL's :in_use counter
# under @state_mutex. A URL that is no longer in @url_info is ignored.
#
# @param url [Object] key in @url_info
# @return [Integer, nil] the new counter value, or nil when the URL is unknown
def return_connection(url)
  @state_mutex.synchronize do
    entry = @url_info[url]
    next unless entry

    entry[:in_use] -= 1
  end
end
|
#size ⇒ Object
377
378
379
|
# File 'lib/logstash/outputs/elasticsearch/http_client/pool.rb', line 377
# @return [Integer] number of URLs in @url_info, read under @state_mutex
def size
  @state_mutex.synchronize do
    @url_info.length
  end
end
|
#sniff(nodes) ⇒ Object
190
191
192
193
194
195
196
|
# File 'lib/logstash/outputs/elasticsearch/http_client/pool.rb', line 190
# Converts the 'nodes' section of a sniff response into URIs.
#
# Nodes whose "roles" are exactly ["master"] are left out, as are nodes with no
# "http" section and nodes whose publish address cannot be converted.
#
# @param nodes [Hash] node id => node info
# @return [Array] the non-nil results of address_str_to_uri
def sniff(nodes)
  nodes.each_with_object([]) do |(_node_id, info), uris|
    next if info["roles"] == ["master"]

    http = info["http"]
    next unless http

    uri = address_str_to_uri(http["publish_address"])
    uris << uri unless uri.nil?
  end
end
|
#sniff! ⇒ Object
Sniffs the cluster then updates the internal URLs
166
167
168
|
# File 'lib/logstash/outputs/elasticsearch/http_client/pool.rb', line 166
# Sniffs the cluster then updates the internal URLs.
#
# @return [Object] whatever update_urls returns
def sniff!
  discovered = check_sniff
  update_urls(discovered)
end
|
#sniffer_alive? ⇒ Boolean
210
211
212
|
# File 'lib/logstash/outputs/elasticsearch/http_client/pool.rb', line 210
# Reports whether the sniffer thread is running.
#
# @return [Boolean, nil] nil when no sniffer has been started
def sniffer_alive?
  return nil unless @sniffer

  @sniffer.alive?
end
|
#start ⇒ Object
82
83
84
85
86
|
# File 'lib/logstash/outputs/elasticsearch/http_client/pool.rb', line 82
# Brings the pool up: registers the initial URLs, starts the resurrectionist
# and, when @sniffing is set, the sniffer.
#
# @return [Object, nil] the result of start_sniffer, or nil when sniffing is off
def start
  update_initial_urls
  start_resurrectionist
  if @sniffing
    start_sniffer
  end
end
|
#start_resurrectionist ⇒ Object
214
215
216
217
218
219
220
|
# File 'lib/logstash/outputs/elasticsearch/http_client/pool.rb', line 214
# Spawns the background thread that re-runs the health check (outside the
# register phase) every @resurrect_delay until the pool is stopped.
#
# @return [Thread] the thread, also kept in @resurrectionist
def start_resurrectionist
  @resurrectionist = Thread.new do
    until_stopped("resurrection", @resurrect_delay) { healthcheck!(false) }
  end
end
|
#start_sniffer ⇒ Object
152
153
154
155
156
157
158
159
160
161
162
163
|
# File 'lib/logstash/outputs/elasticsearch/http_client/pool.rb', line 152
# Spawns the background thread that calls sniff! every sniffer_delay until the
# pool is stopped. When no connection is available the current URL table is
# logged as a warning and the loop carries on.
#
# @return [Thread] the thread, also kept in @sniffer
def start_sniffer
  @sniffer = Thread.new do
    until_stopped("sniffing", sniffer_delay) do
      begin
        sniff!
      rescue NoConnectionAvailableError
        @state_mutex.synchronize do
          logger.warn("Elasticsearch output attempted to sniff for new connections but cannot. No living connections are detected. Pool contains the following current URLs", :url_info => @url_info)
        end
      end
    end
  end
end
|
#stop_resurrectionist ⇒ Object
303
304
305
|
# File 'lib/logstash/outputs/elasticsearch/http_client/pool.rb', line 303
# Waits for the resurrectionist thread to finish, if one was started.
#
# @return [Object, nil] the result of joining the thread, or nil when there is
#   none
def stop_resurrectionist
  return unless @resurrectionist

  @resurrectionist.join
end
|
#stop_sniffer ⇒ Object
206
207
208
|
# File 'lib/logstash/outputs/elasticsearch/http_client/pool.rb', line 206
# Waits for the sniffer thread to finish, if one was started.
#
# @return [Object, nil] the result of joining the thread, or nil when there is
#   none
def stop_sniffer
  return unless @sniffer

  @sniffer.join
end
|
#until_stopped(task_name, delay) ⇒ Object
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
|
# File 'lib/logstash/outputs/elasticsearch/http_client/pool.rb', line 131
# Runs the given block repeatedly until @stopping is set.
#
# The loop wakes up once a second; the block is yielded to whenever at least
# +delay+ seconds have passed since the previous run (or since the loop
# started). StandardErrors raised inside an iteration are logged as warnings
# and do not end the loop.
#
# @param task_name [String] used in the warning message
# @param delay [Numeric] minimum number of seconds between two runs
# @yield the periodic task
# @return [nil]
def until_stopped(task_name, delay)
  previous_run = Time.now
  while !@state_mutex.synchronize { @stopping }
    begin
      current = Time.now
      elapsed = current - previous_run
      if elapsed >= delay
        # Recorded before yielding, so a failing task still waits out the delay.
        previous_run = current
        yield
      end
      sleep 1
    rescue => failure
      logger.warn(
        "Error while performing #{task_name}",
        :error_message => failure.message,
        :class => failure.class.name,
        :backtrace => failure.backtrace
      )
    end
  end
end
|
#update_initial_urls ⇒ Object
88
89
90
|
# File 'lib/logstash/outputs/elasticsearch/http_client/pool.rb', line 88
# Feeds the URLs given at construction time (@initial_urls) to update_urls.
#
# @return [Object] whatever update_urls returns
def update_initial_urls; update_urls(@initial_urls); end
|
#update_urls(new_urls) ⇒ Object
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
|
# File 'lib/logstash/outputs/elasticsearch/http_client/pool.rb', line 339
# Replaces the set of known URLs with +new_urls+.
#
# Every URL is normalized first. Under @state_mutex, URLs not yet known are
# added and known URLs missing from the new list are removed. Any change is
# reported at info level, and a health check is run afterwards.
#
# @param new_urls [Array, nil] the desired URLs; nil leaves everything untouched
# @return [Object, nil] the result of healthcheck!, or nil when +new_urls+ is nil
def update_urls(new_urls)
  return if new_urls.nil?

  normalized = new_urls.map { |candidate| normalize_url(candidate) }
  added = []
  removed = []

  @state_mutex.synchronize do
    normalized.each do |url|
      next if @url_info.keys.include?(url)

      added << url
      add_url(url)
    end

    # Walk a copy of the keys because entries are removed along the way.
    @url_info.keys.each do |url|
      next if normalized.include?(url)

      removed << url
      remove_url(url)
    end
  end

  state_changes = {:removed => removed, :added => added}
  unless removed.empty? && added.empty?
    logger.info? && logger.info("Elasticsearch pool URLs updated", :changes => state_changes)
  end

  healthcheck!
end
|
#url_info ⇒ Object
123
124
125
|
# File 'lib/logstash/outputs/elasticsearch/http_client/pool.rb', line 123
# Reads @url_info while holding @state_mutex. The hash itself is handed out,
# not a copy.
#
# @return [Hash] url => metadata
def url_info
  @state_mutex.synchronize do
    @url_info
  end
end
|
#url_meta(url) ⇒ Object
429
430
431
432
433
|
# File 'lib/logstash/outputs/elasticsearch/http_client/pool.rb', line 429
# Looks up the metadata stored for +url+ while holding @state_mutex.
#
# @param url [Object] key in @url_info
# @return [Hash, nil] nil when the URL is unknown
def url_meta(url)
  @state_mutex.synchronize { @url_info[url] }
end
|
#urls ⇒ Object
127
128
129
|
# File 'lib/logstash/outputs/elasticsearch/http_client/pool.rb', line 127
# @return [Array] the keys of the hash returned by #url_info
def urls
  current = url_info
  current.keys
end
|
#valid_tagline?(version_info) ⇒ Boolean
298
299
300
301
|
# File 'lib/logstash/outputs/elasticsearch/http_client/pool.rb', line 298
# Checks the 'tagline' entry of a root-endpoint document against the expected
# tagline.
#
# @param version_info [Hash] parsed root-endpoint response
# @return [Boolean]
def valid_tagline?(version_info)
  "You Know, for Search" == version_info['tagline']
end
|
#wait_for_in_use_connections ⇒ Object
108
109
110
111
112
113
|
# File 'lib/logstash/outputs/elasticsearch/http_client/pool.rb', line 108
# Blocks until in_use_connections reports nothing in use, logging the URL table
# and sleeping one second between checks.
#
# @return [nil]
def wait_for_in_use_connections
  while !in_use_connections.empty?
    snapshot = @state_mutex.synchronize { @url_info }
    logger.info("Blocked on shutdown to in use connections #{snapshot}")
    sleep 1
  end
end
|
#with_connection ⇒ Object
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
|
# File 'lib/logstash/outputs/elasticsearch/http_client/pool.rb', line 396
# Borrows a connection, yields it, and always gives it back.
#
# A HostUnreachableError raised by the block marks the URL dead before being
# re-raised; a BadResponseCodeError is re-raised as is. return_connection runs
# in every case, including when no connection could be obtained.
#
# @yieldparam url [Object] the borrowed URL
# @yieldparam url_meta [Hash] its metadata
# @return [Object] the block's result
# @raise [NoConnectionAvailableError] when get_connection yields no URL
def with_connection
  begin
    url, url_meta = get_connection
    raise NoConnectionAvailableError, "No Available connections" unless url

    yield url, url_meta
  rescue HostUnreachableError => unreachable
    mark_dead(url, unreachable)
    raise unreachable
  rescue BadResponseCodeError => bad_response
    raise bad_response
  ensure
    return_connection(url)
  end
end
|