Class: LogStash::Outputs::ElasticSearch::HttpClient::Pool

Inherits:
Object
  • Object
show all
Defined in:
lib/logstash/outputs/elasticsearch/http_client/pool.rb

Defined Under Namespace

Classes: BadResponseCodeError, HostUnreachableError, NoConnectionAvailableError

Constant Summary collapse

ROOT_URI_PATH =
'/'.freeze
DEFAULT_OPTIONS =
{
  :healthcheck_path => ROOT_URI_PATH,
  :sniffing_path => "/_nodes/http",
  :bulk_path => "/_bulk",
  :scheme => 'http',
  :resurrect_delay => 5,
  :sniffing => false,
  :sniffer_delay => 10,
}.freeze
ES1_SNIFF_RE_URL =
/\[([^\/]*)?\/?([^:]*):([0-9]+)\]/
ES2_SNIFF_RE_URL =
/([^\/]*)?\/?([^:]*):([0-9]+)/

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(logger, adapter, initial_urls = [], options = {}) ⇒ Pool

Returns a new instance of Pool.

Raises:

  • (ArgumentError)


45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
# File 'lib/logstash/outputs/elasticsearch/http_client/pool.rb', line 45

# Builds a new connection pool.
#
# @param logger [Logger] logger shared with the output plugin
# @param adapter [Object] HTTP adapter implementing #perform_request / #close
# @param initial_urls [Array] seed URLs for the pool
# @param options [Hash] pool options, merged over DEFAULT_OPTIONS;
#   must include :url_normalizer
# @raise [ArgumentError] when no :url_normalizer is supplied
def initialize(logger, adapter, initial_urls=[], options={})
  @logger = logger
  @adapter = adapter
  @metric = options[:metric]
  @initial_urls = initial_urls

  raise ArgumentError, "No URL Normalizer specified!" unless options[:url_normalizer]
  @url_normalizer = options[:url_normalizer]

  # Fill in any option the caller did not provide from the defaults.
  merged = DEFAULT_OPTIONS.merge(options)
  @bulk_path        = merged[:bulk_path]
  @sniffing_path    = merged[:sniffing_path]
  @healthcheck_path = merged[:healthcheck_path]
  @resurrect_delay  = merged[:resurrect_delay]
  @sniffing         = merged[:sniffing]
  @sniffer_delay    = merged[:sniffer_delay]

  # Used for all concurrent operations in this class
  @state_mutex = Mutex.new

  # Holds metadata about all URLs
  @url_info = {}
  @stopping = false
end

Instance Attribute Details

#adapterObject (readonly)

Returns the value of attribute adapter.



31
32
33
# File 'lib/logstash/outputs/elasticsearch/http_client/pool.rb', line 31

# @return [Object] the low-level HTTP adapter used for all requests (read-only)
def adapter
  @adapter
end

#bulk_pathObject (readonly)

Returns the value of attribute bulk_path.



31
32
33
# File 'lib/logstash/outputs/elasticsearch/http_client/pool.rb', line 31

# @return [String] path used for bulk requests (default "/_bulk") (read-only)
def bulk_path
  @bulk_path
end

#healthcheck_pathObject (readonly)

Returns the value of attribute healthcheck_path.



31
32
33
# File 'lib/logstash/outputs/elasticsearch/http_client/pool.rb', line 31

# @return [String] path probed by healthchecks (default "/") (read-only)
def healthcheck_path
  @healthcheck_path
end

#loggerObject (readonly)

Returns the value of attribute logger.



31
32
33
# File 'lib/logstash/outputs/elasticsearch/http_client/pool.rb', line 31

# @return [Logger] logger given at construction time (read-only)
def logger
  @logger
end

#resurrect_delayObject (readonly)

Returns the value of attribute resurrect_delay.



31
32
33
# File 'lib/logstash/outputs/elasticsearch/http_client/pool.rb', line 31

# @return [Numeric] seconds between resurrectionist passes (default 5) (read-only)
def resurrect_delay
  @resurrect_delay
end

#sniffer_delayObject (readonly)

Returns the value of attribute sniffer_delay.



31
32
33
# File 'lib/logstash/outputs/elasticsearch/http_client/pool.rb', line 31

# @return [Numeric] seconds between sniffing passes (default 10) (read-only)
def sniffer_delay
  @sniffer_delay
end

#sniffingObject (readonly)

Returns the value of attribute sniffing.



31
32
33
# File 'lib/logstash/outputs/elasticsearch/http_client/pool.rb', line 31

# @return [Boolean] whether background sniffing is enabled (default false) (read-only)
def sniffing
  @sniffing
end

#sniffing_pathObject (readonly)

Returns the value of attribute sniffing_path.



31
32
33
# File 'lib/logstash/outputs/elasticsearch/http_client/pool.rb', line 31

# @return [String] path queried when sniffing (default "/_nodes/http") (read-only)
def sniffing_path
  @sniffing_path
end

Instance Method Details

#add_url(url) ⇒ Object



352
353
354
# File 'lib/logstash/outputs/elasticsearch/http_client/pool.rb', line 352

# Registers +url+ in the pool with fresh metadata unless it is already
# tracked. Callers synchronize on @state_mutex (see #update_urls).
def add_url(url)
  @url_info[url] ||= empty_url_meta
end

#alive_urls_countObject



103
104
105
# File 'lib/logstash/outputs/elasticsearch/http_client/pool.rb', line 103

# Number of URLs currently marked :alive.
#
# Fixed: the original predicate was `!v[:state] == :alive`, which applies
# `!` to the (truthy) state symbol first, so it compared `false == :alive`
# and the method always returned 0.
def alive_urls_count
  @state_mutex.synchronize { @url_info.values.count { |v| v[:state] == :alive } }
end

#check_sniffObject

Sniffs and returns the results. Does not update internal URLs!



163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
# File 'lib/logstash/outputs/elasticsearch/http_client/pool.rb', line 163

# Sniffs the cluster via @sniffing_path and returns the parsed host list.
# Does not update internal URLs! Returns nil when no nodes were returned
# or the node version could not be mapped to a known parser.
def check_sniff
  _, url_meta, resp = perform_request(:get, @sniffing_path)
  @metric.increment(:sniff_requests)
  parsed = LogStash::Json.load(resp.body)
  nodes = parsed['nodes']
  if !nodes || nodes.empty?
    @logger.warn("Sniff returned no nodes! Will not update hosts.")
    return nil
  else
    # The _nodes/http response format differs across ES major versions,
    # so dispatch on the version recorded for the queried connection.
    case major_version(url_meta[:version])
    when 5, 6
      sniff_5x_and_above(nodes)
    when 2, 1
      sniff_2x_1x(nodes)
    else
      @logger.warn("Could not determine version for nodes in ES cluster!")
      return nil
    end
  end
end

#closeObject



76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
# File 'lib/logstash/outputs/elasticsearch/http_client/pool.rb', line 76

# Shuts the pool down: flags stopping (which terminates the background
# threads' until_stopped loops), joins both threads, waits for all
# checked-out connections to be returned, then closes the adapter.
def close
  @state_mutex.synchronize { @stopping = true }

  logger.debug("Stopping sniffer")
  stop_sniffer

  logger.debug("Stopping resurrectionist")
  stop_resurrectionist

  logger.debug("Waiting for in use manticore connections")
  wait_for_in_use_connections

  logger.debug("Closing adapter #{@adapter}")
  @adapter.close
end

#empty_url_metaObject



360
361
362
363
364
365
# File 'lib/logstash/outputs/elasticsearch/http_client/pool.rb', line 360

# Fresh metadata for a newly tracked URL: no checkouts yet and a health
# state that has not been determined.
def empty_url_meta
  { :in_use => 0, :state => :unknown }
end

#es_versionsObject



348
349
350
# File 'lib/logstash/outputs/elasticsearch/http_client/pool.rb', line 348

# NOTE(review): despite the name, this returns the number of tracked URLs
# (same value as #size), not a list of ES versions — confirm against callers.
def es_versions
  @state_mutex.synchronize { @url_info.length }
end

#get_connectionObject



409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
# File 'lib/logstash/outputs/elasticsearch/http_client/pool.rb', line 409

# Checks out a connection and returns [url, meta], or nil when every URL
# is dead. Picks uniformly at random among the non-dead URLs with the
# lowest :in_use count — randomness avoids hammering one node, while the
# least-in-use rule keeps load fair under high concurrency. Increments
# the winner's :in_use counter under the state lock.
def get_connection
  @state_mutex.synchronize do
    candidates = []
    lowest_usage = nil

    @url_info.each do |url, meta|
      next if meta[:state] == :dead

      usage = meta[:in_use]
      if lowest_usage.nil? || usage < lowest_usage
        lowest_usage = usage
        candidates = [[url, meta]]
      elsif usage == lowest_usage
        candidates << [url, meta]
      end
    end

    return nil if candidates.empty?

    chosen_url, chosen_meta = candidates.sample
    chosen_meta[:in_use] += 1
    [chosen_url, chosen_meta]
  end
end

#get_es_version(url) ⇒ Object



445
446
447
448
# File 'lib/logstash/outputs/elasticsearch/http_client/pool.rb', line 445

# Queries the root path of the given node and extracts the Elasticsearch
# version string (e.g. "5.6.1") from the JSON response body.
def get_es_version(url)
  request = perform_request_to_url(url, :get, ROOT_URI_PATH)
  LogStash::Json.load(request.body)["version"]["number"]
end

#healthcheck!Object



239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
# File 'lib/logstash/outputs/elasticsearch/http_client/pool.rb', line 239

# Probes every URL not currently marked :alive with a HEAD request to
# @healthcheck_path. On success the node's ES version is fetched and
# recorded, the pool-wide maximum major version is updated if needed, and
# the node is marked :alive. Probe failures (HostUnreachableError /
# BadResponseCodeError) are logged and the node stays as-is until the
# next pass; other exceptions propagate.
def healthcheck!
  # Try to keep locking granularity low such that we don't affect IO...
  @state_mutex.synchronize { @url_info.select {|url,meta| meta[:state] != :alive } }.each do |url,meta|
    begin
      logger.info("Running health check to see if an Elasticsearch connection is working",
                    :healthcheck_url => url, :path => @healthcheck_path)
      response = perform_request_to_url(url, :head, @healthcheck_path)
      # If no exception was raised it must have succeeded!
      logger.warn("Restored connection to ES instance", :url => url.sanitized.to_s)
      # We reconnected to this node, check its ES version
      es_version = get_es_version(url)
      @state_mutex.synchronize do
        meta[:version] = es_version
        major = major_version(es_version)
        if !@maximum_seen_major_version
          @logger.info("ES Output version determined", :es_version => major)
          set_new_major_version(major)
        elsif major > @maximum_seen_major_version
          @logger.warn("Detected a node with a higher major version than previously observed. This could be the result of an elasticsearch cluster upgrade.", :previous_major => @maximum_seen_major_version, :new_major => major, :node_url => url)
          set_new_major_version(major)
        end
        meta[:state] = :alive
      end
    rescue HostUnreachableError, BadResponseCodeError => e
      logger.warn("Attempted to resurrect connection to dead ES instance, but got an error.", url: url.sanitized.to_s, error_type: e.class, error: e.message)
    end
  end
end

#in_use_connectionsObject



99
100
101
# File 'lib/logstash/outputs/elasticsearch/http_client/pool.rb', line 99

# Metadata hashes of every connection currently checked out
# (i.e. with a positive :in_use count).
def in_use_connections
  @state_mutex.synchronize do
    @url_info.values.select { |meta| meta[:in_use] > 0 }
  end
end

#major_version(version_string) ⇒ Object



184
185
186
# File 'lib/logstash/outputs/elasticsearch/http_client/pool.rb', line 184

# Extracts the major version from a dotted version string,
# e.g. "5.6.1" -> 5.
def major_version(version_string)
  major, = version_string.split('.', 2)
  major.to_i
end

#mark_dead(url, error) ⇒ Object



389
390
391
392
393
394
395
396
397
398
399
400
401
# File 'lib/logstash/outputs/elasticsearch/http_client/pool.rb', line 389

# Marks +url+ as dead in the pool, recording the error and timestamp so
# the resurrectionist can later retry it.
def mark_dead(url, error)
  @state_mutex.synchronize do
    meta = @url_info[url]
    # A sniff may have removed this URL's metadata just beforehand, in
    # which case there is nothing left to mark. Extreme edge case, but
    # it can happen!
    return if meta.nil?
    logger.warn("Marking url as dead. Last error: [#{error.class}] #{error.message}",
                :url => url, :error_message => error.message, :error_class => error.class.name)
    meta[:state] = :dead
    meta[:last_error] = error
    meta[:last_errored_at] = Time.now
  end
end

#maximum_seen_major_versionObject



111
112
113
114
115
# File 'lib/logstash/outputs/elasticsearch/http_client/pool.rb', line 111

# Thread-safe read of the highest ES major version observed so far
# (nil until a healthcheck has determined one).
def maximum_seen_major_version
  @state_mutex.synchronize { @maximum_seen_major_version }
end

#normalize_url(uri) ⇒ Object



296
297
298
299
300
301
302
# File 'lib/logstash/outputs/elasticsearch/http_client/pool.rb', line 296

# Runs +uri+ through the configured URL normalizer and sanity-checks that
# the result is a SafeURI.
# @raise [RuntimeError] when the normalizer returns anything else
def normalize_url(uri)
  normalized = @url_normalizer.call(uri)
  unless normalized.is_a?(::LogStash::Util::SafeURI)
    raise "URL Normalizer returned a '#{normalized.class}' rather than a SafeURI! This shouldn't happen!"
  end
  normalized
end

#perform_request(method, path, params = {}, body = nil) ⇒ Object



276
277
278
279
280
281
# File 'lib/logstash/outputs/elasticsearch/http_client/pool.rb', line 276

# Performs an HTTP request on a checked-out connection (via
# #with_connection, which handles checkout/return and dead-marking).
# @return [Array(url, url_meta, response)]
def perform_request(method, path, params={}, body=nil)
  with_connection do |url, url_meta|
    resp = perform_request_to_url(url, method, path, params, body)
    [url, url_meta, resp]
  end
end

#perform_request_to_url(url, method, path, params = {}, body = nil) ⇒ Object



290
291
292
293
294
# File 'lib/logstash/outputs/elasticsearch/http_client/pool.rb', line 290

# Performs an HTTP request against one specific URL via the adapter,
# translating any adapter-declared unreachable exception into our
# HostUnreachableError (so callers/with_connection can mark the node dead).
# @return the adapter's response object
#
# (Removed the unused local `res =` — the expression's value is the
# method's return value either way.)
def perform_request_to_url(url, method, path, params={}, body=nil)
  @adapter.perform_request(url, method, path, params, body)
rescue *@adapter.host_unreachable_exceptions => e
  raise HostUnreachableError.new(e, url), "Could not reach host #{e.class}: #{e.message}"
end

#remove_url(url) ⇒ Object



356
357
358
# File 'lib/logstash/outputs/elasticsearch/http_client/pool.rb', line 356

# Drops +url+ and its metadata from the pool. As with #add_url, callers
# synchronize on @state_mutex (see #update_urls).
def remove_url(url)
  @url_info.delete(url)
end

#resurrectionist_alive?Boolean

Returns:

  • (Boolean)


272
273
274
# File 'lib/logstash/outputs/elasticsearch/http_client/pool.rb', line 272

# @return [Boolean, nil] whether the resurrectionist thread is running,
#   or nil if it was never started
def resurrectionist_alive?
  @resurrectionist && @resurrectionist.alive?
end

#return_connection(url) ⇒ Object



437
438
439
440
441
442
443
# File 'lib/logstash/outputs/elasticsearch/http_client/pool.rb', line 437

# Returns a previously checked-out connection by decrementing its
# :in_use counter. Tolerates URLs that were removed from the pool in
# the meantime (e.g. by a sniff) — then it is a no-op.
def return_connection(url)
  @state_mutex.synchronize do
    meta = @url_info[url]
    meta[:in_use] -= 1 if meta
  end
end

#set_new_major_version(version) ⇒ Object



450
451
452
453
454
455
# File 'lib/logstash/outputs/elasticsearch/http_client/pool.rb', line 450

# Records +version+ as the highest ES major version seen, warning once
# per call when it is 6 or above (the `type` field behavior changes there).
# Callers hold @state_mutex (see #healthcheck!).
def set_new_major_version(version)
  @maximum_seen_major_version = version
  return unless @maximum_seen_major_version >= 6
  @logger.warn("Detected a 6.x and above cluster: the `type` event field won't be used to determine the document _type", :es_version => @maximum_seen_major_version)
end

#sizeObject



344
345
346
# File 'lib/logstash/outputs/elasticsearch/http_client/pool.rb', line 344

# Thread-safe count of URLs currently tracked by the pool.
def size
  @state_mutex.synchronize { @url_info.count }
end

#sniff!Object

Sniffs the cluster then updates the internal URLs



156
157
158
# File 'lib/logstash/outputs/elasticsearch/http_client/pool.rb', line 156

# Sniffs the cluster then updates the internal URLs.
# (No-op when check_sniff returns nil — update_urls ignores nil.)
def sniff!
  update_urls(check_sniff)
end

#sniff_2x_1x(nodes) ⇒ Object



199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
# File 'lib/logstash/outputs/elasticsearch/http_client/pool.rb', line 199

# Parses ES 1.x / 2.x sniff results (the "nodes" hash from _nodes/http)
# into SafeURI instances, skipping nodes that do not serve data.
# @return [Array<LogStash::Util::SafeURI>]
def sniff_2x_1x(nodes)
  nodes.map do |id,info|
    # TODO: Verify this works with Shield. Does Shield list the address
    # as 'https_address'?

    addr_str = info['http_address'].to_s
    # Fixed guard: `to_s` never yields nil, so the old `next unless addr_str`
    # could never fire; test for the empty string instead.
    next if addr_str.empty? # Skip hosts with HTTP disabled

    # Only connect to nodes that serve data
    # this will skip connecting to client, tribe, and master only nodes
    # Note that if 'attributes' is NOT set, then that's just a regular node
    # with master + data + client enabled, so we allow that
    attributes = info['attributes']
    next if attributes && attributes['data'] == 'false'

    matches = addr_str.match(ES1_SNIFF_RE_URL) || addr_str.match(ES2_SNIFF_RE_URL)
    if matches
      host = matches[1].empty? ? matches[2] : matches[1]
      port = matches[3]
      ::LogStash::Util::SafeURI.new("#{host}:#{port}")
    end
  end.compact
end

#sniff_5x_and_above(nodes) ⇒ Object



188
189
190
191
192
193
194
195
196
197
# File 'lib/logstash/outputs/elasticsearch/http_client/pool.rb', line 188

# Parses ES 5.x/6.x sniff results into SafeURI instances built from each
# node's http publish_address. Master-only nodes and nodes without an
# "http" section produce nil and are dropped by the trailing compact.
# @return [Array<LogStash::Util::SafeURI>]
#
# (Removed the unused local `uri =` — the `if` expression's value was
# already the block's result.)
def sniff_5x_and_above(nodes)
  nodes.map do |id,info|
    # Skip master-only nodes
    next if info["roles"] && info["roles"] == ["master"]

    LogStash::Util::SafeURI.new(info["http"]["publish_address"]) if info["http"]
  end.compact
end

#sniffer_alive?Boolean

Returns:

  • (Boolean)


227
228
229
# File 'lib/logstash/outputs/elasticsearch/http_client/pool.rb', line 227

# @return [Boolean, nil] whether the sniffer thread is running,
#   or nil if it was never started
def sniffer_alive?
  @sniffer && @sniffer.alive?
end

#startObject



70
71
72
73
74
# File 'lib/logstash/outputs/elasticsearch/http_client/pool.rb', line 70

# Boots the pool: seeds it with the initial URLs (update_urls also runs
# an inline healthcheck), starts the resurrectionist thread, and — when
# sniffing is enabled — the sniffer thread.
def start
  update_urls(@initial_urls)
  start_resurrectionist
  start_sniffer if @sniffing
end

#start_resurrectionistObject



231
232
233
234
235
236
237
# File 'lib/logstash/outputs/elasticsearch/http_client/pool.rb', line 231

# Spawns the background thread that periodically (every @resurrect_delay
# seconds) healthchecks non-alive URLs. Runs until #close flips @stopping.
def start_resurrectionist
  @resurrectionist = Thread.new do
    until_stopped("resurrection", @resurrect_delay) do
      healthcheck!
    end
  end
end

#start_snifferObject



142
143
144
145
146
147
148
149
150
151
152
153
# File 'lib/logstash/outputs/elasticsearch/http_client/pool.rb', line 142

# Spawns the background thread that periodically (every sniffer_delay
# seconds) sniffs the cluster for hosts. A NoConnectionAvailableError is
# logged (with the current pool state) rather than propagated, so the
# thread keeps running until #close flips @stopping.
def start_sniffer
  @sniffer = Thread.new do
    until_stopped("sniffing", sniffer_delay) do
      begin
        sniff!
      rescue NoConnectionAvailableError => e
        @state_mutex.synchronize { # Synchronize around @url_info
          logger.warn("Elasticsearch output attempted to sniff for new connections but cannot. No living connections are detected. Pool contains the following current URLs", :url_info => @url_info) }
      end
    end
  end
end

#stop_resurrectionistObject



268
269
270
# File 'lib/logstash/outputs/elasticsearch/http_client/pool.rb', line 268

# Blocks until the resurrectionist thread exits (#close sets @stopping,
# which ends its until_stopped loop). No-op if it was never started.
def stop_resurrectionist
  @resurrectionist.join if @resurrectionist
end

#stop_snifferObject



223
224
225
# File 'lib/logstash/outputs/elasticsearch/http_client/pool.rb', line 223

# Blocks until the sniffer thread exits (#close sets @stopping, which
# ends its until_stopped loop). No-op if it was never started.
def stop_sniffer
  @sniffer.join if @sniffer
end

#until_stopped(task_name, delay) ⇒ Object



121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
# File 'lib/logstash/outputs/elasticsearch/http_client/pool.rb', line 121

# Runs the given block roughly every +delay+ seconds (polling once a
# second) until @stopping becomes true. Exceptions from the block are
# logged under +task_name+ and the loop continues.
def until_stopped(task_name, delay)
  last_ran = Time.now
  until @state_mutex.synchronize { @stopping }
    begin
      current_time = Time.now
      if (current_time - last_ran) >= delay
        last_ran = current_time
        yield
      end
      sleep 1
    rescue => e
      logger.warn("Error while performing #{task_name}",
                  :error_message => e.message,
                  :class => e.class.name,
                  :backtrace => e.backtrace)
    end
  end
end

#update_urls(new_urls) ⇒ Object



304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
# File 'lib/logstash/outputs/elasticsearch/http_client/pool.rb', line 304

# Reconciles the pool with +new_urls+: normalizes each URL, adds ones we
# don't track yet, removes ones no longer present, logs the diff, then
# runs an inline healthcheck. A nil argument (e.g. a failed sniff) is a
# no-op.
def update_urls(new_urls)
  return if new_urls.nil?
  
  # Normalize URLs
  new_urls = new_urls.map(&method(:normalize_url))

  # Used for logging nicely
  state_changes = {:removed => [], :added => []}
  @state_mutex.synchronize do
    # Add new connections
    new_urls.each do |url|
      # URI objects don't have real hash equality! So, since this isn't perf sensitive we do a linear scan
      unless @url_info.keys.include?(url)
        state_changes[:added] << url
        add_url(url)
      end
    end

    # Delete connections not in the new list
    @url_info.each do |url,_|
      unless new_urls.include?(url)
        state_changes[:removed] << url
        remove_url(url)
      end
    end
  end

  if state_changes[:removed].size > 0 || state_changes[:added].size > 0
    if logger.info?
      logger.info("Elasticsearch pool URLs updated", :changes => state_changes)
    end
  end
  
  # Run an inline healthcheck anytime URLs are updated
  # This guarantees that during startup / post-startup
  # sniffing we don't have idle periods waiting for the
  # periodic sniffer to allow new hosts to come online
  healthcheck!
end

#url_infoObject



107
108
109
# File 'lib/logstash/outputs/elasticsearch/http_client/pool.rb', line 107

# Thread-safe accessor for the URL metadata table.
# Note: this returns the live hash, not a copy.
def url_info
  @state_mutex.synchronize do
    @url_info
  end
end

#url_meta(url) ⇒ Object



403
404
405
406
407
# File 'lib/logstash/outputs/elasticsearch/http_client/pool.rb', line 403

# Thread-safe lookup of the metadata hash for one URL
# (nil when the URL is not tracked).
def url_meta(url)
  @state_mutex.synchronize { @url_info[url] }
end

#urlsObject



117
118
119
# File 'lib/logstash/outputs/elasticsearch/http_client/pool.rb', line 117

# The keys of #url_info — all URLs currently tracked by the pool.
def urls
  url_info.keys
end

#wait_for_in_use_connectionsObject



92
93
94
95
96
97
# File 'lib/logstash/outputs/elasticsearch/http_client/pool.rb', line 92

# Polls once a second until no connection has a positive :in_use count.
# Called from #close before the adapter is shut down.
def wait_for_in_use_connections
  until in_use_connections.empty?
    logger.info "Blocked on shutdown to in use connections #{@state_mutex.synchronize {@url_info}}"
    sleep 1
  end
end

#with_connectionObject



367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
# File 'lib/logstash/outputs/elasticsearch/http_client/pool.rb', line 367

# Checks out a connection, yields [url, url_meta] to the block, and
# always returns the connection in the ensure (return_connection
# tolerates a nil url). Marks the URL dead on HostUnreachableError;
# BadResponseCodeError is re-raised without dead-marking since such
# errors are often transient. All exceptions propagate to the caller.
# @raise [NoConnectionAvailableError] when no live connection exists —
#   a custom class so callers may choose to retry on it
def with_connection
  url, url_meta = get_connection

  # Custom error class used here so that users may retry attempts if they receive this error
  # should they choose to
  raise NoConnectionAvailableError, "No Available connections" unless url
  yield url, url_meta
rescue HostUnreachableError => e
  # Mark the connection as dead here since this is likely not transient
  mark_dead(url, e)
  raise e
rescue BadResponseCodeError => e
  # These aren't discarded from the pool because these are often very transient
  # errors
  raise e
rescue => e
  logger.warn("UNEXPECTED POOL ERROR", :e => e)
  raise e
ensure
  return_connection(url)
end