Class: LogStash::Outputs::ElasticSearch::HttpClient::Pool

Inherits:
Object
  • Object
show all
Defined in:
lib/logstash/outputs/elasticsearch/http_client/pool.rb

Defined Under Namespace

Classes: BadResponseCodeError, HostUnreachableError, NoConnectionAvailableError

Constant Summary collapse

ROOT_URI_PATH =
'/'.freeze
DEFAULT_OPTIONS =
{
  :healthcheck_path => ROOT_URI_PATH,
  :absolute_healthcheck_path => false,
  :sniffing_path => '_nodes/http',
  :absolute_sniffing_path => false,
  :scheme => 'http',
  :resurrect_delay => 5,
  :sniffing => false,
  :sniffer_delay => 10,
}.freeze
ES1_SNIFF_RE_URL =
/\[([^\/]*)?\/?([^:]*):([0-9]+)\]/
ES2_SNIFF_RE_URL =
/([^\/]*)?\/?([^:]*):([0-9]+)/

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(logger, adapter, initial_urls = [], options = {}) ⇒ Pool

Returns a new instance of Pool.

Raises:

  • (ArgumentError)


46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
# File 'lib/logstash/outputs/elasticsearch/http_client/pool.rb', line 46

# Build a connection pool over +initial_urls+.
#
# Requires options[:url_normalizer]; every other option falls back to
# DEFAULT_OPTIONS.
#
# @raise [ArgumentError] when no :url_normalizer is supplied
def initialize(logger, adapter, initial_urls=[], options={})
  @logger = logger
  @adapter = adapter
  @initial_urls = initial_urls

  raise ArgumentError, "No URL Normalizer specified!" unless options[:url_normalizer]
  @url_normalizer = options[:url_normalizer]

  merged = DEFAULT_OPTIONS.merge(options)
  @healthcheck_path = merged[:healthcheck_path]
  @absolute_healthcheck_path = merged[:absolute_healthcheck_path]
  @sniffing_path = merged[:sniffing_path]
  @absolute_sniffing_path = merged[:absolute_sniffing_path]
  @resurrect_delay = merged[:resurrect_delay]
  @sniffing = merged[:sniffing]
  @sniffer_delay = merged[:sniffer_delay]

  # Used for all concurrent operations in this class
  @state_mutex = Mutex.new

  # Holds metadata about all URLs
  @url_info = {}
  @stopping = false
end

Instance Attribute Details

#absolute_healthcheck_pathObject (readonly)

Returns the value of attribute absolute_healthcheck_path.



31
32
33
# File 'lib/logstash/outputs/elasticsearch/http_client/pool.rb', line 31

# Whether healthchecks resolve the healthcheck path against the server
# root instead of any path baked into the pooled URL (see #healthcheck!).
def absolute_healthcheck_path
  @absolute_healthcheck_path
end

#absolute_sniffing_pathObject (readonly)

Returns the value of attribute absolute_sniffing_path.



31
32
33
# File 'lib/logstash/outputs/elasticsearch/http_client/pool.rb', line 31

# Whether sniff requests resolve the sniffing path against the server
# root instead of any path baked into the pooled URL (see #check_sniff).
def absolute_sniffing_path
  @absolute_sniffing_path
end

#adapterObject (readonly)

Returns the value of attribute adapter.



31
32
33
# File 'lib/logstash/outputs/elasticsearch/http_client/pool.rb', line 31

# The HTTP adapter that executes requests (see #perform_request_to_url).
def adapter
  @adapter
end

#healthcheck_pathObject (readonly)

Returns the value of attribute healthcheck_path.



31
32
33
# File 'lib/logstash/outputs/elasticsearch/http_client/pool.rb', line 31

# Path requested when healthchecking a host (option :healthcheck_path,
# default ROOT_URI_PATH).
def healthcheck_path
  @healthcheck_path
end

#loggerObject (readonly)

Returns the value of attribute logger.



31
32
33
# File 'lib/logstash/outputs/elasticsearch/http_client/pool.rb', line 31

# Logger used for pool diagnostics and warnings.
def logger
  @logger
end

#resurrect_delayObject (readonly)

Returns the value of attribute resurrect_delay.



31
32
33
# File 'lib/logstash/outputs/elasticsearch/http_client/pool.rb', line 31

# Seconds between resurrectionist healthcheck passes (see
# #start_resurrectionist).
def resurrect_delay
  @resurrect_delay
end

#sniffer_delayObject (readonly)

Returns the value of attribute sniffer_delay.



31
32
33
# File 'lib/logstash/outputs/elasticsearch/http_client/pool.rb', line 31

# Seconds between background sniffing passes (see #start_sniffer).
def sniffer_delay
  @sniffer_delay
end

#sniffingObject (readonly)

Returns the value of attribute sniffing.



31
32
33
# File 'lib/logstash/outputs/elasticsearch/http_client/pool.rb', line 31

# Whether the background sniffer thread is started by #start.
def sniffing
  @sniffing
end

#sniffing_pathObject (readonly)

Returns the value of attribute sniffing_path.



31
32
33
# File 'lib/logstash/outputs/elasticsearch/http_client/pool.rb', line 31

# Path requested when sniffing cluster nodes (option :sniffing_path).
def sniffing_path
  @sniffing_path
end

Instance Method Details

#add_url(url) ⇒ Object



342
343
344
# File 'lib/logstash/outputs/elasticsearch/http_client/pool.rb', line 342

# Register +url+ with fresh metadata unless it is already known; returns
# the existing or new metadata hash. Expected to be called with
# @state_mutex held (as in #update_urls).
def add_url(url)
  @url_info[url] ||= empty_url_meta
end

#alive_urls_countObject



104
105
106
# File 'lib/logstash/outputs/elasticsearch/http_client/pool.rb', line 104

# Number of URLs currently marked :alive.
#
# Fixes a negation bug: the previous `!v[:state] == :alive` compared the
# boolean `!v[:state]` against a symbol, which is never true, so the
# count was always zero.
def alive_urls_count
  @state_mutex.synchronize { @url_info.values.select {|v| v[:state] == :alive }.count }
end

#check_sniffObject

Sniffs and returns the results. Does not update internal URLs!



158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
# File 'lib/logstash/outputs/elasticsearch/http_client/pool.rb', line 158

# Sniff the cluster for its node list and return the discovered host URLs.
# Does NOT mutate the pool's URL set — #sniff! does that.
#
# Returns an Array of URLs on success, or nil when the response contained
# no nodes or the ES major version was unrecognized.
def check_sniff
  resp = nil
  with_connection do |url|
    sniffing_url = url.clone
    sniffing_url.query = nil
    # When the sniffing path is absolute, ignore any path baked into the
    # pooled URL and resolve @sniffing_path against the server root.
    if @absolute_sniffing_path
      sniffing_url.path = ROOT_URI_PATH
    end
    resp = perform_request_to_url(sniffing_url, :get, @sniffing_path, {}, nil)
  end
  parsed = LogStash::Json.load(resp.body)

  nodes = parsed['nodes']
  if !nodes || nodes.empty?
    @logger.warn("Sniff returned no nodes! Will not update hosts.")
    return nil
  else
    # Node address formats differ across ES major versions, so dispatch on
    # the version reported by the first node.
    case major_version(nodes)
    when 5
      sniff_5x(nodes)
    when 2
      sniff_2x_1x(nodes)
    when 1
      sniff_2x_1x(nodes)
    else
      @logger.warn("Could not determine version for nodes in ES cluster!")
      return nil
    end
  end
end

#closeObject



77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
# File 'lib/logstash/outputs/elasticsearch/http_client/pool.rb', line 77

# Shut the pool down: flag stopping, stop the background threads, wait for
# checked-out connections to drain, then close the underlying adapter.
def close
  @state_mutex.synchronize { @stopping = true }

  logger.debug("Stopping sniffer")
  stop_sniffer

  logger.debug("Stopping resurrectionist")
  stop_resurrectionist

  logger.debug("Waiting for in use manticore connections")
  wait_for_in_use_connections

  logger.debug("Closing adapter #{@adapter}")
  @adapter.close
end

#empty_url_metaObject



350
351
352
353
354
355
# File 'lib/logstash/outputs/elasticsearch/http_client/pool.rb', line 350

# Fresh metadata for a newly registered URL: no checked-out requests and a
# not-yet-healthchecked state.
def empty_url_meta
  meta = {}
  meta[:in_use] = 0
  meta[:state] = :unknown
  meta
end

#get_connectionObject



399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
# File 'lib/logstash/outputs/elasticsearch/http_client/pool.rb', line 399

# Check out the least-loaded live connection.
#
# Among all non-dead URLs holding the minimum :in_use count we pick one at
# random, so load spreads fairly under concurrency without hammering a
# single node. The pick's :in_use counter is incremented before returning.
#
# Returns [url, meta], or nil when no usable connection exists.
def get_connection
  @state_mutex.synchronize do
    live = @url_info.reject { |_url, meta| meta[:state] == :dead }
    return nil if live.empty?

    min_in_use = live.each_value.map { |meta| meta[:in_use] }.min
    least_used = live.select { |_url, meta| meta[:in_use] == min_in_use }

    url, meta = least_used.to_a.sample
    meta[:in_use] += 1
    [url, meta]
  end
end

#healthcheck!Object



241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
# File 'lib/logstash/outputs/elasticsearch/http_client/pool.rb', line 241

# Attempt to revive every connection not currently :alive by issuing a
# HEAD request to its healthcheck URL. A URL that responds without raising
# flips back to :alive; failures are logged and the URL stays dead.
#
# Removes the unused `response =` local — the HEAD result is only checked
# by whether it raised.
def healthcheck!
  # Try to keep locking granularity low such that we don't affect IO...
  @state_mutex.synchronize { @url_info.select {|url,meta| meta[:state] != :alive } }.each do |url,meta|
    begin
      path = healthcheck_path
      # Clone so the pooled URL object is never mutated.
      healthcheck_url = LogStash::Util::SafeURI.new(url.uri.clone)
      healthcheck_url.query = nil
      if @absolute_healthcheck_path
        healthcheck_url.path = ROOT_URI_PATH
      end
      logger.info("Running health check to see if an Elasticsearch connection is working",
                    :healthcheck_url => healthcheck_url, :path => path)
      perform_request_to_url(healthcheck_url, :head, path)
      # If no exception was raised it must have succeeded!
      logger.warn("Restored connection to ES instance", :url => healthcheck_url.sanitized)
      @state_mutex.synchronize { meta[:state] = :alive }
    rescue HostUnreachableError, BadResponseCodeError => e
      logger.warn("Attempted to resurrect connection to dead ES instance, but got an error.", url: url.sanitized, error_type: e.class, error: e.message)
    end
  end
end

#in_use_connectionsObject



100
101
102
# File 'lib/logstash/outputs/elasticsearch/http_client/pool.rb', line 100

# Metadata hashes for every connection currently checked out of the pool
# (:in_use greater than zero).
def in_use_connections
  @state_mutex.synchronize do
    @url_info.values.select { |meta| meta[:in_use] > 0 }
  end
end

#major_version(nodes) ⇒ Object



189
190
191
# File 'lib/logstash/outputs/elasticsearch/http_client/pool.rb', line 189

# Major version number (Integer) reported by the first node in a
# nodes-info response.
def major_version(nodes)
  _id, node_info = nodes.first
  node_info['version'].split('.').first.to_i
end

#mark_dead(url, error) ⇒ Object



379
380
381
382
383
384
385
386
387
388
389
390
391
# File 'lib/logstash/outputs/elasticsearch/http_client/pool.rb', line 379

# Flag +url+ as :dead and record the error and timestamp that killed it.
def mark_dead(url, error)
  @state_mutex.synchronize do
    meta = @url_info[url]
    # In case a sniff happened removing the metadata just before there's nothing to mark
    # This is an extreme edge case, but it can happen!
    if meta
      logger.warn("Marking url as dead. Last error: [#{error.class}] #{error.message}",
                  :url => url, :error_message => error.message, :error_class => error.class.name)
      meta[:state] = :dead
      meta[:last_error] = error
      meta[:last_errored_at] = Time.now
    end
  end
end

#normalize_url(uri) ⇒ Object



290
291
292
293
294
295
296
# File 'lib/logstash/outputs/elasticsearch/http_client/pool.rb', line 290

# Run +uri+ through the injected URL normalizer and sanity-check that it
# produced a SafeURI (anything else indicates a programming error).
def normalize_url(uri)
  normalized = @url_normalizer.call(uri)
  unless normalized.is_a?(::LogStash::Util::SafeURI)
    raise "URL Normalizer returned a '#{normalized.class}' rather than a SafeURI! This shouldn't happen!"
  end
  normalized
end

#perform_request(method, path, params = {}, body = nil) ⇒ Object



271
272
273
274
275
276
# File 'lib/logstash/outputs/elasticsearch/http_client/pool.rb', line 271

# Execute +method+ against +path+ on some live connection from the pool.
# Returns [url, response] so the caller knows which host served it.
def perform_request(method, path, params={}, body=nil)
  with_connection do |url|
    response = perform_request_to_url(url, method, path, params, body)
    [url, response]
  end
end

#perform_request_to_url(url, method, path, params = {}, body = nil) ⇒ Object



284
285
286
287
288
# File 'lib/logstash/outputs/elasticsearch/http_client/pool.rb', line 284

# Execute one request against a specific +url+ via the adapter, translating
# any transport-level failure into HostUnreachableError so callers (and
# #with_connection) can mark the host dead.
#
# Removes the unused `res =` local — the adapter's return value is already
# the method's last expression.
def perform_request_to_url(url, method, path, params={}, body=nil)
  @adapter.perform_request(url, method, path, params, body)
rescue *@adapter.host_unreachable_exceptions => e
  raise HostUnreachableError.new(e, url), "Could not reach host #{e.class}: #{e.message}"
end

#remove_url(url) ⇒ Object



346
347
348
# File 'lib/logstash/outputs/elasticsearch/http_client/pool.rb', line 346

# Drop +url+ and its metadata from the pool. Expected to be called with
# @state_mutex held (as in #update_urls).
def remove_url(url)
  @url_info.delete(url)
end

#resurrectionist_alive?Boolean

Returns:

  • (Boolean)


267
268
269
# File 'lib/logstash/outputs/elasticsearch/http_client/pool.rb', line 267

# Whether the resurrectionist thread is running; nil if it was never
# started.
def resurrectionist_alive?
  @resurrectionist && @resurrectionist.alive?
end

#return_connection(url) ⇒ Object



427
428
429
430
431
432
433
# File 'lib/logstash/outputs/elasticsearch/http_client/pool.rb', line 427

# Check a connection back into the pool by decrementing its :in_use count.
def return_connection(url)
  @state_mutex.synchronize do
    meta = @url_info[url]
    # Guard against the condition where the connection has already been deleted
    meta[:in_use] -= 1 if meta
  end
end

#sizeObject



338
339
340
# File 'lib/logstash/outputs/elasticsearch/http_client/pool.rb', line 338

# Number of URLs currently in the pool (alive or dead).
def size
  @state_mutex.synchronize { @url_info.length }
end

#sniff!Object

Sniffs the cluster then updates the internal URLs



151
152
153
# File 'lib/logstash/outputs/elasticsearch/http_client/pool.rb', line 151

# Sniff the cluster and apply the discovered hosts to the pool.
def sniff!
  discovered = check_sniff
  update_urls(discovered)
end

#sniff_2x_1x(nodes) ⇒ Object



201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
# File 'lib/logstash/outputs/elasticsearch/http_client/pool.rb', line 201

# Extract host URLs from a 1.x/2.x nodes-info response.
#
# Skips nodes with HTTP disabled and nodes that do not serve data (client,
# tribe, and master-only nodes). A missing 'attributes' hash means a
# regular master+data+client node, which we accept.
#
# Fixes a dead guard: `info['http_address'].to_s` is never nil or false
# (to_s turns nil into ""), so `next unless addr_str` could never fire;
# test emptiness instead.
def sniff_2x_1x(nodes)
  nodes.map do |_id,info|
    # TODO Make sure this works with shield. Does that listed
    # stuff as 'https_address?'

    addr_str = info['http_address'].to_s
    next if addr_str.empty? # Skip hosts with HTTP disabled

    # Only connect to nodes that serve data
    # this will skip connecting to client, tribe, and master only nodes
    # Note that if 'attributes' is NOT set, then that's just a regular node
    # with master + data + client enabled, so we allow that
    attributes = info['attributes']
    next if attributes && attributes['data'] == 'false'

    matches = addr_str.match(ES1_SNIFF_RE_URL) || addr_str.match(ES2_SNIFF_RE_URL)
    if matches
      host = matches[1].empty? ? matches[2] : matches[1]
      port = matches[3]
      ::LogStash::Util::SafeURI.new("#{host}:#{port}")
    end
  end.compact
end

#sniff_5x(nodes) ⇒ Object



193
194
195
196
197
198
199
# File 'lib/logstash/outputs/elasticsearch/http_client/pool.rb', line 193

# Extract host URLs from a 5.x nodes-info response; nodes without an
# 'http' section are skipped.
def sniff_5x(nodes)
  urls = []
  nodes.each do |_id, info|
    http_info = info["http"]
    urls << LogStash::Util::SafeURI.new(http_info["publish_address"]) if http_info
  end
  urls
end

#sniffer_alive?Boolean

Returns:

  • (Boolean)


229
230
231
# File 'lib/logstash/outputs/elasticsearch/http_client/pool.rb', line 229

# Whether the sniffer thread is running; nil if it was never started.
def sniffer_alive?
  @sniffer && @sniffer.alive?
end

#startObject



71
72
73
74
75
# File 'lib/logstash/outputs/elasticsearch/http_client/pool.rb', line 71

# Bring the pool online: seed it with the initial URLs (which runs an
# inline healthcheck), start the resurrectionist thread, and start the
# sniffer thread when sniffing is enabled.
def start
  update_urls(@initial_urls)
  start_resurrectionist
  if @sniffing
    start_sniffer
  end
end

#start_resurrectionistObject



233
234
235
236
237
238
239
# File 'lib/logstash/outputs/elasticsearch/http_client/pool.rb', line 233

# Spawn the background thread that runs #healthcheck! every
# @resurrect_delay seconds (via #until_stopped) to revive dead
# connections.
def start_resurrectionist
  @resurrectionist = Thread.new do
    until_stopped("resurrection", @resurrect_delay) do
      healthcheck!
    end
  end
end

#start_snifferObject



137
138
139
140
141
142
143
144
145
146
147
148
# File 'lib/logstash/outputs/elasticsearch/http_client/pool.rb', line 137

# Spawn the background thread that runs #sniff! every sniffer_delay
# seconds (via #until_stopped). A sniff that finds no live connections is
# logged and retried on the next pass rather than killing the thread.
def start_sniffer
  @sniffer = Thread.new do
    until_stopped("sniffing", sniffer_delay) do
      begin
        sniff!
      rescue NoConnectionAvailableError => e
        @state_mutex.synchronize { # Synchronize around @url_info
          logger.warn("Elasticsearch output attempted to sniff for new connections but cannot. No living connections are detected. Pool contains the following current URLs", :url_info => @url_info) }
      end
    end
  end
end

#stop_resurrectionistObject



263
264
265
# File 'lib/logstash/outputs/elasticsearch/http_client/pool.rb', line 263

# Wait for the resurrectionist thread to finish; no-op if never started.
def stop_resurrectionist
  @resurrectionist.join unless @resurrectionist.nil?
end

#stop_snifferObject



225
226
227
# File 'lib/logstash/outputs/elasticsearch/http_client/pool.rb', line 225

# Wait for the sniffer thread to finish; no-op if never started.
def stop_sniffer
  @sniffer.join unless @sniffer.nil?
end

#until_stopped(task_name, delay) ⇒ Object



116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
# File 'lib/logstash/outputs/elasticsearch/http_client/pool.rb', line 116

# Run the given block roughly every +delay+ seconds until #close flips
# @stopping. Errors raised by the block are logged (tagged with
# +task_name+) and the loop continues; polling granularity is one second.
def until_stopped(task_name, delay)
  last_done = Time.now
  loop do
    break if @state_mutex.synchronize { @stopping }

    begin
      now = Time.now
      if (now - last_done) >= delay
        last_done = now
        yield
      end
      sleep 1
    rescue => e
      logger.warn(
        "Error while performing #{task_name}",
        :error_message => e.message,
        :class => e.class.name,
        :backtrace => e.backtrace
        )
    end
  end
end

#update_urls(new_urls) ⇒ Object



298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
# File 'lib/logstash/outputs/elasticsearch/http_client/pool.rb', line 298

# Replace the pool's URL set with +new_urls+. A nil argument (e.g. from a
# failed #check_sniff) is a no-op. URLs are normalized first; additions
# and removals happen under @state_mutex and are logged, then an inline
# healthcheck runs so new hosts become usable immediately.
#
# Fixes removal iterating @url_info with #each while #remove_url deletes
# from it — we now iterate a snapshot of the keys instead.
def update_urls(new_urls)
  return if new_urls.nil?

  # Normalize URLs
  new_urls = new_urls.map(&method(:normalize_url))

  # Used for logging nicely
  state_changes = {:removed => [], :added => []}
  @state_mutex.synchronize do
    # Add new connections
    new_urls.each do |url|
      # URI objects don't have real hash equality! So, since this isn't perf sensitive we do a linear scan
      unless @url_info.keys.include?(url)
        state_changes[:added] << url
        add_url(url)
      end
    end

    # Delete connections not in the new list. Iterate over a snapshot of
    # the keys since remove_url mutates @url_info while we scan it.
    @url_info.keys.each do |url|
      unless new_urls.include?(url)
        state_changes[:removed] << url
        remove_url(url)
      end
    end
  end

  if state_changes[:removed].size > 0 || state_changes[:added].size > 0
    if logger.info?
      logger.info("Elasticsearch pool URLs updated", :changes => state_changes)
    end
  end

  # Run an inline healthcheck anytime URLs are updated
  # This guarantees that during startup / post-startup
  # sniffing we don't have idle periods waiting for the
  # periodic sniffer to allow new hosts to come online
  healthcheck!
end

#url_infoObject



108
109
110
# File 'lib/logstash/outputs/elasticsearch/http_client/pool.rb', line 108

# The full URL-to-metadata map, read under the state mutex.
def url_info
  @state_mutex.synchronize do
    @url_info
  end
end

#url_meta(url) ⇒ Object



393
394
395
396
397
# File 'lib/logstash/outputs/elasticsearch/http_client/pool.rb', line 393

# Metadata hash for +url+, or nil when the URL is not in the pool.
def url_meta(url)
  @state_mutex.synchronize { @url_info[url] }
end

#urlsObject



112
113
114
# File 'lib/logstash/outputs/elasticsearch/http_client/pool.rb', line 112

# All URLs currently known to the pool, regardless of state.
def urls
  url_info.keys
end

#wait_for_in_use_connectionsObject



93
94
95
96
97
98
# File 'lib/logstash/outputs/elasticsearch/http_client/pool.rb', line 93

# Block until no connection is checked out (:in_use > 0), polling once a
# second. Used by #close to drain outstanding requests before shutting
# the adapter down.
def wait_for_in_use_connections
  until in_use_connections.empty?
    logger.info "Blocked on shutdown to in use connections #{@state_mutex.synchronize {@url_info}}"
    sleep 1
  end
end

#with_connectionObject



357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
# File 'lib/logstash/outputs/elasticsearch/http_client/pool.rb', line 357

def with_connection
  url, url_meta = get_connection

  # Custom error class used here so that users may retry attempts if they receive this error
  # should they choose to
  raise NoConnectionAvailableError, "No Available connections" unless url
  yield url
rescue HostUnreachableError => e
  # Mark the connection as dead here since this is likely not transient
  mark_dead(url, e)
  raise e
rescue BadResponseCodeError => e
  # These aren't discarded from the pool because these are often very transient
  # errors
  raise e
rescue => e
  logger.warn("UNEXPECTED POOL ERROR", :e => e)
  raise e
ensure
  return_connection(url)
end