Class: LogStash::Outputs::OpenSearch::HttpClient::Pool
- Inherits:
-
Object
- Object
- LogStash::Outputs::OpenSearch::HttpClient::Pool
show all
- Defined in:
- lib/logstash/outputs/opensearch/http_client/pool.rb
Defined Under Namespace
Classes: BadResponseCodeError, HostUnreachableError, NoConnectionAvailableError
Constant Summary
collapse
- ROOT_URI_PATH =
'/'.freeze
- DEFAULT_OPTIONS =
{
:healthcheck_path => ROOT_URI_PATH,
:sniffing_path => "/_nodes/http",
:bulk_path => "/_bulk",
:scheme => 'http',
:resurrect_delay => 5,
:sniffing => false,
:sniffer_delay => 10,
}.freeze
- SNIFF_RE_URL =
/([^\/]*)?\/?([^:]*):([0-9]+)/
Instance Attribute Summary collapse
Instance Method Summary
collapse
-
#add_url(url) ⇒ Object
-
#address_str_to_uri(addr_str) ⇒ Object
-
#alive_urls_count ⇒ Object
-
#check_sniff ⇒ Object
Sniffs and returns the results.
-
#close ⇒ Object
-
#empty_url_meta ⇒ Object
-
#get_connection ⇒ Object
-
#get_version(url) ⇒ Object
-
#health_check_request(url) ⇒ Object
-
#healthcheck! ⇒ Object
-
#in_use_connections ⇒ Object
-
#initialize(logger, adapter, initial_urls = [], options = {}) ⇒ Pool
constructor
-
#last_version ⇒ Object
-
#major_version(version_string) ⇒ Object
-
#mark_dead(url, error) ⇒ Object
-
#maximum_seen_major_version ⇒ Object
-
#normalize_url(uri) ⇒ Object
-
#perform_request(method, path, params = {}, body = nil) ⇒ Object
-
#perform_request_to_url(url, method, path, params = {}, body = nil) ⇒ Object
-
#remove_url(url) ⇒ Object
-
#resurrectionist_alive? ⇒ Boolean
-
#return_connection(url) ⇒ Object
-
#size ⇒ Object
-
#sniff(nodes) ⇒ Object
-
#sniff! ⇒ Object
Sniffs the cluster then updates the internal URLs.
-
#sniffer_alive? ⇒ Boolean
-
#start ⇒ Object
-
#start_resurrectionist ⇒ Object
-
#start_sniffer ⇒ Object
-
#stop_resurrectionist ⇒ Object
-
#stop_sniffer ⇒ Object
-
#until_stopped(task_name, delay) ⇒ Object
-
#update_initial_urls ⇒ Object
-
#update_urls(new_urls) ⇒ Object
-
#url_info ⇒ Object
-
#url_meta(url) ⇒ Object
-
#urls ⇒ Object
-
#wait_for_in_use_connections ⇒ Object
-
#with_connection ⇒ Object
Constructor Details
#initialize(logger, adapter, initial_urls = [], options = {}) ⇒ Pool
Returns a new instance of Pool.
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
|
# File 'lib/logstash/outputs/opensearch/http_client/pool.rb', line 57
# Stores the collaborators and settings for the pool. Only instance state is
# set up here; no URL is registered and no thread is created in this method.
#
# @param logger [Object] logger kept in @logger
# @param adapter [Object] HTTP adapter kept in @adapter
# @param initial_urls [Array] URLs kept in @initial_urls
# @param options [Hash] overrides merged on top of DEFAULT_OPTIONS; must carry
#   :url_normalizer, and is also read for :metric and
#   :default_server_major_version
# @raise [ArgumentError] when options[:url_normalizer] is nil or false
def initialize(logger, adapter, initial_urls=[], options={})
  @logger = logger
  @adapter = adapter
  @metric = options[:metric]
  @initial_urls = initial_urls

  normalizer = options[:url_normalizer]
  raise ArgumentError, "No URL Normalizer specified!" unless normalizer
  @url_normalizer = normalizer

  # Caller-supplied values win over the defaults.
  settings = DEFAULT_OPTIONS.merge(options)
  @bulk_path = settings[:bulk_path]
  @sniffing_path = settings[:sniffing_path]
  @healthcheck_path = settings[:healthcheck_path]
  @resurrect_delay = settings[:resurrect_delay]
  @sniffing = settings[:sniffing]
  @sniffer_delay = settings[:sniffer_delay]
  @default_server_major_version = settings[:default_server_major_version]

  # Guards @url_info and @stopping.
  @state_mutex = Mutex.new
  @url_info = {}
  @stopping = false

  @last_version = Concurrent::AtomicReference.new
end
|
Instance Attribute Details
#adapter ⇒ Object
Returns the value of attribute adapter.
42
43
44
|
# File 'lib/logstash/outputs/opensearch/http_client/pool.rb', line 42
# Reader for the HTTP adapter held by the pool.
#
# @return [Object] the current value of @adapter
def adapter
  @adapter
end
|
#bulk_path ⇒ Object
Returns the value of attribute bulk_path.
42
43
44
|
# File 'lib/logstash/outputs/opensearch/http_client/pool.rb', line 42
# Reader for the configured bulk path.
#
# @return [Object] the current value of @bulk_path
def bulk_path
  @bulk_path
end
|
#default_server_major_version ⇒ Object
Returns the value of attribute default_server_major_version.
43
44
45
|
# File 'lib/logstash/outputs/opensearch/http_client/pool.rb', line 43
# Reader for the fallback major version setting.
#
# @return [Object, nil] the current value of @default_server_major_version
def default_server_major_version
  @default_server_major_version
end
|
#healthcheck_path ⇒ Object
Returns the value of attribute healthcheck_path.
42
43
44
|
# File 'lib/logstash/outputs/opensearch/http_client/pool.rb', line 42
# Reader for the configured health check path.
#
# @return [Object] the current value of @healthcheck_path
def healthcheck_path
  @healthcheck_path
end
|
#logger ⇒ Object
Returns the value of attribute logger.
42
43
44
|
# File 'lib/logstash/outputs/opensearch/http_client/pool.rb', line 42
# Reader for the logger held by the pool.
#
# @return [Object] the current value of @logger
def logger
  @logger
end
|
#resurrect_delay ⇒ Object
Returns the value of attribute resurrect_delay.
42
43
44
|
# File 'lib/logstash/outputs/opensearch/http_client/pool.rb', line 42
# Reader for the configured resurrect delay.
#
# @return [Object] the current value of @resurrect_delay
def resurrect_delay
  @resurrect_delay
end
|
#sniffer_delay ⇒ Object
Returns the value of attribute sniffer_delay.
42
43
44
|
# File 'lib/logstash/outputs/opensearch/http_client/pool.rb', line 42
# Reader for the configured sniffer delay.
#
# @return [Object] the current value of @sniffer_delay
def sniffer_delay
  @sniffer_delay
end
|
#sniffing ⇒ Object
Returns the value of attribute sniffing.
42
43
44
|
# File 'lib/logstash/outputs/opensearch/http_client/pool.rb', line 42
# Reader for the sniffing setting.
#
# @return [Object] the current value of @sniffing
def sniffing
  @sniffing
end
|
#sniffing_path ⇒ Object
Returns the value of attribute sniffing_path.
42
43
44
|
# File 'lib/logstash/outputs/opensearch/http_client/pool.rb', line 42
# Reader for the configured sniffing path.
#
# @return [Object] the current value of @sniffing_path
def sniffing_path
  @sniffing_path
end
|
Instance Method Details
#add_url(url) ⇒ Object
327
328
329
|
# File 'lib/logstash/outputs/opensearch/http_client/pool.rb', line 327
# Ensures +url+ has an entry in @url_info, creating a fresh metadata hash via
# empty_url_meta when there is none. An existing entry is left untouched.
# This method does not take @state_mutex itself.
#
# @param url [Object] key to register
# @return [Hash] the metadata stored for +url+
def add_url(url)
  known = @url_info[url]
  return known if known

  @url_info[url] = empty_url_meta
end
|
#address_str_to_uri(addr_str) ⇒ Object
200
201
202
203
204
205
206
|
# File 'lib/logstash/outputs/opensearch/http_client/pool.rb', line 200
# Converts an address string into a SafeURI using SNIFF_RE_URL.
# When the first capture (the text before an optional "/") is non-empty it is
# used as the host; otherwise the second capture is used.
#
# @param addr_str [String] address text, e.g. "host/10.0.0.1:9200" or "10.0.0.1:9200"
# @return [::LogStash::Util::SafeURI, nil] the URI, or nil when the text does not match
def address_str_to_uri(addr_str)
  found = addr_str.match(SNIFF_RE_URL)
  return unless found

  hostname, address, port = found.captures
  host = hostname.empty? ? address : hostname
  ::LogStash::Util::SafeURI.new("#{host}:#{port}")
end
|
#alive_urls_count ⇒ Object
122
123
124
|
# File 'lib/logstash/outputs/opensearch/http_client/pool.rb', line 122
# Counts the URLs whose metadata state is :alive, reading @url_info under
# @state_mutex.
#
# @return [Integer] number of entries with :state == :alive
def alive_urls_count
  @state_mutex.synchronize do
    @url_info.count { |_url, meta| meta[:state] == :alive }
  end
end
|
#check_sniff ⇒ Object
Sniffs and returns the results. Does not update internal URLs!
175
176
177
178
179
180
181
182
183
184
185
186
|
# File 'lib/logstash/outputs/opensearch/http_client/pool.rb', line 175
# Sniffs and returns the results. Does not update internal URLs!
#
# Issues a GET to @sniffing_path through perform_request, increments the
# :sniff_requests metric, parses the response body and hands the "nodes"
# entry to #sniff.
#
# @return [Object, nil] whatever #sniff returns, or nil (after a warning)
#   when the response has no nodes
def check_sniff
  # Only the response is needed; the url and its meta are ignored.
  _url, _meta, response = perform_request(:get, @sniffing_path)
  @metric.increment(:sniff_requests)

  nodes = LogStash::Json.load(response.body)['nodes']
  return sniff(nodes) if nodes && !nodes.empty?

  @logger.warn("Sniff returned no nodes! Will not update hosts.")
  nil
end
|
#close ⇒ Object
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
|
# File 'lib/logstash/outputs/opensearch/http_client/pool.rb', line 95
# Shuts the pool down: sets @stopping under @state_mutex, then stops the
# sniffer, stops the resurrectionist, waits for in-use connections and
# finally closes the adapter, logging a debug line before each step.
#
# @return [Object] whatever @adapter.close returns
def close
  @state_mutex.synchronize { @stopping = true }

  # Steps run strictly in this order; each is announced before it starts.
  [
    ["Stopping sniffer", :stop_sniffer],
    ["Stopping resurrectionist", :stop_resurrectionist],
    ["Waiting for in use manticore connections", :wait_for_in_use_connections]
  ].each do |announcement, step|
    logger.debug announcement
    send(step)
  end

  logger.debug("Closing adapter #{@adapter}")
  @adapter.close
end
|
#empty_url_meta ⇒ Object
335
336
337
338
339
340
|
# File 'lib/logstash/outputs/opensearch/http_client/pool.rb', line 335
# Builds the metadata hash for a URL that has just been registered:
# no connections in use and an :unknown state.
#
# @return [Hash] a new hash on every call
def empty_url_meta
  { in_use: 0, state: :unknown }
end
|
#get_connection ⇒ Object
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
|
# File 'lib/logstash/outputs/opensearch/http_client/pool.rb', line 381
# Picks a URL to use for the next request. Entries whose state is :dead are
# skipped; among the rest, one with the lowest :in_use count is chosen (a
# random one when several tie) and its :in_use counter is incremented.
# The whole selection runs under @state_mutex.
#
# @return [Array(Object, Hash), nil] the [url, meta] pair, or nil when no
#   non-dead entry exists
def get_connection
  @state_mutex.synchronize do
    candidates = @url_info.reject { |_url, meta| meta[:state] == :dead }
    next nil if candidates.empty?

    least_busy = candidates.values.map { |meta| meta[:in_use] }.min
    tied = candidates.select { |_url, meta| meta[:in_use] == least_busy }.to_a

    chosen_url, chosen_meta = tied.sample
    chosen_meta[:in_use] += 1
    [chosen_url, chosen_meta]
  end
end
|
#get_version(url) ⇒ Object
416
417
418
419
420
421
422
423
424
425
426
|
# File 'lib/logstash/outputs/opensearch/http_client/pool.rb', line 416
# Asks +url+ for its version with a GET on ROOT_URI_PATH.
# When the response code is not 404 and the body is not empty, the parsed
# body's ["version"]["number"] is returned. Otherwise the configured default
# major version is used to build "<major>.0.0".
#
# @param url [Object] URL to query
# @return [Object] the reported version number, or the "<major>.0.0" fallback string
# @raise [RuntimeError] when no version could be read and no default major
#   version is configured
def get_version(url)
  response = perform_request_to_url(url, :get, ROOT_URI_PATH)

  version_reported = response.code != 404 && !response.body.empty?
  return LogStash::Json.load(response.body)["version"]["number"] if version_reported

  if @default_server_major_version.nil?
    @logger.error("Failed to get version from health_check endpoint and default_server_major_version is not configured.")
    raise "get_version failed! no default_server_major_version configured."
  end

  "#{default_server_major_version}.0.0"
end
|
#health_check_request(url) ⇒ Object
224
225
226
227
|
# File 'lib/logstash/outputs/opensearch/http_client/pool.rb', line 224
# Sends a HEAD request for @healthcheck_path to +url+, after a debug log line
# naming the sanitized URL and the path.
#
# @param url [Object] URL to probe; must respond to #sanitized
# @return [Object] whatever perform_request_to_url returns
def health_check_request(url)
  logger.debug(
    "Running health check to see if an OpenSearch connection is working",
    url: url.sanitized.to_s,
    path: @healthcheck_path
  )
  perform_request_to_url(url, :head, @healthcheck_path)
end
|
#healthcheck! ⇒ Object
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
|
# File 'lib/logstash/outputs/opensearch/http_client/pool.rb', line 229
# Probes every URL that is not currently :alive and marks the ones that
# answer as :alive again, recording the version they report.
#
# The list of non-alive entries is captured under @state_mutex, but the
# requests themselves run outside the lock; the lock is only re-taken to
# update the entry. HostUnreachableError and BadResponseCodeError are logged
# and swallowed so that one failing URL does not stop the others.
#
# @return [Hash] the url => meta entries that were probed
def healthcheck!
  not_alive = @state_mutex.synchronize do
    @url_info.reject { |_url, meta| meta[:state] == :alive }
  end

  not_alive.each do |url, meta|
    begin
      health_check_request(url)
      # No exception from the probe means the host answered.
      logger.warn("Restored connection to OpenSearch instance", url: url.sanitized.to_s)
      version = get_version(url)
      @state_mutex.synchronize do
        meta[:version] = version
        set_last_version(version, url)
        meta[:state] = :alive
      end
    rescue HostUnreachableError, BadResponseCodeError => e
      logger.warn("Attempted to resurrect connection to dead OpenSearch instance, but got an error", url: url.sanitized.to_s, exception: e.class, message: e.message)
    end
  end
end
|
#in_use_connections ⇒ Object
118
119
120
|
# File 'lib/logstash/outputs/opensearch/http_client/pool.rb', line 118
# Lists the metadata hashes of URLs that currently have at least one
# connection in use, reading @url_info under @state_mutex.
#
# @return [Array<Hash>] metadata entries with :in_use greater than zero
def in_use_connections
  @state_mutex.synchronize do
    @url_info.each_value.select { |meta| meta[:in_use] > 0 }
  end
end
|
#last_version ⇒ Object
428
429
430
|
# File 'lib/logstash/outputs/opensearch/http_client/pool.rb', line 428
# Reads the value currently held by the @last_version reference.
#
# @return [Object, nil] whatever @last_version.get returns
def last_version
  @last_version.get
end
|
#major_version(version_string) ⇒ Object
188
189
190
|
# File 'lib/logstash/outputs/opensearch/http_client/pool.rb', line 188
# Extracts the major component of a dotted version string.
#
# @param version_string [String] e.g. "7.10.2"
# @return [Integer] the text before the first "." converted with #to_i
#   (so 0 when that text is empty or not numeric)
def major_version(version_string)
  leading_segment, = version_string.split('.')
  leading_segment.to_i
end
|
#mark_dead(url, error) ⇒ Object
361
362
363
364
365
366
367
368
369
370
371
372
373
|
# File 'lib/logstash/outputs/opensearch/http_client/pool.rb', line 361
# Flags +url+ as :dead and records the error and the time it happened.
# Runs under @state_mutex. Does nothing when +url+ has no entry in @url_info.
#
# @param url [Object] key of the entry to mark
# @param error [Exception] the failure that triggered this
# @return [Time, nil] the recorded timestamp, or nil when +url+ is unknown
def mark_dead(url, error)
  @state_mutex.synchronize do
    meta = @url_info[url]
    next unless meta

    logger.warn(
      "Marking url as dead. Last error: [#{error.class}] #{error.message}",
      :url => url, :error_message => error.message, :error_class => error.class.name
    )
    meta[:state] = :dead
    meta[:last_error] = error
    meta[:last_errored_at] = Time.now
  end
end
|
#maximum_seen_major_version ⇒ Object
432
433
434
|
# File 'lib/logstash/outputs/opensearch/http_client/pool.rb', line 432
# Reads @maximum_seen_major_version under @state_mutex.
# NOTE(review): nothing in the visible methods assigns this variable —
# presumably it is written by code not shown here; confirm.
#
# @return [Object, nil] the current value of @maximum_seen_major_version
def maximum_seen_major_version
  @state_mutex.synchronize do
    @maximum_seen_major_version
  end
end
|
#normalize_url(uri) ⇒ Object
277
278
279
280
281
282
283
|
# File 'lib/logstash/outputs/opensearch/http_client/pool.rb', line 277
# Runs +uri+ through the configured @url_normalizer and checks that the
# result is a SafeURI.
#
# @param uri [Object] value handed to the normalizer
# @return [::LogStash::Util::SafeURI] the normalized URI
# @raise [RuntimeError] when the normalizer returns anything else
def normalize_url(uri)
  normalized = @url_normalizer.call(uri)
  return normalized if normalized.is_a?(::LogStash::Util::SafeURI)

  raise "URL Normalizer returned a '#{normalized.class}' rather than a SafeURI! This shouldn't happen!"
end
|
#perform_request(method, path, params = {}, body = nil) ⇒ Object
257
258
259
260
261
262
|
# File 'lib/logstash/outputs/opensearch/http_client/pool.rb', line 257
# Performs a request against a URL chosen by with_connection.
#
# @param method [Symbol] HTTP method, passed through to perform_request_to_url
# @param path [String] request path
# @param params [Hash] request parameters
# @param body [Object, nil] request body
# @return [Array] [url, url_meta, response] for the URL that served the request
def perform_request(method, path, params={}, body=nil)
  with_connection do |url, url_meta|
    [url, url_meta, perform_request_to_url(url, method, path, params, body)]
  end
end
|
#perform_request_to_url(url, method, path, params = {}, body = nil) ⇒ Object
271
272
273
274
275
|
# File 'lib/logstash/outputs/opensearch/http_client/pool.rb', line 271
# Sends a request to one specific URL through the adapter.
# Any exception class listed by @adapter.host_unreachable_exceptions is
# re-raised as HostUnreachableError carrying the original error and the URL.
#
# @param url [Object] target URL
# @param method [Symbol] HTTP method
# @param path [String] request path
# @param params [Hash] request parameters
# @param body [Object, nil] request body
# @return [Object] the adapter's response
# @raise [HostUnreachableError] when the adapter reports the host as unreachable
def perform_request_to_url(url, method, path, params={}, body=nil)
  @adapter.perform_request(url, method, path, params, body)
rescue *@adapter.host_unreachable_exceptions => unreachable
  raise HostUnreachableError.new(unreachable, url), "Could not reach host #{unreachable.class}: #{unreachable.message}"
end
|
#remove_url(url) ⇒ Object
331
332
333
|
# File 'lib/logstash/outputs/opensearch/http_client/pool.rb', line 331
# Drops +url+ from @url_info. This method does not take @state_mutex itself.
#
# @param url [Object] key to remove
# @return [Hash, nil] the removed metadata, or nil when +url+ was not present
def remove_url(url)
  @url_info.delete(url)
end
|
#resurrectionist_alive? ⇒ Boolean
253
254
255
|
# File 'lib/logstash/outputs/opensearch/http_client/pool.rb', line 253
# Reports whether the resurrectionist thread is running.
#
# @return [Boolean, nil] @resurrectionist.alive?, or nil when no
#   resurrectionist has been set
def resurrectionist_alive?
  return nil unless @resurrectionist

  @resurrectionist.alive?
end
|
#return_connection(url) ⇒ Object
409
410
411
412
413
414
|
# File 'lib/logstash/outputs/opensearch/http_client/pool.rb', line 409
# Gives back a connection obtained for +url+ by decrementing its :in_use
# counter under @state_mutex. A URL with no entry in @url_info is ignored.
#
# @param url [Object, nil] key whose counter should be decremented
# @return [Integer, nil] the new :in_use value, or nil when +url+ is unknown
def return_connection(url)
  @state_mutex.synchronize do
    meta = @url_info[url]
    next unless meta

    meta[:in_use] -= 1
  end
end
|
#size ⇒ Object
323
324
325
|
# File 'lib/logstash/outputs/opensearch/http_client/pool.rb', line 323
# Number of URLs currently tracked, read under @state_mutex.
#
# @return [Integer] entry count of @url_info
def size
  @state_mutex.synchronize do
    @url_info.length
  end
end
|
#sniff(nodes) ⇒ Object
192
193
194
195
196
197
198
|
# File 'lib/logstash/outputs/opensearch/http_client/pool.rb', line 192
# Turns the "nodes" section of a sniff response into a list of URIs.
# Nodes whose "roles" is exactly ["master"] are skipped, as are nodes without
# an "http" section and nodes whose publish address cannot be converted.
#
# @param nodes [Hash] node id => node info
# @return [Array] URIs produced by address_str_to_uri, in node order
def sniff(nodes)
  nodes.each_with_object([]) do |(_id, info), uris|
    next if info["roles"] == ["master"]
    next unless info["http"]

    uri = address_str_to_uri(info["http"]["publish_address"])
    uris << uri unless uri.nil?
  end
end
|
#sniff! ⇒ Object
Sniffs the cluster then updates the internal URLs
169
170
171
|
# File 'lib/logstash/outputs/opensearch/http_client/pool.rb', line 169
# Sniffs the cluster then updates the internal URLs.
#
# @return [Object] whatever update_urls returns
def sniff!
  discovered = check_sniff
  update_urls(discovered)
end
|
#sniffer_alive? ⇒ Boolean
212
213
214
|
# File 'lib/logstash/outputs/opensearch/http_client/pool.rb', line 212
# Reports whether the sniffer thread is running.
#
# @return [Boolean, nil] @sniffer.alive?, or nil when no sniffer has been set
def sniffer_alive?
  return nil unless @sniffer

  @sniffer.alive?
end
|
#start ⇒ Object
85
86
87
88
89
|
# File 'lib/logstash/outputs/opensearch/http_client/pool.rb', line 85
# Brings the pool up: loads the initial URLs, starts the resurrectionist and,
# only when @sniffing is truthy, starts the sniffer.
#
# @return [Object, nil] the result of start_sniffer, or nil when sniffing is off
def start
  update_initial_urls
  start_resurrectionist
  return unless @sniffing

  start_sniffer
end
|
#start_resurrectionist ⇒ Object
216
217
218
219
220
221
222
|
# File 'lib/logstash/outputs/opensearch/http_client/pool.rb', line 216
# Spawns the background thread that runs healthcheck! through until_stopped,
# using @resurrect_delay as the delay, and keeps it in @resurrectionist.
#
# @return [Thread] the newly created thread
def start_resurrectionist
  @resurrectionist = Thread.new do
    until_stopped("resurrection", @resurrect_delay) { healthcheck! }
  end
end
|
#start_sniffer ⇒ Object
155
156
157
158
159
160
161
162
163
164
165
166
|
# File 'lib/logstash/outputs/opensearch/http_client/pool.rb', line 155
# Spawns the background thread that runs sniff! through until_stopped, using
# sniffer_delay as the delay, and keeps it in @sniffer.
# A NoConnectionAvailableError from sniff! is logged as a warning together
# with the current @url_info (read under @state_mutex) instead of propagating.
#
# @return [Thread] the newly created thread
def start_sniffer
  @sniffer = Thread.new do
    until_stopped("sniffing", sniffer_delay) do
      begin
        sniff!
      rescue NoConnectionAvailableError
        @state_mutex.synchronize do
          logger.warn("OpenSearch output attempted to sniff for new connections but cannot. No living connections are detected. Pool contains the following current URLs", :url_info => @url_info)
        end
      end
    end
  end
end
|
#stop_resurrectionist ⇒ Object
249
250
251
|
# File 'lib/logstash/outputs/opensearch/http_client/pool.rb', line 249
# Waits for the resurrectionist thread to finish, if one was started.
# This only joins the thread; it does not itself signal the thread to stop.
#
# @return [Thread, nil] the joined thread, or nil when none was started
def stop_resurrectionist
  return unless @resurrectionist

  @resurrectionist.join
end
|
#stop_sniffer ⇒ Object
208
209
210
|
# File 'lib/logstash/outputs/opensearch/http_client/pool.rb', line 208
# Waits for the sniffer thread to finish, if one was started.
# This only joins the thread; it does not itself signal the thread to stop.
#
# @return [Thread, nil] the joined thread, or nil when none was started
def stop_sniffer
  return unless @sniffer

  @sniffer.join
end
|
#until_stopped(task_name, delay) ⇒ Object
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
|
# File 'lib/logstash/outputs/opensearch/http_client/pool.rb', line 134
# Runs the given block repeatedly until @stopping (read under @state_mutex)
# becomes truthy. The loop wakes up once per second and yields only when at
# least +delay+ seconds have passed since the previous run (or since the loop
# started). Any StandardError raised inside an iteration is logged as a
# warning and the loop carries on.
#
# @param task_name [String] label used in the warning message
# @param delay [Numeric] minimum number of seconds between two runs
# @yield the task to execute
# @return [nil]
def until_stopped(task_name, delay)
  last_run_at = Time.now
  until @state_mutex.synchronize { @stopping }
    begin
      current = Time.now
      if (current - last_run_at) >= delay
        # Stamp before yielding so a failing task still waits a full delay.
        last_run_at = current
        yield
      end
      sleep 1
    rescue => failure
      logger.warn(
        "Error while performing #{task_name}",
        :error_message => failure.message,
        :class => failure.class.name,
        :backtrace => failure.backtrace
      )
    end
  end
end
|
#update_initial_urls ⇒ Object
91
92
93
|
# File 'lib/logstash/outputs/opensearch/http_client/pool.rb', line 91
# Feeds the URLs stored in @initial_urls into update_urls.
#
# @return [Object] whatever update_urls returns
def update_initial_urls
  seed_urls = @initial_urls
  update_urls(seed_urls)
end
|
#update_urls(new_urls) ⇒ Object
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
|
# File 'lib/logstash/outputs/opensearch/http_client/pool.rb', line 285
# Replaces the set of tracked URLs with +new_urls+.
#
# Every URL is normalized first. Under @state_mutex, URLs that are not yet
# tracked are added and tracked URLs that are absent from the new list are
# removed; entries present in both keep their existing metadata. When
# anything changed, the additions and removals are logged at info level.
# A health check is run afterwards in every case.
#
# @param new_urls [Array, nil] the desired URL list; nil leaves the pool untouched
# @return [Object, nil] the result of healthcheck!, or nil when +new_urls+ is nil
def update_urls(new_urls)
  return if new_urls.nil?

  new_urls = new_urls.map(&method(:normalize_url))
  state_changes = { :removed => [], :added => [] }

  @state_mutex.synchronize do
    new_urls.each do |url|
      next if @url_info.keys.include?(url)

      state_changes[:added] << url
      add_url(url)
    end

    # Walk a snapshot of the keys: remove_url deletes from @url_info, and the
    # hash must not be modified while it is itself being iterated.
    @url_info.keys.each do |url|
      next if new_urls.include?(url)

      state_changes[:removed] << url
      remove_url(url)
    end
  end

  unless state_changes.values.all?(&:empty?)
    logger.info? && logger.info("OpenSearch pool URLs updated", :changes => state_changes)
  end

  # Runs even when nothing changed.
  healthcheck!
end
|
#url_info ⇒ Object
126
127
128
|
# File 'lib/logstash/outputs/opensearch/http_client/pool.rb', line 126
# Returns the URL table, read under @state_mutex.
# The hash returned is the pool's own @url_info object, not a copy, so the
# lock only covers reading the reference itself.
#
# @return [Hash] url => metadata
def url_info
  @state_mutex.synchronize do
    @url_info
  end
end
|
#url_meta(url) ⇒ Object
375
376
377
378
379
|
# File 'lib/logstash/outputs/opensearch/http_client/pool.rb', line 375
# Looks up the metadata stored for +url+, under @state_mutex.
#
# @param url [Object] key to look up
# @return [Hash, nil] the stored metadata hash, or nil when +url+ is not tracked
def url_meta(url)
  @state_mutex.synchronize { @url_info[url] }
end
|
#urls ⇒ Object
130
131
132
|
# File 'lib/logstash/outputs/opensearch/http_client/pool.rb', line 130
# Lists the URLs currently tracked by the pool.
#
# @return [Array] the keys of the hash returned by url_info
def urls
  tracked = url_info
  tracked.keys
end
|
#wait_for_in_use_connections ⇒ Object
111
112
113
114
115
116
|
# File 'lib/logstash/outputs/opensearch/http_client/pool.rb', line 111
# Blocks until in_use_connections reports nothing in use, checking once per
# second and logging the current @url_info (read under @state_mutex) at info
# level on every pass.
#
# @return [nil]
def wait_for_in_use_connections
  until in_use_connections.empty?
    snapshot = @state_mutex.synchronize { @url_info }
    logger.info "Blocked on shutdown to in use connections #{snapshot}"
    sleep 1
  end
end
|
#with_connection ⇒ Object
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
|
# File 'lib/logstash/outputs/opensearch/http_client/pool.rb', line 342
# Borrows a connection from the pool for the duration of the block.
# The URL obtained from get_connection is always handed back through
# return_connection, whether the block succeeds or raises. A
# HostUnreachableError additionally marks the URL as dead before it is
# re-raised; a BadResponseCodeError is re-raised as is.
#
# @yieldparam url [Object] the URL picked for this request
# @yieldparam url_meta [Hash] that URL's metadata
# @return [Object] the block's result
# @raise [NoConnectionAvailableError] when get_connection yields no URL
def with_connection
  url, url_meta = get_connection
  raise NoConnectionAvailableError, "No Available connections" unless url

  yield url, url_meta
rescue HostUnreachableError => unreachable
  mark_dead(url, unreachable)
  raise unreachable
rescue BadResponseCodeError => bad_response
  # A bad response code does not take the URL out of rotation.
  raise bad_response
ensure
  return_connection(url)
end
|