Class: LogStash::Outputs::ElasticSearch::HttpClient::Pool
- Inherits:
-
Object
- Object
- LogStash::Outputs::ElasticSearch::HttpClient::Pool
show all
- Defined in:
- lib/logstash/outputs/elasticsearch/http_client/pool.rb
Defined Under Namespace
Classes: BadResponseCodeError, HostUnreachableError, NoConnectionAvailableError
Constant Summary
collapse
- DEFAULT_OPTIONS =
{
:healthcheck_path => '/'.freeze,
:scheme => 'http',
:resurrect_delay => 5,
:auth => nil, :sniffing => false,
:sniffer_delay => 10,
}.freeze
- ES1_SNIFF_RE_URL =
/\[([^\/]*)?\/?([^:]*):([0-9]+)\]/
- ES2_SNIFF_RE_URL =
/([^\/]*)?\/?([^:]*):([0-9]+)/
Instance Attribute Summary collapse
Instance Method Summary
collapse
-
#add_url(url) ⇒ Object
-
#alive_urls_count ⇒ Object
-
#check_sniff ⇒ Object
Sniffs and returns the results.
-
#close ⇒ Object
-
#empty_url_meta ⇒ Object
-
#get_connection ⇒ Object
-
#in_use_connections ⇒ Object
-
#initialize(logger, adapter, initial_urls = [], options = {}) ⇒ Pool
constructor
-
#mark_dead(url, error) ⇒ Object
-
#normalize_url(uri) ⇒ Object
-
#perform_request(method, path, params = {}, body = nil) ⇒ Object
-
#perform_request_to_url(url, method, path, params = {}, body = nil) ⇒ Object
-
#remove_url(url) ⇒ Object
-
#resurrect_dead! ⇒ Object
-
#resurrectionist_alive? ⇒ Boolean
-
#return_connection(url) ⇒ Object
-
#size ⇒ Object
-
#sniff! ⇒ Object
Sniffs the cluster then updates the internal URLs.
-
#sniffer_alive? ⇒ Boolean
-
#start_resurrectionist ⇒ Object
-
#start_sniffer ⇒ Object
-
#stop_resurrectionist ⇒ Object
-
#stop_sniffer ⇒ Object
-
#until_stopped(task_name, delay) ⇒ Object
-
#update_urls(new_urls) ⇒ Object
-
#url_info ⇒ Object
-
#url_meta(url) ⇒ Object
-
#urls ⇒ Object
-
#wait_for_in_use_connections ⇒ Object
-
#with_connection ⇒ Object
Constructor Details
#initialize(logger, adapter, initial_urls = [], options = {}) ⇒ Pool
Returns a new instance of Pool.
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
|
# File 'lib/logstash/outputs/elasticsearch/http_client/pool.rb', line 41
def initialize(logger, adapter, initial_urls=[], options={})
@logger = logger
@adapter = adapter
DEFAULT_OPTIONS.merge(options).tap do |merged|
@healthcheck_path = merged[:healthcheck_path]
@scheme = merged[:scheme]
@resurrect_delay = merged[:resurrect_delay]
@auth = merged[:auth]
@sniffing = merged[:sniffing]
@sniffer_delay = merged[:sniffer_delay]
end
if initial_urls.any? {|u| u.scheme == 'https'} && @scheme == 'http'
raise ArgumentError, "HTTP was set as scheme, but an HTTPS URL was passed in!"
end
@state_mutex = Mutex.new
@url_info = {}
@stopping = false
update_urls(initial_urls)
start_resurrectionist
start_sniffer if @sniffing
end
|
Instance Attribute Details
#adapter ⇒ Object
Returns the value of attribute adapter.
30
31
32
|
# File 'lib/logstash/outputs/elasticsearch/http_client/pool.rb', line 30
def adapter
@adapter
end
|
#auth ⇒ Object
Returns the value of attribute auth.
30
31
32
|
# File 'lib/logstash/outputs/elasticsearch/http_client/pool.rb', line 30
def auth
@auth
end
|
#healthcheck_path ⇒ Object
Returns the value of attribute healthcheck_path.
30
31
32
|
# File 'lib/logstash/outputs/elasticsearch/http_client/pool.rb', line 30
def healthcheck_path
@healthcheck_path
end
|
#logger ⇒ Object
Returns the value of attribute logger.
30
31
32
|
# File 'lib/logstash/outputs/elasticsearch/http_client/pool.rb', line 30
def logger
@logger
end
|
#resurrect_delay ⇒ Object
Returns the value of attribute resurrect_delay.
30
31
32
|
# File 'lib/logstash/outputs/elasticsearch/http_client/pool.rb', line 30
def resurrect_delay
@resurrect_delay
end
|
#sniffer_delay ⇒ Object
Returns the value of attribute sniffer_delay.
30
31
32
|
# File 'lib/logstash/outputs/elasticsearch/http_client/pool.rb', line 30
def sniffer_delay
@sniffer_delay
end
|
#sniffing ⇒ Object
Returns the value of attribute sniffing.
30
31
32
|
# File 'lib/logstash/outputs/elasticsearch/http_client/pool.rb', line 30
def sniffing
@sniffing
end
|
Instance Method Details
#add_url(url) ⇒ Object
287
288
289
|
# File 'lib/logstash/outputs/elasticsearch/http_client/pool.rb', line 287
def add_url(url)
@url_info[url] ||= empty_url_meta
end
|
#alive_urls_count ⇒ Object
98
99
100
|
# File 'lib/logstash/outputs/elasticsearch/http_client/pool.rb', line 98
def alive_urls_count
@state_mutex.synchronize { @url_info.values.select {|v| !v[:dead] }.count }
end
|
#check_sniff ⇒ Object
Sniffs and returns the results. Does not update internal URLs!
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
|
# File 'lib/logstash/outputs/elasticsearch/http_client/pool.rb', line 152
def check_sniff
url, resp = perform_request(:get, '_nodes')
parsed = LogStash::Json.load(resp.body)
parsed['nodes'].map do |id,info|
addr_str = info['http_address'].to_s
next unless addr_str
attributes = info['attributes']
next if attributes && attributes['data'] == 'false'
matches = addr_str.match(ES1_SNIFF_RE_URL) || addr_str.match(ES2_SNIFF_RE_URL)
if matches
host = matches[1].empty? ? matches[2] : matches[1]
port = matches[3]
URI.parse("#{@scheme}://#{host}:#{port}")
end
end.compact
end
|
#close ⇒ Object
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
|
# File 'lib/logstash/outputs/elasticsearch/http_client/pool.rb', line 71
def close
@state_mutex.synchronize { @stopping = true }
logger.debug "Stopping sniffer"
stop_sniffer
logger.debug "Stopping resurrectionist"
stop_resurrectionist
logger.debug "Waiting for in use manticore connections"
wait_for_in_use_connections
logger.debug("Closing adapter #{@adapter}")
@adapter.close
end
|
295
296
297
298
299
300
|
# File 'lib/logstash/outputs/elasticsearch/http_client/pool.rb', line 295
def empty_url_meta
{
:in_use => 0,
:dead => false
}
end
|
#get_connection ⇒ Object
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
|
# File 'lib/logstash/outputs/elasticsearch/http_client/pool.rb', line 345
def get_connection
@state_mutex.synchronize do
eligible_set = nil
lowest_value_seen = nil
@url_info.each do |url,meta|
meta_in_use = meta[:in_use]
next if meta[:dead]
if lowest_value_seen.nil? || meta_in_use < lowest_value_seen
lowest_value_seen = meta_in_use
eligible_set = [[url, meta]]
elsif lowest_value_seen == meta_in_use
eligible_set << [url, meta]
end
end
return nil if eligible_set.nil?
pick, pick_meta = eligible_set.sample
pick_meta[:in_use] += 1
[pick, pick_meta]
end
end
|
#in_use_connections ⇒ Object
94
95
96
|
# File 'lib/logstash/outputs/elasticsearch/http_client/pool.rb', line 94
def in_use_connections
@state_mutex.synchronize { @url_info.values.select {|v| v[:in_use] > 0 } }
end
|
#mark_dead(url, error) ⇒ Object
324
325
326
327
328
329
330
331
332
333
334
335
336
337
|
# File 'lib/logstash/outputs/elasticsearch/http_client/pool.rb', line 324
def mark_dead(url, error)
@state_mutex.synchronize do
meta = @url_info[url]
return unless meta
safe_url = ::LogStash::Outputs::ElasticSearch::SafeURL.without_credentials(url)
logger.warn("Marking url as dead.", :reason => error.message, :url => safe_url,
:error_message => error.message, :error_class => error.class.name)
meta[:dead] = true
meta[:last_error] = error
meta[:last_errored_at] = Time.now
end
end
|
#normalize_url(uri) ⇒ Object
238
239
240
241
242
243
244
245
246
247
248
249
250
251
|
# File 'lib/logstash/outputs/elasticsearch/http_client/pool.rb', line 238
def normalize_url(uri)
raise ArgumentError, "Only URI objects may be passed in!" unless uri.is_a?(URI)
uri = uri.clone
if @auth && !uri.user
uri.user ||= @auth[:user]
uri.password ||= @auth[:password]
end
uri.scheme = @scheme
uri
end
|
219
220
221
222
223
224
|
# File 'lib/logstash/outputs/elasticsearch/http_client/pool.rb', line 219
def perform_request(method, path, params={}, body=nil)
with_connection do |url|
resp = perform_request_to_url(url, method, path, params, body)
[url, resp]
end
end
|
232
233
234
235
236
|
# File 'lib/logstash/outputs/elasticsearch/http_client/pool.rb', line 232
def perform_request_to_url(url, method, path, params={}, body=nil)
res = @adapter.perform_request(url, method, path, params, body)
rescue *@adapter.host_unreachable_exceptions => e
raise HostUnreachableError.new(e, url), "Could not reach host #{e.class}: #{e.message}"
end
|
#remove_url(url) ⇒ Object
291
292
293
|
# File 'lib/logstash/outputs/elasticsearch/http_client/pool.rb', line 291
def remove_url(url)
@url_info.delete(url)
end
|
#resurrect_dead! ⇒ Object
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
|
# File 'lib/logstash/outputs/elasticsearch/http_client/pool.rb', line 194
def resurrect_dead!
@state_mutex.synchronize { @url_info.select {|url,meta| meta[:dead] } }.each do |url,meta|
safe_url = ::LogStash::Outputs::ElasticSearch::SafeURL.without_credentials(url)
begin
logger.info("Running health check to see if an Elasticsearch connection is working",
url: safe_url, healthcheck_path: @healthcheck_path)
perform_request_to_url(url, "HEAD", @healthcheck_path)
logger.warn("Restored connection to ES instance", :url => safe_url)
@state_mutex.synchronize { meta[:dead] = false }
rescue HostUnreachableError => e
logger.debug("Attempted to resurrect connection to dead ES instance, but got an error.", url: safe_url, error_type: e.class, error: e.message)
end
end
end
|
#resurrectionist_alive? ⇒ Boolean
215
216
217
|
# File 'lib/logstash/outputs/elasticsearch/http_client/pool.rb', line 215
def resurrectionist_alive?
@resurrectionist.alive?
end
|
#return_connection(url) ⇒ Object
373
374
375
376
377
378
379
|
# File 'lib/logstash/outputs/elasticsearch/http_client/pool.rb', line 373
def return_connection(url)
@state_mutex.synchronize do
if @url_info[url] @url_info[url][:in_use] -= 1
end
end
end
|
#size ⇒ Object
283
284
285
|
# File 'lib/logstash/outputs/elasticsearch/http_client/pool.rb', line 283
def size
@state_mutex.synchronize { @url_info.size }
end
|
#sniff! ⇒ Object
Sniffs the cluster then updates the internal URLs
145
146
147
|
# File 'lib/logstash/outputs/elasticsearch/http_client/pool.rb', line 145
def sniff!
update_urls(check_sniff)
end
|
#sniffer_alive? ⇒ Boolean
182
183
184
|
# File 'lib/logstash/outputs/elasticsearch/http_client/pool.rb', line 182
def sniffer_alive?
@sniffer ? @sniffer.alive? : nil
end
|
#start_resurrectionist ⇒ Object
186
187
188
189
190
191
192
|
# File 'lib/logstash/outputs/elasticsearch/http_client/pool.rb', line 186
def start_resurrectionist
@resurrectionist = Thread.new do
until_stopped("resurrection", @resurrect_delay) do
resurrect_dead!
end
end
end
|
#start_sniffer ⇒ Object
131
132
133
134
135
136
137
138
139
140
141
142
|
# File 'lib/logstash/outputs/elasticsearch/http_client/pool.rb', line 131
def start_sniffer
@sniffer = Thread.new do
until_stopped("sniffing", sniffer_delay) do
begin
sniff!
rescue NoConnectionAvailableError => e
@state_mutex.synchronize { logger.warn("Elasticsearch output attempted to sniff for new connections but cannot. No living connections are detected. Pool contains the following current URLs", :url_info => @url_info) }
end
end
end
end
|
#stop_resurrectionist ⇒ Object
211
212
213
|
# File 'lib/logstash/outputs/elasticsearch/http_client/pool.rb', line 211
def stop_resurrectionist
@resurrectionist.join
end
|
#stop_sniffer ⇒ Object
178
179
180
|
# File 'lib/logstash/outputs/elasticsearch/http_client/pool.rb', line 178
def stop_sniffer
@sniffer.join if @sniffer
end
|
#until_stopped(task_name, delay) ⇒ Object
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
|
# File 'lib/logstash/outputs/elasticsearch/http_client/pool.rb', line 110
def until_stopped(task_name, delay)
last_done = Time.now
until @state_mutex.synchronize { @stopping }
begin
now = Time.now
if (now - last_done) >= delay
last_done = now
yield
end
sleep 1
rescue => e
logger.warn(
"Error while performing #{task_name}",
:error_message => e.message,
:class => e.class.name,
:backtrace => e.backtrace
)
end
end
end
|
#update_urls(new_urls) ⇒ Object
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
|
# File 'lib/logstash/outputs/elasticsearch/http_client/pool.rb', line 253
def update_urls(new_urls)
new_urls = new_urls.map(&method(:normalize_url))
state_changes = {:removed => [], :added => []}
@state_mutex.synchronize do
new_urls.each do |url|
unless @url_info.keys.include?(url)
state_changes[:added] << url.to_s
add_url(url)
end
end
@url_info.each do |url,_|
unless new_urls.include?(url)
state_changes[:removed] << url.to_s
remove_url(url)
end
end
end
if state_changes[:removed].size > 0 || state_changes[:added].size > 0
logger.info("Elasticsearch pool URLs updated", :changes => state_changes)
end
end
|
#url_info ⇒ Object
102
103
104
|
# File 'lib/logstash/outputs/elasticsearch/http_client/pool.rb', line 102
def url_info
@state_mutex.synchronize { @url_info }
end
|
339
340
341
342
343
|
# File 'lib/logstash/outputs/elasticsearch/http_client/pool.rb', line 339
def url_meta(url)
@state_mutex.synchronize do
@url_info[url]
end
end
|
#urls ⇒ Object
106
107
108
|
# File 'lib/logstash/outputs/elasticsearch/http_client/pool.rb', line 106
def urls
url_info.keys
end
|
#wait_for_in_use_connections ⇒ Object
87
88
89
90
91
92
|
# File 'lib/logstash/outputs/elasticsearch/http_client/pool.rb', line 87
def wait_for_in_use_connections
until in_use_connections.empty?
logger.info "Blocked on shutdown to in use connections #{@state_mutex.synchronize {@url_info}}"
sleep 1
end
end
|
#with_connection ⇒ Object
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
|
# File 'lib/logstash/outputs/elasticsearch/http_client/pool.rb', line 302
def with_connection
url, url_meta = get_connection
raise NoConnectionAvailableError, "No Available connections" unless url
yield url
rescue HostUnreachableError => e
mark_dead(url, e)
raise e
rescue BadResponseCodeError => e
raise e
rescue => e
logger.warn("UNEXPECTED POOL ERROR", :e => e)
raise e
ensure
return_connection(url)
end
|