Module: Elasticsearch::Transport::Transport::Base Abstract

Includes:
Loggable
Included in:
HTTP::Curb, HTTP::Faraday, HTTP::Manticore
Defined in:
lib/elasticsearch/transport/transport/base.rb

Overview

This module is abstract.

Module with common functionality for transport implementations.

Constant Summary collapse

DEFAULT_PORT =
9200
DEFAULT_PROTOCOL =
'http'
DEFAULT_RELOAD_AFTER =

Requests

10_000
DEFAULT_RESURRECT_AFTER =

Seconds

60
DEFAULT_MAX_RETRIES =

Requests

3
DEFAULT_SERIALIZER_CLASS =
Serializer::MultiJson
SANITIZED_PASSWORD =
'*' * (rand(14)+1)

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods included from Loggable

#log_debug, #log_error, #log_fatal, #log_info, #log_warn

Instance Attribute Details

#connectionsObject (readonly)

Returns the value of attribute connections.



34
35
36
# File 'lib/elasticsearch/transport/transport/base.rb', line 34

def connections
  @connections
end

#counterObject (readonly)

Returns the value of attribute counter.



34
35
36
# File 'lib/elasticsearch/transport/transport/base.rb', line 34

def counter
  @counter
end

#hostsObject (readonly)

Returns the value of attribute hosts.



34
35
36
# File 'lib/elasticsearch/transport/transport/base.rb', line 34

def hosts
  @hosts
end

#last_request_atObject (readonly)

Returns the value of attribute last_request_at.



34
35
36
# File 'lib/elasticsearch/transport/transport/base.rb', line 34

def last_request_at
  @last_request_at
end

#loggerObject

Returns the value of attribute logger.



35
36
37
# File 'lib/elasticsearch/transport/transport/base.rb', line 35

def logger
  @logger
end

#optionsObject (readonly)

Returns the value of attribute options.



34
35
36
# File 'lib/elasticsearch/transport/transport/base.rb', line 34

def options
  @options
end

#protocolObject (readonly)

Returns the value of attribute protocol.



34
35
36
# File 'lib/elasticsearch/transport/transport/base.rb', line 34

def protocol
  @protocol
end

#reload_afterObject

Returns the value of attribute reload_after.



35
36
37
# File 'lib/elasticsearch/transport/transport/base.rb', line 35

def reload_after
  @reload_after
end

#reload_connectionsObject

Returns the value of attribute reload_connections.



35
36
37
# File 'lib/elasticsearch/transport/transport/base.rb', line 35

def reload_connections
  @reload_connections
end

#resurrect_afterObject

Returns the value of attribute resurrect_after.



35
36
37
# File 'lib/elasticsearch/transport/transport/base.rb', line 35

def resurrect_after
  @resurrect_after
end

#serializerObject

Returns the value of attribute serializer.



35
36
37
# File 'lib/elasticsearch/transport/transport/base.rb', line 35

def serializer
  @serializer
end

#snifferObject

Returns the value of attribute sniffer.



35
36
37
# File 'lib/elasticsearch/transport/transport/base.rb', line 35

def sniffer
  @sniffer
end

#tracerObject

Returns the value of attribute tracer.



35
36
37
# File 'lib/elasticsearch/transport/transport/base.rb', line 35

def tracer
  @tracer
end

Instance Method Details

#__build_connection(host, options = {}, block = nil) ⇒ Connections::Connection

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

This method is abstract.

Build and return a connection. A transport implementation must implement this method. See HTTP::Faraday#__build_connection for an example.

Returns:

Raises:

  • (NoMethodError)


170
171
172
# File 'lib/elasticsearch/transport/transport/base.rb', line 170

def __build_connection(host, options={}, block=nil)
  raise NoMethodError, "Implement this method in your class"
end

#__build_connectionsConnections::Collection

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Builds and returns a collection of connections

The adapters have to implement the #__build_connection method.



147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
# File 'lib/elasticsearch/transport/transport/base.rb', line 147

def __build_connections
  Connections::Collection.new \
    :connections => hosts.map { |host|
    host[:protocol] = host[:scheme] || options[:scheme] || options[:http][:scheme] || DEFAULT_PROTOCOL
    host[:port] ||= options[:port] || options[:http][:port] || DEFAULT_PORT
    if (options[:user] || options[:http][:user]) && !host[:user]
      host[:user] ||= options[:user] || options[:http][:user]
      host[:password] ||= options[:password] || options[:http][:password]
    end

    __build_connection(host, (options[:transport_options] || {}), @block)
  },
    :selector_class => options[:selector_class],
    :selector => options[:selector]
end

#__close_connectionsObject

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Closes the connections collection



178
179
180
# File 'lib/elasticsearch/transport/transport/base.rb', line 178

def __close_connections
  # A hook point for specific adapters when they need to close connections
end

#__convert_to_json(o = nil, options = {}) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Converts any non-String object to JSON



225
226
227
# File 'lib/elasticsearch/transport/transport/base.rb', line 225

def __convert_to_json(o=nil, options={})
  o.is_a?(String) ? o : serializer.dump(o, options)
end

#__full_url(host) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Returns a full URL based on information from host

Parameters:

  • host (Hash)

    Host configuration passed in from Client



234
235
236
237
238
239
240
241
# File 'lib/elasticsearch/transport/transport/base.rb', line 234

def __full_url(host)
  url  = "#{host[:protocol]}://"
  url += "#{CGI.escape(host[:user])}:#{CGI.escape(host[:password])}@" if host[:user]
  url += host[:host]
  url += ":#{host[:port]}" if host[:port]
  url += host[:path] if host[:path]
  url
end

#__log_response(method, path, params, body, url, response, json, took, duration) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Log request and response information



186
187
188
189
190
191
192
193
194
# File 'lib/elasticsearch/transport/transport/base.rb', line 186

def __log_response(method, path, params, body, url, response, json, took, duration)
  if logger
    sanitized_url = url.to_s.gsub(/\/\/(.+):(.+)@/, '//' + '\1:' + SANITIZED_PASSWORD + '@')
    log_info "#{method.to_s.upcase} #{sanitized_url} " +
                 "[status:#{response.status}, request:#{sprintf('%.3fs', duration)}, query:#{took}]"
    log_debug "> #{__convert_to_json(body)}" if body
    log_debug "< #{response.body}"
  end
end

#__raise_transport_error(response) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Raise error specific for the HTTP response status or a generic server error



216
217
218
219
# File 'lib/elasticsearch/transport/transport/base.rb', line 216

def __raise_transport_error(response)
  error = ERRORS[response.status] || ServerError
  raise error.new "[#{response.status}] #{response.body}"
end

#__rebuild_connections(arguments = {}) ⇒ Connections::Collection

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Rebuilds the connections collection in the transport.

The methods adds new connections from the passed hosts to the collection, and removes all connections not contained in the passed hosts.



123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
# File 'lib/elasticsearch/transport/transport/base.rb', line 123

def __rebuild_connections(arguments={})
  @state_mutex.synchronize do
    @hosts       = arguments[:hosts]    || []
    @options     = arguments[:options]  || {}

    __close_connections

    new_connections = __build_connections
    stale_connections = @connections.all.select  { |c| ! new_connections.include?(c) }
    new_connections = new_connections.reject { |c| @connections.all.include?(c) }

    @connections.remove(stale_connections)
    @connections.add(new_connections)
    @connections
  end
end

#__trace(method, path, params, headers, body, url, response, json, took, duration) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Trace the request in the ‘curl` format



200
201
202
203
204
205
206
207
208
209
210
# File 'lib/elasticsearch/transport/transport/base.rb', line 200

def __trace(method, path, params, headers, body, url, response, json, took, duration)
  trace_url  = "http://localhost:9200/#{path}?pretty" +
      ( params.empty? ? '' : "&#{::Faraday::Utils::ParamsHash[params].to_query}" )
  trace_body = body ? " -d '#{__convert_to_json(body, :pretty => true)}'" : ''
  trace_command = "curl -X #{method.to_s.upcase}"
  trace_command += " -H '#{headers.collect { |k,v| "#{k}: #{v}" }.join(", ")}'" if headers && !headers.empty?
  trace_command += " '#{trace_url}'#{trace_body}\n"
  tracer.info trace_command
  tracer.debug "# #{Time.now.iso8601} [#{response.status}] (#{format('%.3f', duration)}s)\n#"
  tracer.debug json ? serializer.dump(json, :pretty => true).gsub(/^/, '# ').sub(/\}$/, "\n# }")+"\n" : "# #{response.body}\n"
end

#get_connection(options = {}) ⇒ Connections::Connection

Returns a connection from the connection pool by delegating to Connections::Collection#get_connection.

Resurrects dead connection if the ‘resurrect_after` timeout has passed. Increments the counter and performs connection reloading if the `reload_connections` option is set.



86
87
88
89
90
91
92
# File 'lib/elasticsearch/transport/transport/base.rb', line 86

def get_connection(options={})
  resurrect_dead_connections! if Time.now > @last_request_at + @resurrect_after

  @counter_mtx.synchronize { @counter += 1 }
  reload_connections!         if reload_connections && counter % reload_after == 0
  connections.get_connection(options)
end

#host_unreachable_exceptionsArray

This method is abstract.

Returns an Array of connection errors specific to the transport implementation. See HTTP::Faraday#host_unreachable_exceptions for an example.

Returns:

  • (Array)


363
364
365
# File 'lib/elasticsearch/transport/transport/base.rb', line 363

def host_unreachable_exceptions
  [Errno::ECONNREFUSED]
end

#initialize(arguments = {}, &block) ⇒ Object

Creates a new transport object

Parameters:

  • arguments (Hash) (defaults to: {})

    Settings and options for the transport

  • block (Proc)

    Lambda or Proc which can be evaluated in the context of the “session” object

Options Hash (arguments):

  • :hosts (Array)

    An Array of normalized hosts information

  • :options (Array)

    A Hash with options (usually passed by Client)

See Also:



49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
# File 'lib/elasticsearch/transport/transport/base.rb', line 49

def initialize(arguments = {}, &block)
  @state_mutex = Mutex.new

  @hosts       = arguments[:hosts]   || []
  @options     = arguments[:options] || {}
  @options[:http] ||= {}
  @options[:retry_on_status] ||= []
  @options[:delay_on_retry]  ||= 0

  @block       = block
  @compression = !!@options[:compression]
  @connections = __build_connections

  @serializer  = options[:serializer] || ( options[:serializer_class] ? options[:serializer_class].new(self) : DEFAULT_SERIALIZER_CLASS.new(self) )
  @protocol    = options[:protocol] || DEFAULT_PROTOCOL

  @logger      = options[:logger]
  @tracer      = options[:tracer]

  @sniffer     = options[:sniffer_class] ? options[:sniffer_class].new(self) : Sniffer.new(self)
  @counter     = 0
  @counter_mtx = Mutex.new
  @last_request_at = Time.now
  @reload_connections = options[:reload_connections]
  @reload_after    = options[:reload_connections].is_a?(Integer) ? options[:reload_connections] : DEFAULT_RELOAD_AFTER
  @resurrect_after = options[:resurrect_after] || DEFAULT_RESURRECT_AFTER
  @retry_on_status = Array(options[:retry_on_status]).map { |d| d.to_i }
end

#perform_request(method, path, params = {}, body = nil, headers = nil, opts = {}, &block) ⇒ Response

This method is abstract.

The transport implementation has to implement this method either in full, or by invoking this method with a block. See HTTP::Faraday#perform_request for an example.

Performs a request to Elasticsearch, while handling logging, tracing, marking dead connections, retrying the request and reloading the connections.

Parameters:

  • method (String)

    Request method

  • path (String)

    The API endpoint

  • params (Hash) (defaults to: {})

    Request parameters (will be serialized by Connections::Connection#full_url)

  • body (Hash) (defaults to: nil)

    Request body (will be serialized by the #serializer)

  • headers (Hash) (defaults to: nil)

    Request headers (will be serialized by the #serializer)

  • block (Proc)

    Code block to evaluate, passed from the implementation

Returns:

Raises:

  • (NoMethodError)

    If no block is passed

  • (ServerError)

    If request failed on server

  • (Error)

    If no connection is available



261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
# File 'lib/elasticsearch/transport/transport/base.rb', line 261

def perform_request(method, path, params = {}, body = nil, headers = nil, opts = {}, &block)
  raise NoMethodError, 'Implement this method in your transport class' unless block_given?

  start = Time.now
  tries = 0
  reload_on_failure = opts.fetch(:reload_on_failure, @options[:reload_on_failure])
  delay_on_retry = opts.fetch(:delay_on_retry, @options[:delay_on_retry])

  max_retries = if opts.key?(:retry_on_failure)
    opts[:retry_on_failure] === true ? DEFAULT_MAX_RETRIES : opts[:retry_on_failure]
  elsif options.key?(:retry_on_failure)
    options[:retry_on_failure] === true ? DEFAULT_MAX_RETRIES : options[:retry_on_failure]
  end

  params = params.clone
  ignore = Array(params.delete(:ignore)).compact.map { |s| s.to_i }

  begin
    sleep(delay_on_retry / 1000.0) if tries > 0
    tries     += 1
    connection = get_connection or raise Error.new('Cannot get new connection from pool.')

    if connection.connection.respond_to?(:params) && connection.connection.params.respond_to?(:to_hash)
      params = connection.connection.params.merge(params.to_hash)
    end

    url      = connection.full_url(path, params)
    response = block.call(connection, url)
    connection.healthy! if connection.failures > 0

    # Raise an exception so we can catch it for `retry_on_status`
    __raise_transport_error(response) if response.status.to_i >= 300 && @retry_on_status.include?(response.status.to_i)

  rescue Elasticsearch::Transport::Transport::ServerError => e
    if response && @retry_on_status.include?(response.status)
      log_warn "[#{e.class}] Attempt #{tries} to get response from #{url}"
      if tries <= (max_retries || DEFAULT_MAX_RETRIES)
        retry
      else
        log_fatal "[#{e.class}] Cannot get response from #{url} after #{tries} tries"
        raise e
      end
    else
      raise e
    end

  rescue *host_unreachable_exceptions => e
    log_error "[#{e.class}] #{e.message} #{connection.host.inspect}"

    connection.dead!
    if reload_on_failure and tries < connections.all.size
      log_warn "[#{e.class}] Reloading connections (attempt #{tries} of #{connections.all.size})"
      reload_connections! and retry
    end

    if max_retries
      log_warn "[#{e.class}] Attempt #{tries} connecting to #{connection.host.inspect}"
      if tries <= max_retries
        retry
      else
        log_fatal "[#{e.class}] Cannot connect to #{connection.host.inspect} after #{tries} tries"
        raise e
      end
    else
      raise e
    end

  rescue Exception => e
    log_fatal "[#{e.class}] #{e.message} (#{connection.host.inspect if connection})"
    raise e

  end #/begin

  duration = Time.now - start

  if response.status.to_i >= 300
    __log_response(method, path, params, body, url, response, nil, 'N/A', duration)
    __trace(method, path, params, connection_headers(connection), body, url, response, nil, 'N/A', duration) if tracer
    # Log the failure only when `ignore` doesn't match the response status
    log_fatal "[#{response.status}] #{response.body}" unless ignore.include?(response.status.to_i)
    __raise_transport_error response unless ignore.include?(response.status.to_i)
  end

  json     = serializer.load(response.body) if response.body && !response.body.empty? && response.headers && response.headers["content-type"] =~ /json/
  took     = (json['took'] ? sprintf('%.3fs', json['took']/1000.0) : 'n/a') rescue 'n/a'

  unless ignore.include?(response.status.to_i)
    __log_response   method, path, params, body, url, response, json, took, duration
  end

  __trace(method, path, params, connection_headers(connection), body, url, response, nil, 'N/A', duration) if tracer
  log_warn(response.headers['warning']) if response.headers&.[]('warning')
  Response.new response.status, json || response.body, response.headers
ensure
  @last_request_at = Time.now
end

#reload_connections!Object

Reloads and replaces the connection collection based on cluster information

See Also:



98
99
100
101
102
103
104
105
# File 'lib/elasticsearch/transport/transport/base.rb', line 98

def reload_connections!
  hosts = sniffer.hosts
  __rebuild_connections :hosts => hosts, :options => options
  self
rescue SnifferTimeoutError
  log_error "[SnifferTimeoutError] Timeout when reloading connections."
  self
end

#resurrect_dead_connections!Object

Tries to “resurrect” all eligible dead connections



111
112
113
# File 'lib/elasticsearch/transport/transport/base.rb', line 111

def resurrect_dead_connections!
  connections.dead.each { |c| c.resurrect! }
end