Class: LogStash::Outputs::ElasticSearch::HttpClient::Pool
- Inherits:
-
Object
- Object
- LogStash::Outputs::ElasticSearch::HttpClient::Pool
show all
- Defined in:
- lib/logstash/outputs/elasticsearch/http_client/pool.rb
Defined Under Namespace
Classes: BadResponseCodeError, HostUnreachableError, NoConnectionAvailableError
Constant Summary
collapse
- ROOT_URI_PATH =
'/'.freeze
- DEFAULT_OPTIONS =
{
:healthcheck_path => ROOT_URI_PATH,
:absolute_healthcheck_path => false,
:sniffing_path => '_nodes/http',
:absolute_sniffing_path => false,
:scheme => 'http',
:resurrect_delay => 5,
:sniffing => false,
:sniffer_delay => 10,
}.freeze
- ES1_SNIFF_RE_URL =
/\[([^\/]*)?\/?([^:]*):([0-9]+)\]/
- ES2_SNIFF_RE_URL =
/([^\/]*)?\/?([^:]*):([0-9]+)/
Instance Attribute Summary collapse
Instance Method Summary
collapse
-
#add_url(url) ⇒ Object
-
#alive_urls_count ⇒ Object
-
#check_sniff ⇒ Object
Sniffs and returns the results.
-
#close ⇒ Object
-
#empty_url_meta ⇒ Object
-
#get_connection ⇒ Object
-
#healthcheck! ⇒ Object
-
#in_use_connections ⇒ Object
-
#initialize(logger, adapter, initial_urls = [], options = {}) ⇒ Pool
constructor
-
#major_version(nodes) ⇒ Object
-
#mark_dead(url, error) ⇒ Object
-
#normalize_url(uri) ⇒ Object
-
#perform_request(method, path, params = {}, body = nil) ⇒ Object
-
#perform_request_to_url(url, method, path, params = {}, body = nil) ⇒ Object
-
#remove_url(url) ⇒ Object
-
#resurrectionist_alive? ⇒ Boolean
-
#return_connection(url) ⇒ Object
-
#size ⇒ Object
-
#sniff! ⇒ Object
Sniffs the cluster then updates the internal URLs.
-
#sniff_2x_1x(nodes) ⇒ Object
-
#sniff_5x(nodes) ⇒ Object
-
#sniffer_alive? ⇒ Boolean
-
#start ⇒ Object
-
#start_resurrectionist ⇒ Object
-
#start_sniffer ⇒ Object
-
#stop_resurrectionist ⇒ Object
-
#stop_sniffer ⇒ Object
-
#until_stopped(task_name, delay) ⇒ Object
-
#update_urls(new_urls) ⇒ Object
-
#url_info ⇒ Object
-
#url_meta(url) ⇒ Object
-
#urls ⇒ Object
-
#wait_for_in_use_connections ⇒ Object
-
#with_connection ⇒ Object
Constructor Details
#initialize(logger, adapter, initial_urls = [], options = {}) ⇒ Pool
Returns a new instance of Pool.
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
|
# File 'lib/logstash/outputs/elasticsearch/http_client/pool.rb', line 46
# Stores the collaborators and tunables for a pool. No URL is registered and
# no background thread is started here.
#
# @param logger logger kept in @logger
# @param adapter HTTP adapter kept in @adapter
# @param initial_urls [Array] URLs kept in @initial_urls
# @param options [Hash] overrides layered on top of DEFAULT_OPTIONS; must
#   carry a truthy :url_normalizer
# @raise [ArgumentError] when options[:url_normalizer] is nil or false
def initialize(logger, adapter, initial_urls=[], options={})
  @logger = logger
  @adapter = adapter
  @initial_urls = initial_urls

  normalizer = options[:url_normalizer]
  raise ArgumentError, "No URL Normalizer specified!" unless normalizer
  @url_normalizer = normalizer

  # Caller-supplied values win over the defaults.
  settings = DEFAULT_OPTIONS.merge(options)
  @healthcheck_path = settings[:healthcheck_path]
  @absolute_healthcheck_path = settings[:absolute_healthcheck_path]
  @sniffing_path = settings[:sniffing_path]
  @absolute_sniffing_path = settings[:absolute_sniffing_path]
  @resurrect_delay = settings[:resurrect_delay]
  @sniffing = settings[:sniffing]
  @sniffer_delay = settings[:sniffer_delay]

  # @state_mutex guards @url_info and @stopping.
  @state_mutex = Mutex.new
  @url_info = {}
  @stopping = false
end
|
Instance Attribute Details
#absolute_healthcheck_path ⇒ Object
Returns the value of attribute absolute_healthcheck_path.
31
32
33
|
# File 'lib/logstash/outputs/elasticsearch/http_client/pool.rb', line 31
# Reader for @absolute_healthcheck_path, which the constructor takes from the
# merged options (:absolute_healthcheck_path, false in DEFAULT_OPTIONS).
# When truthy, healthcheck! resets the probe URL's path to ROOT_URI_PATH.
def absolute_healthcheck_path
@absolute_healthcheck_path
end
|
#absolute_sniffing_path ⇒ Object
Returns the value of attribute absolute_sniffing_path.
31
32
33
|
# File 'lib/logstash/outputs/elasticsearch/http_client/pool.rb', line 31
# Reader for @absolute_sniffing_path, which the constructor takes from the
# merged options (:absolute_sniffing_path, false in DEFAULT_OPTIONS).
# When truthy, check_sniff resets the sniffing URL's path to ROOT_URI_PATH.
def absolute_sniffing_path
@absolute_sniffing_path
end
|
#adapter ⇒ Object
Returns the value of attribute adapter.
31
32
33
|
# File 'lib/logstash/outputs/elasticsearch/http_client/pool.rb', line 31
# Reader for @adapter, the adapter object passed to the constructor.
# perform_request_to_url sends requests through it and close calls its #close.
def adapter
@adapter
end
|
#healthcheck_path ⇒ Object
Returns the value of attribute healthcheck_path.
31
32
33
|
# File 'lib/logstash/outputs/elasticsearch/http_client/pool.rb', line 31
# Reader for @healthcheck_path, which the constructor takes from the merged
# options (:healthcheck_path, ROOT_URI_PATH in DEFAULT_OPTIONS).
# healthcheck! issues its HEAD request against this path.
def healthcheck_path
@healthcheck_path
end
|
#logger ⇒ Object
Returns the value of attribute logger.
31
32
33
|
# File 'lib/logstash/outputs/elasticsearch/http_client/pool.rb', line 31
# Reader for @logger, the logger object passed to the constructor.
def logger
@logger
end
|
#resurrect_delay ⇒ Object
Returns the value of attribute resurrect_delay.
31
32
33
|
# File 'lib/logstash/outputs/elasticsearch/http_client/pool.rb', line 31
# Reader for @resurrect_delay, which the constructor takes from the merged
# options (:resurrect_delay, 5 in DEFAULT_OPTIONS). start_resurrectionist
# passes it to until_stopped as the minimum gap between healthcheck! runs.
def resurrect_delay
@resurrect_delay
end
|
#sniffer_delay ⇒ Object
Returns the value of attribute sniffer_delay.
31
32
33
|
# File 'lib/logstash/outputs/elasticsearch/http_client/pool.rb', line 31
# Reader for @sniffer_delay, which the constructor takes from the merged
# options (:sniffer_delay, 10 in DEFAULT_OPTIONS). start_sniffer passes it
# to until_stopped as the minimum gap between sniff! runs.
def sniffer_delay
@sniffer_delay
end
|
#sniffing ⇒ Object
Returns the value of attribute sniffing.
31
32
33
|
# File 'lib/logstash/outputs/elasticsearch/http_client/pool.rb', line 31
# Reader for @sniffing, which the constructor takes from the merged options
# (:sniffing, false in DEFAULT_OPTIONS). start launches the sniffer thread
# only when it is truthy.
def sniffing
@sniffing
end
|
#sniffing_path ⇒ Object
Returns the value of attribute sniffing_path.
31
32
33
|
# File 'lib/logstash/outputs/elasticsearch/http_client/pool.rb', line 31
# Reader for @sniffing_path, which the constructor takes from the merged
# options (:sniffing_path, '_nodes/http' in DEFAULT_OPTIONS).
# check_sniff issues its GET request against this path.
def sniffing_path
@sniffing_path
end
|
Instance Method Details
#add_url(url) ⇒ Object
342
343
344
|
# File 'lib/logstash/outputs/elasticsearch/http_client/pool.rb', line 342
# Registers +url+ with fresh metadata unless it is already tracked.
# Does not take @state_mutex itself; update_urls calls it while holding it.
#
# @param url key to register in @url_info
# @return [Hash] the metadata now stored for +url+
def add_url(url)
  tracked = @url_info[url]
  return tracked if tracked

  @url_info[url] = empty_url_meta
end
|
#alive_urls_count ⇒ Object
104
105
106
|
# File 'lib/logstash/outputs/elasticsearch/http_client/pool.rb', line 104
# Counts the tracked URLs whose metadata state is :alive.
#
# The previous predicate was `!v[:state] == :alive`; the `!` applied to the
# state symbol first, producing `false == :alive`, so the result was always 0.
#
# @return [Integer] number of entries in @url_info marked :alive
def alive_urls_count
  @state_mutex.synchronize do
    @url_info.values.count { |meta| meta[:state] == :alive }
  end
end
|
#check_sniff ⇒ Object
Sniffs and returns the results. Does not update internal URLs!
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
|
# File 'lib/logstash/outputs/elasticsearch/http_client/pool.rb', line 158
# Queries one pooled connection at @sniffing_path and turns the node listing
# into URLs. Does not touch @url_info.
#
# @return [Array, nil] whatever sniff_5x / sniff_2x_1x return for the
#   listing, or nil when the response has no nodes or the major version is
#   not 5, 2 or 1
def check_sniff
  response = nil
  with_connection do |url|
    target = url.clone
    target.query = nil
    target.path = ROOT_URI_PATH if @absolute_sniffing_path
    response = perform_request_to_url(target, :get, @sniffing_path, {}, nil)
  end

  nodes = LogStash::Json.load(response.body)['nodes']
  unless nodes && !nodes.empty?
    @logger.warn("Sniff returned no nodes! Will not update hosts.")
    return nil
  end

  case major_version(nodes)
  when 5
    sniff_5x(nodes)
  when 2, 1
    sniff_2x_1x(nodes)
  else
    @logger.warn("Could not determine version for nodes in ES cluster!")
    nil
  end
end
|
#close ⇒ Object
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
|
# File 'lib/logstash/outputs/elasticsearch/http_client/pool.rb', line 77
# Shuts the pool down: sets the stopping flag, joins the background threads,
# waits for in-use connections to be returned, then closes the adapter.
#
# @return whatever @adapter.close returns
def close
  @state_mutex.synchronize do
    @stopping = true
  end

  logger.debug("Stopping sniffer")
  stop_sniffer

  logger.debug("Stopping resurrectionist")
  stop_resurrectionist

  logger.debug("Waiting for in use manticore connections")
  wait_for_in_use_connections

  logger.debug("Closing adapter #{@adapter}")
  @adapter.close
end
|
#empty_url_meta ⇒ Object
350
351
352
353
354
355
|
# File 'lib/logstash/outputs/elasticsearch/http_client/pool.rb', line 350
# Builds the metadata recorded for a newly added URL: no connections in use
# and a state of :unknown.
#
# @return [Hash] a new hash on every call
def empty_url_meta
  { :in_use => 0, :state => :unknown }
end
|
#get_connection ⇒ Object
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
|
# File 'lib/logstash/outputs/elasticsearch/http_client/pool.rb', line 399
# Picks a URL to use next: among entries whose state is not :dead, one with
# the lowest :in_use count is chosen at random, and its count is incremented.
#
# @return [Array, nil] [url, meta] for the pick, or nil when every tracked
#   URL is :dead or none is tracked
def get_connection
  @state_mutex.synchronize do
    usable = @url_info.reject { |_url, meta| meta[:state] == :dead }.to_a
    return nil if usable.empty?

    lowest = usable.map { |_url, meta| meta[:in_use] }.min
    candidates = usable.select { |_url, meta| meta[:in_use] == lowest }

    chosen_url, chosen_meta = candidates.sample
    chosen_meta[:in_use] += 1
    [chosen_url, chosen_meta]
  end
end
|
#healthcheck! ⇒ Object
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
|
# File 'lib/logstash/outputs/elasticsearch/http_client/pool.rb', line 241
# Probes every tracked URL whose state is not :alive with a HEAD request to
# healthcheck_path and flips it to :alive when the request does not raise.
# HostUnreachableError and BadResponseCodeError are logged and leave the
# entry's state unchanged.
def healthcheck!
  # Snapshot the candidates under the lock; the requests run outside it.
  pending = @state_mutex.synchronize { @url_info.reject { |_url, meta| meta[:state] == :alive } }

  pending.each do |url, meta|
    begin
      probe_path = healthcheck_path
      probe_url = LogStash::Util::SafeURI.new(url.uri.clone)
      probe_url.query = nil
      probe_url.path = ROOT_URI_PATH if @absolute_healthcheck_path

      logger.info("Running health check to see if an Elasticsearch connection is working",
                  :healthcheck_url => probe_url, :path => probe_path)
      perform_request_to_url(probe_url, :head, probe_path)

      logger.warn("Restored connection to ES instance", :url => probe_url.sanitized)
      @state_mutex.synchronize { meta[:state] = :alive }
    rescue HostUnreachableError, BadResponseCodeError => e
      logger.warn("Attempted to resurrect connection to dead ES instance, but got an error.", url: url.sanitized, error_type: e.class, error: e.message)
    end
  end
end
|
#in_use_connections ⇒ Object
100
101
102
|
# File 'lib/logstash/outputs/elasticsearch/http_client/pool.rb', line 100
# Lists the metadata hashes of URLs that currently have connections checked out.
#
# @return [Array<Hash>] entries of @url_info whose :in_use is above zero
def in_use_connections
  @state_mutex.synchronize do
    @url_info.values.reject { |meta| meta[:in_use] <= 0 }
  end
end
|
#major_version(nodes) ⇒ Object
189
190
191
|
# File 'lib/logstash/outputs/elasticsearch/http_client/pool.rb', line 189
# Reads the major version number from the first node of a sniff response.
#
# @param nodes [Hash] node id => info hash carrying a 'version' string
# @return [Integer] the part of 'version' before the first dot, via to_i
def major_version(nodes)
  _node_id, info = nodes.first
  major, = info['version'].split('.')
  major.to_i
end
|
#mark_dead(url, error) ⇒ Object
379
380
381
382
383
384
385
386
387
388
389
390
391
|
# File 'lib/logstash/outputs/elasticsearch/http_client/pool.rb', line 379
# Flags a tracked URL as :dead and records the error and time on its metadata.
# URLs that are not in @url_info are ignored without logging.
#
# @param url key into @url_info
# @param error the exception that triggered the change
# @return [Time, nil] the recorded timestamp, or nil when +url+ is untracked
def mark_dead(url, error)
  @state_mutex.synchronize do
    entry = @url_info[url]
    if entry
      logger.warn("Marking url as dead. Last error: [#{error.class}] #{error.message}",
                  :url => url, :error_message => error.message, :error_class => error.class.name)
      entry[:state] = :dead
      entry[:last_error] = error
      entry[:last_errored_at] = Time.now
    end
  end
end
|
#normalize_url(uri) ⇒ Object
290
291
292
293
294
295
296
|
# File 'lib/logstash/outputs/elasticsearch/http_client/pool.rb', line 290
# Passes +uri+ through the configured @url_normalizer and checks the result type.
#
# @param uri value handed to @url_normalizer.call
# @return [LogStash::Util::SafeURI] the normalizer's result
# @raise [RuntimeError] when the normalizer returns anything but a SafeURI
def normalize_url(uri)
  normalized = @url_normalizer.call(uri)
  return normalized if normalized.is_a?(::LogStash::Util::SafeURI)

  raise "URL Normalizer returned a '#{normalized.class}' rather than a SafeURI! This shouldn't happen!"
end
|
#perform_request(method, path, params = {}, body = nil) ⇒ Object
271
272
273
274
275
276
|
# File 'lib/logstash/outputs/elasticsearch/http_client/pool.rb', line 271
# Sends one request over a connection obtained from with_connection.
#
# @param method HTTP verb forwarded to perform_request_to_url
# @param path request path
# @param params [Hash] request parameters
# @param body request body, or nil
# @return [Array] [url, response] - the URL used and the response received
def perform_request(method, path, params={}, body=nil)
  with_connection do |url|
    response = perform_request_to_url(url, method, path, params, body)
    [url, response]
  end
end
|
#perform_request_to_url(url, method, path, params = {}, body = nil) ⇒ Object
284
285
286
287
288
|
# File 'lib/logstash/outputs/elasticsearch/http_client/pool.rb', line 284
# Sends a request to a specific URL through the adapter.
#
# @return the adapter's response
# @raise [HostUnreachableError] wrapping any exception class listed by
#   @adapter.host_unreachable_exceptions, together with the URL
def perform_request_to_url(url, method, path, params={}, body=nil)
  begin
    @adapter.perform_request(url, method, path, params, body)
  rescue *@adapter.host_unreachable_exceptions => e
    raise HostUnreachableError.new(e, url), "Could not reach host #{e.class}: #{e.message}"
  end
end
|
#remove_url(url) ⇒ Object
346
347
348
|
# File 'lib/logstash/outputs/elasticsearch/http_client/pool.rb', line 346
# Drops +url+ from @url_info and returns what Hash#delete returns: the
# removed metadata, or nil when the URL was not tracked.
# Does not take @state_mutex itself; update_urls calls it while holding it.
def remove_url(url)
@url_info.delete(url)
end
|
#resurrectionist_alive? ⇒ Boolean
267
268
269
|
# File 'lib/logstash/outputs/elasticsearch/http_client/pool.rb', line 267
# Reports whether the resurrectionist thread is running.
#
# @return [Boolean, nil] Thread#alive? of @resurrectionist, or nil when no
#   resurrectionist thread has been created
def resurrectionist_alive?
  @resurrectionist.alive? if @resurrectionist
end
|
#return_connection(url) ⇒ Object
427
428
429
430
431
432
433
|
# File 'lib/logstash/outputs/elasticsearch/http_client/pool.rb', line 427
# Releases a connection obtained from get_connection by decrementing the
# URL's :in_use counter.
#
# The listing had the condition and its body fused onto one line
# (`if @url_info[url] @url_info[url][:in_use] -= 1`), which does not parse.
#
# @param url key into @url_info; may be nil or no longer tracked
def return_connection(url)
  @state_mutex.synchronize do
    meta = @url_info[url]
    # The entry can be absent: remove_url may have dropped it while the
    # connection was out, and with_connection also calls this with a nil url.
    meta[:in_use] -= 1 if meta
  end
end
|
#size ⇒ Object
338
339
340
|
# File 'lib/logstash/outputs/elasticsearch/http_client/pool.rb', line 338
# Number of URLs currently tracked, whatever their state.
#
# @return [Integer] entry count of @url_info
def size
  @state_mutex.synchronize do
    @url_info.size
  end
end
|
#sniff! ⇒ Object
Sniffs the cluster then updates the internal URLs
151
152
153
|
# File 'lib/logstash/outputs/elasticsearch/http_client/pool.rb', line 151
# Sniffs the cluster and feeds the result to update_urls. update_urls
# ignores a nil result, so a failed sniff leaves the pool unchanged.
#
# @return whatever update_urls returns
def sniff!
  discovered = check_sniff
  update_urls(discovered)
end
|
#sniff_2x_1x(nodes) ⇒ Object
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
|
# File 'lib/logstash/outputs/elasticsearch/http_client/pool.rb', line 201
# Extracts HTTP endpoints from an Elasticsearch 1.x/2.x node listing.
#
# A node is left out when it has no 'http_address', when its attributes set
# 'data' to the string 'false', or when its address matches neither
# ES1_SNIFF_RE_URL nor ES2_SNIFF_RE_URL.
#
# @param nodes [Hash] node id => node info hash
# @return [Array<LogStash::Util::SafeURI>] one "host:port" URI per kept node
def sniff_2x_1x(nodes)
  nodes.map do |_id, info|
    # Test for a missing address before stringifying it. The old code called
    # to_s first, and nil.to_s is "" (truthy), so its guard never fired.
    raw_addr = info['http_address']
    next unless raw_addr
    addr_str = raw_addr.to_s

    attributes = info['attributes']
    next if attributes && attributes['data'] == 'false'

    matches = addr_str.match(ES1_SNIFF_RE_URL) || addr_str.match(ES2_SNIFF_RE_URL)
    next unless matches

    # Use the first capture as host unless it is empty, else the second.
    host = matches[1].empty? ? matches[2] : matches[1]
    ::LogStash::Util::SafeURI.new("#{host}:#{matches[3]}")
  end.compact
end
|
#sniff_5x(nodes) ⇒ Object
193
194
195
196
197
198
199
|
# File 'lib/logstash/outputs/elasticsearch/http_client/pool.rb', line 193
# Extracts HTTP endpoints from an Elasticsearch 5.x node listing.
# Nodes without an "http" section are left out.
#
# @param nodes [Hash] node id => node info hash
# @return [Array<LogStash::Util::SafeURI>] one URI per node, built from its
#   http publish_address
def sniff_5x(nodes)
  nodes.map do |_id, info|
    next unless info["http"]

    LogStash::Util::SafeURI.new(info["http"]["publish_address"])
  end.compact
end
|
#sniffer_alive? ⇒ Boolean
229
230
231
|
# File 'lib/logstash/outputs/elasticsearch/http_client/pool.rb', line 229
# Reports whether the sniffer thread is running.
#
# @return [Boolean, nil] Thread#alive? of @sniffer, or nil when no sniffer
#   thread has been created
def sniffer_alive?
  @sniffer.alive? if @sniffer
end
|
#start ⇒ Object
71
72
73
74
75
|
# File 'lib/logstash/outputs/elasticsearch/http_client/pool.rb', line 71
# Brings the pool up: registers the initial URLs, launches the
# resurrectionist thread and, only when sniffing is enabled, the sniffer.
#
# @return the result of start_sniffer, or nil when sniffing is disabled
def start
  update_urls(@initial_urls)
  start_resurrectionist
  return unless @sniffing

  start_sniffer
end
|
#start_resurrectionist ⇒ Object
233
234
235
236
237
238
239
|
# File 'lib/logstash/outputs/elasticsearch/http_client/pool.rb', line 233
# Spawns the thread that re-runs healthcheck! through until_stopped, spaced
# by @resurrect_delay, and keeps it in @resurrectionist.
#
# @return [Thread] the new thread
def start_resurrectionist
  @resurrectionist = Thread.new do
    until_stopped("resurrection", @resurrect_delay) { healthcheck! }
  end
end
|
#start_sniffer ⇒ Object
137
138
139
140
141
142
143
144
145
146
147
148
|
# File 'lib/logstash/outputs/elasticsearch/http_client/pool.rb', line 137
# Spawns the thread that re-runs sniff! through until_stopped, spaced by
# sniffer_delay, and keeps it in @sniffer. A NoConnectionAvailableError
# from sniff! is logged as a warning and does not end the loop.
#
# @return [Thread] the new thread
def start_sniffer
  @sniffer = Thread.new do
    until_stopped("sniffing", sniffer_delay) do
      begin
        sniff!
      rescue NoConnectionAvailableError
        # @url_info is read under the lock so the logged snapshot is consistent.
        @state_mutex.synchronize do
          logger.warn("Elasticsearch output attempted to sniff for new connections but cannot. No living connections are detected. Pool contains the following current URLs", :url_info => @url_info)
        end
      end
    end
  end
end
|
#stop_resurrectionist ⇒ Object
263
264
265
|
# File 'lib/logstash/outputs/elasticsearch/http_client/pool.rb', line 263
# Waits for the resurrectionist thread to finish, if one was started.
# It only joins; the thread ends once until_stopped sees the stopping flag.
#
# @return [Thread, nil] the joined thread, or nil when none exists
def stop_resurrectionist
  return unless @resurrectionist

  @resurrectionist.join
end
|
#stop_sniffer ⇒ Object
225
226
227
|
# File 'lib/logstash/outputs/elasticsearch/http_client/pool.rb', line 225
# Waits for the sniffer thread to finish, if one was started.
# It only joins; the thread ends once until_stopped sees the stopping flag.
#
# @return [Thread, nil] the joined thread, or nil when none exists
def stop_sniffer
  return unless @sniffer

  @sniffer.join
end
|
#until_stopped(task_name, delay) ⇒ Object
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
|
# File 'lib/logstash/outputs/elasticsearch/http_client/pool.rb', line 116
# Runs the given block repeatedly until the pool's stopping flag is set.
# The loop wakes once per second and yields only when at least +delay+
# seconds have passed since the last run (or since entry). Any StandardError
# raised inside an iteration is logged as a warning and the loop continues.
#
# @param task_name [String] label used in the warning message
# @param delay [Numeric] minimum seconds between two runs of the block
# @return [nil]
def until_stopped(task_name, delay)
  last_run = Time.now
  until @state_mutex.synchronize { @stopping }
    begin
      current = Time.now
      elapsed = current - last_run
      if elapsed >= delay
        # Stamp before yielding so a failing block is not retried at once.
        last_run = current
        yield
      end
      sleep 1
    rescue => e
      logger.warn(
        "Error while performing #{task_name}",
        :error_message => e.message,
        :class => e.class.name,
        :backtrace => e.backtrace
      )
    end
  end
end
|
#update_urls(new_urls) ⇒ Object
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
|
# File 'lib/logstash/outputs/elasticsearch/http_client/pool.rb', line 298
# Replaces the set of tracked URLs with +new_urls+ (after normalization):
# unknown URLs are added, tracked URLs missing from the list are removed,
# and a health check is run afterwards. A nil argument is a no-op.
#
# @param new_urls [Array, nil] URLs to pass through normalize_url
# @return the result of healthcheck!, or nil when +new_urls+ is nil
def update_urls(new_urls)
  return if new_urls.nil?

  wanted = new_urls.map { |raw| normalize_url(raw) }
  changes = {:removed => [], :added => []}

  @state_mutex.synchronize do
    wanted.each do |url|
      # Membership is checked by scanning the key list, not by hash lookup.
      next if @url_info.keys.include?(url)

      changes[:added] << url
      add_url(url)
    end

    @url_info.each do |url, _meta|
      next if wanted.include?(url)

      changes[:removed] << url
      remove_url(url)
    end
  end

  changed = changes[:removed].size > 0 || changes[:added].size > 0
  logger.info("Elasticsearch pool URLs updated", :changes => changes) if changed && logger.info?

  healthcheck!
end
|
#url_info ⇒ Object
108
109
110
|
# File 'lib/logstash/outputs/elasticsearch/http_client/pool.rb', line 108
# Fetches the URL => metadata table while holding the state lock.
# The hash handed back is the pool's own @url_info object, not a copy.
#
# @return [Hash] @url_info
def url_info
  @state_mutex.synchronize do
    @url_info
  end
end
|
#url_meta(url) ⇒ Object
393
394
395
396
397
|
# File 'lib/logstash/outputs/elasticsearch/http_client/pool.rb', line 393
# Looks up the metadata hash recorded for +url+ while holding the state lock.
#
# @param url key into @url_info
# @return [Hash, nil] the stored metadata, or nil when +url+ is untracked
def url_meta(url)
  @state_mutex.synchronize { @url_info[url] }
end
|
#urls ⇒ Object
112
113
114
|
# File 'lib/logstash/outputs/elasticsearch/http_client/pool.rb', line 112
# Lists every URL currently tracked by the pool, whatever its state.
#
# @return [Array] the keys of the table returned by url_info
def urls
  tracked = url_info
  tracked.keys
end
|
#wait_for_in_use_connections ⇒ Object
93
94
95
96
97
98
|
# File 'lib/logstash/outputs/elasticsearch/http_client/pool.rb', line 93
# Blocks until no URL has a connection checked out, polling once per second
# and logging the current URL table on every pass that still finds some.
#
# @return [nil]
def wait_for_in_use_connections
  while in_use_connections.any?
    snapshot = @state_mutex.synchronize { @url_info }
    logger.info("Blocked on shutdown to in use connections #{snapshot}")
    sleep 1
  end
end
|
#with_connection ⇒ Object
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
|
# File 'lib/logstash/outputs/elasticsearch/http_client/pool.rb', line 357
# Checks a URL out of the pool, yields it, and always hands it back.
#
# A HostUnreachableError marks the URL dead before propagating. A
# BadResponseCodeError propagates untouched. Every other StandardError
# raised here is logged as an unexpected pool error and propagated.
#
# @yieldparam url the URL picked by get_connection
# @return the block's result
# @raise [NoConnectionAvailableError] when get_connection yields no URL
def with_connection
  url, _meta = get_connection
  raise NoConnectionAvailableError, "No Available connections" if !url

  yield url
rescue HostUnreachableError => unreachable
  mark_dead(url, unreachable)
  raise unreachable
rescue BadResponseCodeError => bad_response
  raise bad_response
rescue => unexpected
  logger.warn("UNEXPECTED POOL ERROR", :e => unexpected)
  raise unexpected
ensure
  # Runs on every path, including the one where no URL was obtained.
  return_connection(url)
end
|