Class: LogStash::Outputs::ElasticSearch::HttpClient::Pool

Inherits:
Object
Defined in:
lib/logstash/outputs/elasticsearch/http_client/pool.rb

Defined Under Namespace

Classes: BadResponseCodeError, HostUnreachableError, NoConnectionAvailableError

Constant Summary

DEFAULT_OPTIONS =
{
  :healthcheck_path => '/'.freeze,
  :scheme => 'http',
  :resurrect_delay => 5,
  :auth => nil, # Can be set to {:user => 'user', :password => 'pass'}
  :sniffing => false,
  :sniffer_delay => 10,
}.freeze
ES1_SNIFF_RE_URL =
/\[([^\/]*)?\/?([^:]*):([0-9]+)\]/
ES2_SNIFF_RE_URL =
/([^\/]*)?\/?([^:]*):([0-9]+)/

Instance Attribute Summary

Instance Method Summary

Constructor Details

#initialize(logger, adapter, initial_urls = [], options = {}) ⇒ Pool

Returns a new instance of Pool.



# File 'lib/logstash/outputs/elasticsearch/http_client/pool.rb', line 41

def initialize(logger, adapter, initial_urls=[], options={})
  @logger = logger
  @adapter = adapter

  DEFAULT_OPTIONS.merge(options).tap do |merged|
    @healthcheck_path = merged[:healthcheck_path]
    @scheme = merged[:scheme]
    @resurrect_delay = merged[:resurrect_delay]
    @auth = merged[:auth]
    @sniffing = merged[:sniffing]
    @sniffer_delay = merged[:sniffer_delay]
  end

  # Reject the configuration if an explicit HTTPS URL conflicts with an 'http' scheme setting
  if initial_urls.any? {|u| u.scheme == 'https'} && @scheme == 'http'
    raise ArgumentError, "HTTP was set as scheme, but an HTTPS URL was passed in!"
  end

  # Used for all concurrent operations in this class
  @state_mutex = Mutex.new

  # Holds metadata about all URLs
  @url_info = {}
  @stopping = false

  update_urls(initial_urls)
  start_resurrectionist
  start_sniffer if @sniffing
end
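A minimal construction sketch, assuming a hypothetical adapter double: the real plugin passes its Manticore-based adapter, but any object exposing the calls the pool makes (perform_request, host_unreachable_exceptions, close) will do, and the logger only needs to accept structured arguments the way Logstash's Cabin logger does. Names and values below are illustrative, not part of this API.

# Hypothetical adapter stand-in -- illustration only, not shipped with the plugin.
class FakeAdapter
  def perform_request(url, method, path, params = {}, body = nil); end
  def host_unreachable_exceptions; [::Errno::ECONNREFUSED]; end
  def close; end
end

pool = LogStash::Outputs::ElasticSearch::HttpClient::Pool.new(
  logger,                                   # any Cabin-style structured logger
  FakeAdapter.new,
  [URI.parse("http://localhost:9200")],
  :resurrect_delay => 10,                   # overrides DEFAULT_OPTIONS[:resurrect_delay]
  :auth => { :user => "elastic", :password => "changeme" }
)
# ...
pool.close                                  # stops the background threads and the adapter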

Instance Attribute Details

#adapter ⇒ Object (readonly)

Returns the value of attribute adapter.



# File 'lib/logstash/outputs/elasticsearch/http_client/pool.rb', line 30

def adapter
  @adapter
end

#auth ⇒ Object (readonly)

Returns the value of attribute auth.



# File 'lib/logstash/outputs/elasticsearch/http_client/pool.rb', line 30

def auth
  @auth
end

#healthcheck_path ⇒ Object (readonly)

Returns the value of attribute healthcheck_path.



# File 'lib/logstash/outputs/elasticsearch/http_client/pool.rb', line 30

def healthcheck_path
  @healthcheck_path
end

#logger ⇒ Object (readonly)

Returns the value of attribute logger.



# File 'lib/logstash/outputs/elasticsearch/http_client/pool.rb', line 30

def logger
  @logger
end

#resurrect_delay ⇒ Object (readonly)

Returns the value of attribute resurrect_delay.



# File 'lib/logstash/outputs/elasticsearch/http_client/pool.rb', line 30

def resurrect_delay
  @resurrect_delay
end

#sniffer_delay ⇒ Object (readonly)

Returns the value of attribute sniffer_delay.



# File 'lib/logstash/outputs/elasticsearch/http_client/pool.rb', line 30

def sniffer_delay
  @sniffer_delay
end

#sniffing ⇒ Object (readonly)

Returns the value of attribute sniffing.



# File 'lib/logstash/outputs/elasticsearch/http_client/pool.rb', line 30

def sniffing
  @sniffing
end

Instance Method Details

#add_url(url) ⇒ Object



# File 'lib/logstash/outputs/elasticsearch/http_client/pool.rb', line 287

def add_url(url)
  @url_info[url] ||= empty_url_meta
end

#alive_urls_count ⇒ Object



# File 'lib/logstash/outputs/elasticsearch/http_client/pool.rb', line 98

def alive_urls_count
  @state_mutex.synchronize { @url_info.values.select {|v| !v[:dead] }.count }
end

#check_sniff ⇒ Object

Sniffs the cluster and returns the discovered node URLs. Does not update the pool's internal URL list!



# File 'lib/logstash/outputs/elasticsearch/http_client/pool.rb', line 152

def check_sniff
  url, resp = perform_request(:get, '_nodes')
  parsed = LogStash::Json.load(resp.body)
  parsed['nodes'].map do |id,info|
    # TODO: Make sure this works with Shield. Does that list addresses as 'https_address'?
    addr_str = info['http_address'].to_s
    next unless addr_str # Skip hosts with HTTP disabled


    # Only connect to nodes that serve data
    # this will skip connecting to client, tribe, and master only nodes
    # Note that if 'attributes' is NOT set, then that's just a regular node
    # with master + data + client enabled, so we allow that
    attributes = info['attributes']
    next if attributes && attributes['data'] == 'false'

    matches = addr_str.match(ES1_SNIFF_RE_URL) || addr_str.match(ES2_SNIFF_RE_URL)
    if matches
      host = matches[1].empty? ? matches[2] : matches[1]
      port = matches[3]
      URI.parse("#{@scheme}://#{host}:#{port}")
    end
  end.compact
end
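The two regexes above accommodate the differing http_address formats published by Elasticsearch 1.x (bracketed, optionally prefixed with a hostname) and 2.x (plain host:port). A short illustration with made-up addresses:

# Elasticsearch 1.x style address (bracketed); the hostname before the '/' wins when present.
m = "inet[myhost/10.0.0.5:9200]".match(ES1_SNIFF_RE_URL)
m[1] # => "myhost"
m[2] # => "10.0.0.5"  (used only when m[1] is empty, e.g. "inet[/10.0.0.5:9200]")
m[3] # => "9200"

# Elasticsearch 2.x style address (plain host:port).
m = "10.0.0.5:9200".match(ES2_SNIFF_RE_URL)
m[1] # => "10.0.0.5"
m[3] # => "9200"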

#close ⇒ Object



# File 'lib/logstash/outputs/elasticsearch/http_client/pool.rb', line 71

def close
  @state_mutex.synchronize { @stopping = true }

  logger.debug  "Stopping sniffer"
  stop_sniffer

  logger.debug  "Stopping resurrectionist"
  stop_resurrectionist

  logger.debug  "Waiting for in use manticore connections"
  wait_for_in_use_connections

  logger.debug("Closing adapter #{@adapter}")
  @adapter.close
end

#empty_url_meta ⇒ Object



# File 'lib/logstash/outputs/elasticsearch/http_client/pool.rb', line 295

def empty_url_meta
  {
    :in_use => 0,
    :dead => false
  }
end

#get_connection ⇒ Object



# File 'lib/logstash/outputs/elasticsearch/http_client/pool.rb', line 345

def get_connection
  @state_mutex.synchronize do
    # The goal here is to pick a random connection from the least-in-use connections
    # We want some randomness so that we don't hit the same node over and over, but
    # we also want more 'fair' behavior in the event of high concurrency
    eligible_set = nil
    lowest_value_seen = nil
    @url_info.each do |url,meta|
      meta_in_use = meta[:in_use]
      next if meta[:dead]

      if lowest_value_seen.nil? || meta_in_use < lowest_value_seen
        lowest_value_seen = meta_in_use
        eligible_set = [[url, meta]]
      elsif lowest_value_seen == meta_in_use
        eligible_set << [url, meta]
      end
    end

    return nil if eligible_set.nil?

    pick, pick_meta = eligible_set.sample
    pick_meta[:in_use] += 1

    [pick, pick_meta]
  end
end
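Callers that check a connection out directly are responsible for returning it; #with_connection wraps exactly this pattern. A sketch, reusing the pool constructed in the earlier example:

url, _meta = pool.get_connection
raise "no live connections in the pool" unless url
begin
  # ... issue a request against url ...
ensure
  pool.return_connection(url)   # decrements :in_use even when the request fails
end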

#in_use_connections ⇒ Object



# File 'lib/logstash/outputs/elasticsearch/http_client/pool.rb', line 94

def in_use_connections
  @state_mutex.synchronize { @url_info.values.select {|v| v[:in_use] > 0 } }
end

#mark_dead(url, error) ⇒ Object



# File 'lib/logstash/outputs/elasticsearch/http_client/pool.rb', line 324

def mark_dead(url, error)
  @state_mutex.synchronize do
    meta = @url_info[url]
    # If a sniff removed this URL's metadata just before we got here, there's nothing to mark.
    # This is an extreme edge case, but it can happen!
    return unless meta
    safe_url = ::LogStash::Outputs::ElasticSearch::SafeURL.without_credentials(url)
    logger.warn("Marking url as dead.", :reason => error.message, :url => safe_url,
                :error_message => error.message, :error_class => error.class.name)
    meta[:dead] = true
    meta[:last_error] = error
    meta[:last_errored_at] = Time.now
  end
end

#normalize_url(uri) ⇒ Object

Raises:

  • (ArgumentError)


# File 'lib/logstash/outputs/elasticsearch/http_client/pool.rb', line 238

def normalize_url(uri)
  raise ArgumentError, "Only URI objects may be passed in!" unless uri.is_a?(URI)
  uri = uri.clone

  # Set credentials if need be
  if @auth && !uri.user
    uri.user ||= @auth[:user]
    uri.password ||= @auth[:password]
  end

  uri.scheme = @scheme

  uri
end
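A sketch of the effect, assuming the pool was built with :auth => {:user => 'elastic', :password => 'changeme'} and the default 'http' scheme (values illustrative):

uri = pool.normalize_url(URI.parse("http://es-node-1:9200"))
uri.user     # => "elastic"   (filled in because the URL carried no credentials)
uri.password # => "changeme"
uri.scheme   # => "http"      (forced to the pool-wide scheme)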

#perform_request(method, path, params = {}, body = nil) ⇒ Object



# File 'lib/logstash/outputs/elasticsearch/http_client/pool.rb', line 219

def perform_request(method, path, params={}, body=nil)
  with_connection do |url|
    resp = perform_request_to_url(url, method, path, params, body)
    [url, resp]
  end
end

#perform_request_to_url(url, method, path, params = {}, body = nil) ⇒ Object



# File 'lib/logstash/outputs/elasticsearch/http_client/pool.rb', line 232

def perform_request_to_url(url, method, path, params={}, body=nil)
  res = @adapter.perform_request(url, method, path, params, body)
rescue *@adapter.host_unreachable_exceptions => e
  raise HostUnreachableError.new(e, url), "Could not reach host #{e.class}: #{e.message}"
end

#remove_url(url) ⇒ Object



# File 'lib/logstash/outputs/elasticsearch/http_client/pool.rb', line 291

def remove_url(url)
  @url_info.delete(url)
end

#resurrect_dead! ⇒ Object



# File 'lib/logstash/outputs/elasticsearch/http_client/pool.rb', line 194

def resurrect_dead!
  # Try to keep locking granularity low such that we don't affect IO...
  @state_mutex.synchronize { @url_info.select {|url,meta| meta[:dead] } }.each do |url,meta|
    safe_url = ::LogStash::Outputs::ElasticSearch::SafeURL.without_credentials(url)
    begin
      logger.info("Running health check to see if an Elasticsearch connection is working",
                  url: safe_url, healthcheck_path: @healthcheck_path)
      perform_request_to_url(url, "HEAD", @healthcheck_path)
      # If no exception was raised it must have succeeded!
      logger.warn("Restored connection to ES instance", :url => safe_url)
      @state_mutex.synchronize { meta[:dead] = false }
    rescue HostUnreachableError => e
      logger.debug("Attempted to resurrect connection to dead ES instance, but got an error.", url: safe_url, error_type: e.class, error: e.message)
    end
  end
end

#resurrectionist_alive? ⇒ Boolean

Returns:

  • (Boolean)


# File 'lib/logstash/outputs/elasticsearch/http_client/pool.rb', line 215

def resurrectionist_alive?
  @resurrectionist.alive?
end

#return_connection(url) ⇒ Object



# File 'lib/logstash/outputs/elasticsearch/http_client/pool.rb', line 373

def return_connection(url)
  @state_mutex.synchronize do
    if @url_info[url] # Guard against the condition where the connection has already been deleted
      @url_info[url][:in_use] -= 1
    end
  end
end

#size ⇒ Object



# File 'lib/logstash/outputs/elasticsearch/http_client/pool.rb', line 283

def size
  @state_mutex.synchronize { @url_info.size }
end

#sniff! ⇒ Object

Sniffs the cluster then updates the internal URLs



# File 'lib/logstash/outputs/elasticsearch/http_client/pool.rb', line 145

def sniff!
  update_urls(check_sniff)
end

#sniffer_alive? ⇒ Boolean

Returns:

  • (Boolean)


# File 'lib/logstash/outputs/elasticsearch/http_client/pool.rb', line 182

def sniffer_alive?
  @sniffer ? @sniffer.alive? : nil
end

#start_resurrectionist ⇒ Object



# File 'lib/logstash/outputs/elasticsearch/http_client/pool.rb', line 186

def start_resurrectionist
  @resurrectionist = Thread.new do
    until_stopped("resurrection", @resurrect_delay) do
      resurrect_dead!
    end
  end
end

#start_sniffer ⇒ Object



# File 'lib/logstash/outputs/elasticsearch/http_client/pool.rb', line 131

def start_sniffer
  @sniffer = Thread.new do
    until_stopped("sniffing", sniffer_delay) do
      begin
        sniff!
      rescue NoConnectionAvailableError => e
        @state_mutex.synchronize { # Synchronize around @url_info
          logger.warn("Elasticsearch output attempted to sniff for new connections but cannot. No living connections are detected. Pool contains the following current URLs", :url_info => @url_info) }
      end
    end
  end
end

#stop_resurrectionist ⇒ Object



# File 'lib/logstash/outputs/elasticsearch/http_client/pool.rb', line 211

def stop_resurrectionist
  @resurrectionist.join
end

#stop_sniffer ⇒ Object



# File 'lib/logstash/outputs/elasticsearch/http_client/pool.rb', line 178

def stop_sniffer
  @sniffer.join if @sniffer
end

#until_stopped(task_name, delay) ⇒ Object



# File 'lib/logstash/outputs/elasticsearch/http_client/pool.rb', line 110

def until_stopped(task_name, delay)
  last_done = Time.now
  until @state_mutex.synchronize { @stopping }
    begin
      now = Time.now
      if (now - last_done) >= delay
        last_done = now
        yield
      end
      sleep 1
    rescue => e
      logger.warn(
        "Error while performing #{task_name}",
        :error_message => e.message,
        :class => e.class.name,
        :backtrace => e.backtrace
        )
    end
  end
end

#update_urls(new_urls) ⇒ Object



# File 'lib/logstash/outputs/elasticsearch/http_client/pool.rb', line 253

def update_urls(new_urls)
  # Normalize URLs
  new_urls = new_urls.map(&method(:normalize_url))

  # Used for logging nicely
  state_changes = {:removed => [], :added => []}
  @state_mutex.synchronize do
    # Add new connections
    new_urls.each do |url|
      # URI objects don't have real hash equality! So, since this isn't perf sensitive we do a linear scan
      unless @url_info.keys.include?(url)
        state_changes[:added] << url.to_s
        add_url(url)
      end
    end

    # Delete connections not in the new list
    @url_info.each do |url,_|
      unless new_urls.include?(url)
        state_changes[:removed] << url.to_s
        remove_url(url)
      end
    end
  end

  if state_changes[:removed].size > 0 || state_changes[:added].size > 0
    logger.info("Elasticsearch pool URLs updated", :changes => state_changes)
  end
end
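Feeding this a fresh URL list (for example the result of a sniff) adds hosts the pool has not seen and drops hosts missing from the list; a sketch, assuming no :auth is configured so URLs are stored as given:

pool.update_urls([URI.parse("http://es2:9200")])
pool.urls.map(&:to_s)   # => ["http://es2:9200"]; anything previously pooled but absent
                        #    from the new list has been removed and logged as a change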

#url_info ⇒ Object



# File 'lib/logstash/outputs/elasticsearch/http_client/pool.rb', line 102

def url_info
  @state_mutex.synchronize { @url_info }
end

#url_meta(url) ⇒ Object



# File 'lib/logstash/outputs/elasticsearch/http_client/pool.rb', line 339

def url_meta(url)
  @state_mutex.synchronize do
    @url_info[url]
  end
end

#urls ⇒ Object



# File 'lib/logstash/outputs/elasticsearch/http_client/pool.rb', line 106

def urls
  url_info.keys
end

#wait_for_in_use_connections ⇒ Object



# File 'lib/logstash/outputs/elasticsearch/http_client/pool.rb', line 87

def wait_for_in_use_connections
  until in_use_connections.empty?
    logger.info "Blocked on shutdown to in use connections #{@state_mutex.synchronize {@url_info}}"
    sleep 1
  end
end

#with_connection ⇒ Object



# File 'lib/logstash/outputs/elasticsearch/http_client/pool.rb', line 302

def with_connection
  url, url_meta = get_connection

  # Custom error class used here so that users may retry attempts if they receive this error
  # should they choose to
  raise NoConnectionAvailableError, "No Available connections" unless url
  yield url
rescue HostUnreachableError => e
  # Mark the connection as dead here since this is likely not transient
  mark_dead(url, e)
  raise e
rescue BadResponseCodeError => e
  # These aren't discarded from the pool because these are often very transient
  # errors
  raise e
rescue => e
  logger.warn("UNEXPECTED POOL ERROR", :e => e)
  raise e
ensure
  return_connection(url)
end
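Because NoConnectionAvailableError and BadResponseCodeError are distinct classes (see the namespace listing above), callers can opt into retries when they see them. A hedged sketch around #perform_request; the path and retry policy are illustrative only:

attempts = 0
begin
  url, response = pool.perform_request(:get, "_cluster/health")
rescue LogStash::Outputs::ElasticSearch::HttpClient::Pool::NoConnectionAvailableError,
       LogStash::Outputs::ElasticSearch::HttpClient::Pool::BadResponseCodeError => e
  attempts += 1
  if attempts < 3
    sleep 1
    retry
  else
    raise
  end
end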