Module: OpenSearch::Transport::Transport::Base Abstract

Includes:
Loggable
Included in:
HTTP::Curb, HTTP::Faraday, HTTP::Manticore
Defined in:
lib/opensearch/transport/transport/base.rb

Overview

This module is abstract.

Module with common functionality for transport implementations.

Constant Summary collapse

DEFAULT_PORT =
9200
DEFAULT_PROTOCOL =
'http'
DEFAULT_RELOAD_AFTER =

Requests

10_000
DEFAULT_RESURRECT_AFTER =

Seconds

60
DEFAULT_MAX_RETRIES =

Requests

3
DEFAULT_SERIALIZER_CLASS =
Serializer::MultiJson
SANITIZED_PASSWORD =
'*' * (rand(14)+1)

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods included from Loggable

#log_debug, #log_error, #log_fatal, #log_info, #log_warn

Instance Attribute Details

#connections ⇒ Object (readonly)

Returns the value of attribute connections.



44
45
46
# File 'lib/opensearch/transport/transport/base.rb', line 44

# Read-only accessor for the connection collection.
#
# The collection is created by #__build_connections during #initialize and
# is updated in place by #__rebuild_connections.
#
# @return [Object] the value of the @connections instance variable
def connections
  @connections
end

#counter ⇒ Object (readonly)

Returns the value of attribute counter.



44
45
46
# File 'lib/opensearch/transport/transport/base.rb', line 44

# Read-only accessor for the request counter.
#
# The counter starts at 0 in #initialize and is incremented by one on every
# call to #get_connection.
#
# @return [Object] the value of the @counter instance variable
def counter
  @counter
end

#hosts ⇒ Object (readonly)

Returns the value of attribute hosts.



44
45
46
# File 'lib/opensearch/transport/transport/base.rb', line 44

# Read-only accessor for the host list.
#
# Set from `arguments[:hosts]` (default `[]`) in #initialize and replaced
# in #__rebuild_connections.
#
# @return [Object] the value of the @hosts instance variable
def hosts
  @hosts
end

#last_request_at ⇒ Object (readonly)

Returns the value of attribute last_request_at.



44
45
46
# File 'lib/opensearch/transport/transport/base.rb', line 44

# Read-only accessor for the timestamp of the most recent request.
#
# Initialised to `Time.now` in #initialize and refreshed in the `ensure`
# clause of #perform_request.
#
# @return [Object] the value of the @last_request_at instance variable
def last_request_at
  @last_request_at
end

#logger ⇒ Object

Returns the value of attribute logger.



45
46
47
# File 'lib/opensearch/transport/transport/base.rb', line 45

# Accessor for the logger, taken from `options[:logger]` in #initialize.
# May be nil; #__log_response only logs when it is set.
#
# @return [Object] the value of the @logger instance variable
def logger
  @logger
end

#options ⇒ Object (readonly)

Returns the value of attribute options.



44
45
46
# File 'lib/opensearch/transport/transport/base.rb', line 44

# Read-only accessor for the transport options.
#
# Set from `arguments[:options]` (default `{}`) in #initialize and replaced
# in #__rebuild_connections.
#
# @return [Object] the value of the @options instance variable
def options
  @options
end

#protocol ⇒ Object (readonly)

Returns the value of attribute protocol.



44
45
46
# File 'lib/opensearch/transport/transport/base.rb', line 44

# Read-only accessor for the protocol, set in #initialize from
# `options[:protocol]` or DEFAULT_PROTOCOL.
#
# @return [Object] the value of the @protocol instance variable
def protocol
  @protocol
end

#reload_after ⇒ Object

Returns the value of attribute reload_after.



45
46
47
# File 'lib/opensearch/transport/transport/base.rb', line 45

# Accessor for the number of requests between connection reloads.
#
# #initialize uses `options[:reload_connections]` when it is an Integer and
# DEFAULT_RELOAD_AFTER otherwise; #get_connection reloads when the request
# counter is a multiple of this value.
#
# @return [Object] the value of the @reload_after instance variable
def reload_after
  @reload_after
end

#reload_connections ⇒ Object

Returns the value of attribute reload_connections.



45
46
47
# File 'lib/opensearch/transport/transport/base.rb', line 45

# Accessor for the `options[:reload_connections]` setting captured in
# #initialize. When truthy, #get_connection periodically calls
# #reload_connections!.
#
# @return [Object] the value of the @reload_connections instance variable
def reload_connections
  @reload_connections
end

#resurrect_after ⇒ Object

Returns the value of attribute resurrect_after.



45
46
47
# File 'lib/opensearch/transport/transport/base.rb', line 45

# Accessor for the resurrection timeout, set in #initialize from
# `options[:resurrect_after]` or DEFAULT_RESURRECT_AFTER. #get_connection
# adds it to the last request time to decide when to resurrect dead
# connections.
#
# @return [Object] the value of the @resurrect_after instance variable
def resurrect_after
  @resurrect_after
end

#serializer ⇒ Object

Returns the value of attribute serializer.



45
46
47
# File 'lib/opensearch/transport/transport/base.rb', line 45

# Accessor for the serializer.
#
# #initialize uses `options[:serializer]` when given, otherwise it
# instantiates `options[:serializer_class]` or DEFAULT_SERIALIZER_CLASS with
# the transport as the argument.
#
# @return [Object] the value of the @serializer instance variable
def serializer
  @serializer
end

#sniffer ⇒ Object

Returns the value of attribute sniffer.



45
46
47
# File 'lib/opensearch/transport/transport/base.rb', line 45

# Accessor for the sniffer, instantiated in #initialize from
# `options[:sniffer_class]` or `Sniffer`. #reload_connections! reads the
# host list from it.
#
# @return [Object] the value of the @sniffer instance variable
def sniffer
  @sniffer
end

#tracer ⇒ Object

Returns the value of attribute tracer.



45
46
47
# File 'lib/opensearch/transport/transport/base.rb', line 45

# Accessor for the tracer, taken from `options[:tracer]` in #initialize.
# May be nil; #perform_request only calls #__trace when it is set.
#
# @return [Object] the value of the @tracer instance variable
def tracer
  @tracer
end

Instance Method Details

#__build_connection(host, options = {}, block = nil) ⇒ Connections::Connection

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

This method is abstract.

Build and return a connection. A transport implementation must implement this method. See HTTP::Faraday#__build_connection for an example.

Returns:

  • (Connections::Connection)

Raises:

  • (NoMethodError)


179
180
181
# File 'lib/opensearch/transport/transport/base.rb', line 179

# Abstract hook for building a single connection.
#
# The base implementation never returns: it always raises, so every
# transport adapter has to provide its own version.
#
# @param host    [Hash] host information for the connection
# @param options [Hash] adapter-specific options
# @param block   [Proc, nil] optional configuration block
# @raise [NoMethodError] always
def __build_connection(host, options={}, block=nil)
  raise(NoMethodError, 'Implement this method in your class')
end

#__build_connections ⇒ Connections::Collection

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Builds and returns a collection of connections

The adapters have to implement the #__build_connection method.



156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
# File 'lib/opensearch/transport/transport/base.rb', line 156

# Builds a Connections::Collection with one connection per entry in #hosts.
#
# Each host Hash is completed IN PLACE before it is handed to
# #__build_connection:
#
# * :protocol is always overwritten, from the host's :scheme, then
#   options[:scheme], options[:http][:scheme], then DEFAULT_PROTOCOL;
# * :port is filled in only when missing, from options[:port],
#   options[:http][:port], then DEFAULT_PORT;
# * :user / :password are inherited from the options only when the host has
#   no :user of its own.
#
# `options[:http]` is treated as optional here. #initialize always sets it,
# but #__rebuild_connections replaces @options without that default, and a
# missing :http Hash used to raise NoMethodError on nil.
#
# @return [Connections::Collection]
def __build_connections
  http_options = options[:http] || {}

  built = hosts.map do |host|
    host[:protocol] = host[:scheme] || options[:scheme] || http_options[:scheme] || DEFAULT_PROTOCOL
    host[:port] ||= options[:port] || http_options[:port] || DEFAULT_PORT

    default_user = options[:user] || http_options[:user]
    if default_user && !host[:user]
      host[:user]       = default_user
      host[:password] ||= options[:password] || http_options[:password]
    end

    __build_connection(host, (options[:transport_options] || {}), @block)
  end

  Connections::Collection.new \
    :connections => built,
    :selector_class => options[:selector_class],
    :selector => options[:selector]
end

#__close_connections ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Closes the connections collection



187
188
189
# File 'lib/opensearch/transport/transport/base.rb', line 187

# Hook invoked by #__rebuild_connections before the new connections are
# built. The base implementation intentionally does nothing and returns nil;
# adapters override it when they need to close their connections.
def __close_connections
  # A hook point for specific adapters when they need to close connections
end

#__convert_to_json(o = nil, options = {}) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Converts any non-String object to JSON



234
235
236
# File 'lib/opensearch/transport/transport/base.rb', line 234

# Serializes a value unless it already is a String.
#
# @param o       [Object] value to convert; Strings are returned untouched
# @param options [Hash] options forwarded to `serializer.dump`
# @return [Object] `o` itself for Strings, otherwise the serializer output
def __convert_to_json(o=nil, options={})
  return o if o.is_a?(String)

  serializer.dump(o, options)
end

#__full_url(host) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Returns a full URL based on information from host

Parameters:

  • host (Hash)

    Host configuration passed in from Client



243
244
245
246
247
248
249
250
# File 'lib/opensearch/transport/transport/base.rb', line 243

# Returns a full URL assembled from the pieces of a host Hash.
#
# The user and password are CGI-escaped. The password part is optional: a
# host with a :user but no :password yields `user@host` instead of raising
# from `CGI.escape(nil)`.
#
# @param host [Hash] host configuration; reads :protocol, :user, :password,
#   :host, :port and :path
# @return [String] the assembled URL
def __full_url(host)
  url = "#{host[:protocol]}://"

  if host[:user]
    url += CGI.escape(host[:user])
    url += ":#{CGI.escape(host[:password])}" if host[:password]
    url += '@'
  end

  url += host[:host]
  url += ":#{host[:port]}" if host[:port]
  url += host[:path] if host[:path]
  url
end

#__log_response(method, path, params, body, url, response, json, took, duration) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Log request and response information



195
196
197
198
199
200
201
202
203
# File 'lib/opensearch/transport/transport/base.rb', line 195

# Logs the request and response; does nothing when no logger is configured.
#
# The password in the URL's credentials is replaced with
# SANITIZED_PASSWORD before logging. The pattern only matches a
# `user:password@` pair that sits directly after `//` and contains none of
# `/ ? # @`. A greedy `.+` pattern would instead anchor on the last `@` in
# the URL, which leaves the real password in the log line when the path or
# query contains `@`, and rewrites URLs that carry no credentials at all.
#
# `path`, `params` and `json` are accepted but not used here.
#
# @param method   [#to_s] HTTP method, logged upper-cased
# @param body     [Object, nil] request body, logged at debug level if present
# @param url      [#to_s] full request URL
# @param response [#status, #body] the response
# @param took     [Object] query time, interpolated as-is
# @param duration [Numeric] request duration in seconds, formatted with '%.3fs'
def __log_response(method, path, params, body, url, response, json, took, duration)
  return unless logger

  sanitized_url = url.to_s.gsub(%r{//([^:/@?#]+):([^/@?#]+)@}, "//\\1:#{SANITIZED_PASSWORD}@")
  log_info "#{method.to_s.upcase} #{sanitized_url} " \
           "[status:#{response.status}, request:#{sprintf('%.3fs', duration)}, query:#{took}]"
  log_debug "> #{__convert_to_json(body)}" if body
  log_debug "< #{response.body}"
end

#__raise_transport_error(response) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Raise error specific for the HTTP response status or a generic server error



225
226
227
228
# File 'lib/opensearch/transport/transport/base.rb', line 225

# Raises the error class registered in ERRORS for the response status,
# falling back to ServerError when the status has no entry.
#
# @param response [#status, #body] the failed response
# @raise [ServerError] (or the ERRORS entry for the status) with the
#   message "[<status>] <body>"
def __raise_transport_error(response)
  error_class = ERRORS[response.status] || ServerError
  message = "[#{response.status}] #{response.body}"
  raise error_class.new(message)
end

#__rebuild_connections(arguments = {}) ⇒ Connections::Collection

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Rebuilds the connections collection in the transport.

The method adds new connections from the passed hosts to the collection, and removes all connections not contained in the passed hosts.



132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
# File 'lib/opensearch/transport/transport/base.rb', line 132

# Rebuilds the connection collection from a new host list.
#
# Runs entirely under @state_mutex: replaces @hosts and @options, calls the
# #__close_connections hook, builds connections for the new hosts, then
# removes connections that are no longer present and adds the ones that the
# collection does not contain yet.
#
# @param arguments [Hash] accepts :hosts (default []) and :options (default {})
# @return [Object] the updated @connections collection
def __rebuild_connections(arguments={})
  @state_mutex.synchronize do
    @hosts   = arguments[:hosts]   || []
    @options = arguments[:options] || {}

    __close_connections

    rebuilt = __build_connections
    stale   = @connections.all.reject { |conn| rebuilt.include?(conn) }
    fresh   = rebuilt.reject { |conn| @connections.all.include?(conn) }

    @connections.remove(stale)
    @connections.add(fresh)
    @connections
  end
end

#__trace(method, path, params, headers, body, url, response, json, took, duration) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Trace the request in the `curl` format



209
210
211
212
213
214
215
216
217
218
219
# File 'lib/opensearch/transport/transport/base.rb', line 209

# Traces the request as a `curl` command, followed by the commented-out
# response.
#
# The traced URL is always built against http://localhost:9200 rather than
# the real `url`, and `?pretty` is always appended. `url` and `took` are
# accepted but not used here.
#
# Every header gets its own `-H` flag. curl treats each `-H` argument as a
# single header, so joining several headers into one argument with ", "
# produces a command that sends one malformed header.
#
# @param method   [#to_s] HTTP method, upper-cased in the command
# @param path     [String] request path appended to the trace URL
# @param params   [Hash] query parameters, appended when not empty
# @param headers  [Hash, nil] request headers
# @param body     [Object, nil] request body, pretty-printed via #__convert_to_json
# @param response [#status, #body] the response
# @param json     [Object, nil] parsed response; when nil the raw body is traced
# @param duration [Numeric] request duration in seconds
def __trace(method, path, params, headers, body, url, response, json, took, duration)
  query        = params.empty? ? '' : "&#{::Faraday::Utils::ParamsHash[params].to_query}"
  trace_url    = "http://localhost:9200/#{path}?pretty#{query}"
  trace_body   = body ? " -d '#{__convert_to_json(body, :pretty => true)}'" : ''
  header_flags = (headers || {}).map { |k, v| " -H '#{k}: #{v}'" }.join

  tracer.info "curl -X #{method.to_s.upcase}#{header_flags} '#{trace_url}'#{trace_body}\n"
  tracer.debug "# #{Time.now.iso8601} [#{response.status}] (#{format('%.3f', duration)}s)\n#"
  tracer.debug json ? serializer.dump(json, :pretty => true).gsub(/^/, '# ').sub(/\}$/, "\n# }")+"\n" : "# #{response.body}\n"
end

#get_connection(options = {}) ⇒ Connections::Connection

Returns a connection from the connection pool by delegating to Connections::Collection#get_connection.

Resurrects dead connections if the `resurrect_after` timeout has passed. Increments the counter and performs connection reloading if the `reload_connections` option is set.



95
96
97
98
99
100
101
# File 'lib/opensearch/transport/transport/base.rb', line 95

# Picks a connection from the collection.
#
# Before delegating to `connections.get_connection`, this method:
# 1. resurrects dead connections once more than @resurrect_after has passed
#    since @last_request_at;
# 2. increments the request counter under @counter_mtx;
# 3. reloads the connections when #reload_connections is truthy and the
#    counter is a multiple of #reload_after.
#
# @param options [Hash] forwarded to `connections.get_connection`
# @return [Object] whatever `connections.get_connection` returns
def get_connection(options={})
  resurrect_deadline = @last_request_at + @resurrect_after
  resurrect_dead_connections! if Time.now > resurrect_deadline

  @counter_mtx.synchronize { @counter += 1 }

  if reload_connections && (counter % reload_after).zero?
    reload_connections!
  end

  connections.get_connection(options)
end

#host_unreachable_exceptions ⇒ Array

This method is abstract.

Returns an Array of connection errors specific to the transport implementation. See HTTP::Faraday#host_unreachable_exceptions for an example.

Returns:

  • (Array)


380
381
382
# File 'lib/opensearch/transport/transport/base.rb', line 380

# Exception classes that #perform_request treats as "host unreachable":
# they are rescued there, the connection is marked dead, and the request may
# be retried. The base list only contains Errno::ECONNREFUSED.
#
# @return [Array] the exception classes
def host_unreachable_exceptions
  [Errno::ECONNREFUSED]
end

#initialize(arguments = {}, &block) ⇒ Object

Creates a new transport object

Parameters:

  • arguments (Hash) (defaults to: {})

    Settings and options for the transport

  • block (Proc)

    Lambda or Proc which can be evaluated in the context of the “session” object

Options Hash (arguments):

  • :hosts (Array)

    An Array of normalized hosts information

  • :options (Hash)

    A Hash with options (usually passed by Client)

See Also:



59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
# File 'lib/opensearch/transport/transport/base.rb', line 59

# Creates a new transport object.
#
# @param arguments [Hash] accepts :hosts (default []) and :options (default {})
# @param block [Proc] stored in @block and passed to #__build_connection
#   for every host
def initialize(arguments = {}, &block)
  @state_mutex = Mutex.new

  @hosts       = arguments[:hosts]   || []
  @options     = arguments[:options] || {}
  # These defaults are written into the Hash passed as arguments[:options]
  # (when one is given), i.e. the caller's Hash is modified.
  @options[:http] ||= {}
  @options[:retry_on_status] ||= []

  @block       = block
  @compression = !!@options[:compression]
  # Connections are built before the serializer, logger, tracer and sniffer
  # below are assigned.
  @connections = __build_connections

  # An explicit serializer instance wins over a serializer class; classes are
  # instantiated with the transport itself as the argument.
  @serializer  = options[:serializer] || ( options[:serializer_class] ? options[:serializer_class].new(self) : DEFAULT_SERIALIZER_CLASS.new(self) )
  @protocol    = options[:protocol] || DEFAULT_PROTOCOL

  @logger      = options[:logger]
  @tracer      = options[:tracer]

  @sniffer     = options[:sniffer_class] ? options[:sniffer_class].new(self) : Sniffer.new(self)
  # Request counter and its mutex; see #get_connection.
  @counter     = 0
  @counter_mtx = Mutex.new
  @last_request_at = Time.now
  @reload_connections = options[:reload_connections]
  # :reload_connections doubles as the reload interval when it is an Integer;
  # any other value falls back to DEFAULT_RELOAD_AFTER.
  @reload_after    = options[:reload_connections].is_a?(Integer) ? options[:reload_connections] : DEFAULT_RELOAD_AFTER
  @resurrect_after = options[:resurrect_after] || DEFAULT_RESURRECT_AFTER
  # Accepts a single status or a list; every entry is converted with #to_i.
  @retry_on_status = Array(options[:retry_on_status]).map { |d| d.to_i }
end

#perform_request(method, path, params = {}, body = nil, headers = nil, opts = {}, &block) ⇒ Response

This method is abstract.

The transport implementation has to implement this method either in full, or by invoking this method with a block. See HTTP::Faraday#perform_request for an example.

Performs a request to OpenSearch, while handling logging, tracing, marking dead connections, retrying the request and reloading the connections.

Parameters:

  • method (String)

    Request method

  • path (String)

    The API endpoint

  • params (Hash) (defaults to: {})

    Request parameters (will be serialized by Connections::Connection#full_url)

  • body (Hash) (defaults to: nil)

    Request body (will be serialized by the #serializer)

  • headers (Hash) (defaults to: nil)

    Request headers (will be serialized by the #serializer)

  • block (Proc)

    Code block to evaluate, passed from the implementation

Returns:

  • (Response)

Raises:

  • (NoMethodError)

    If no block is passed

  • (ServerError)

    If request failed on server

  • (Error)

    If no connection is available



270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
# File 'lib/opensearch/transport/transport/base.rb', line 270

# Performs a request through the block supplied by the transport adapter,
# while handling retries, dead connections, connection reloading, logging
# and tracing.
#
# The block is called with `(connection, url)` and must return the response.
# `headers` is not read by this method; the headers passed to #__trace come
# from `connection.connection.headers`.
#
# @param method  [String] request method
# @param path    [String] the API endpoint
# @param params  [Hash] request parameters; an :ignore entry (status or list
#   of statuses) is removed from a copy of the Hash and suppresses the error
#   for those response statuses
# @param body    [Object, nil] request body (only used for logging/tracing here)
# @param headers [Hash, nil] request headers (unused in this method)
# @param opts    [Hash] per-request overrides; reads :reload_on_failure and
#   :retry_on_failure
# @param block   [Proc] the code performing the actual request
# @return [Response] built from the status, the parsed JSON (or raw body)
#   and the headers of the response
# @raise [NoMethodError] if no block is passed
# @raise [ServerError] (or the status-specific error) for a response status
#   >= 300 that is not listed in :ignore
# @raise [Error] if no connection is available
def perform_request(method, path, params = {}, body = nil, headers = nil, opts = {}, &block)
  raise NoMethodError, 'Implement this method in your transport class' unless block_given?

  start = Time.now
  tries = 0
  # The per-request option wins over the transport-level option.
  reload_on_failure = opts.fetch(:reload_on_failure, @options[:reload_on_failure])

  # `true` means DEFAULT_MAX_RETRIES; any other value is used as given.
  # max_retries stays nil when neither opts nor options has the key.
  max_retries = if opts.key?(:retry_on_failure)
    opts[:retry_on_failure] === true ? DEFAULT_MAX_RETRIES : opts[:retry_on_failure]
  elsif options.key?(:retry_on_failure)
    options[:retry_on_failure] === true ? DEFAULT_MAX_RETRIES : options[:retry_on_failure]
  end

  # Work on a copy so that removing :ignore does not touch the caller's Hash.
  params = params.clone

  ignore = Array(params.delete(:ignore)).compact.map { |s| s.to_i }

  begin
    tries     += 1
    connection = get_connection or raise Error.new('Cannot get new connection from pool.')

    # Merge the connection's own default params; request params take precedence.
    if connection.connection.respond_to?(:params) && connection.connection.params.respond_to?(:to_hash)
      params = connection.connection.params.merge(params.to_hash)
    end

    url      = connection.full_url(path, params)

    response = block.call(connection, url)

    # A successful round-trip clears a previous failure record.
    connection.healthy! if connection.failures > 0

    # Raise an exception so we can catch it for `retry_on_status`
    __raise_transport_error(response) if response.status.to_i >= 300 && @retry_on_status.include?(response.status.to_i)

  rescue OpenSearch::Transport::Transport::ServerError => e
    # NOTE(review): the status is compared without `.to_i` here, unlike the
    # check that raised the error above — confirm adapters always return an
    # Integer status, otherwise the retry branch is skipped.
    if response && @retry_on_status.include?(response.status)
      log_warn "[#{e.class}] Attempt #{tries} to get response from #{url}"
      # Status-based retries fall back to DEFAULT_MAX_RETRIES when no
      # retry limit was configured.
      if tries <= (max_retries || DEFAULT_MAX_RETRIES)
        retry
      else
        log_fatal "[#{e.class}] Cannot get response from #{url} after #{tries} tries"
        raise e
      end
    else
      raise e
    end

  rescue *host_unreachable_exceptions => e
    log_error "[#{e.class}] #{e.message} #{connection.host.inspect}"

    connection.dead!

    # Reload-and-retry is attempted at most once per known connection.
    if reload_on_failure and tries < connections.all.size
      log_warn "[#{e.class}] Reloading connections (attempt #{tries} of #{connections.all.size})"
      reload_connections! and retry
    end

    # Without a configured retry limit the connection error is re-raised.
    if max_retries
      log_warn "[#{e.class}] Attempt #{tries} connecting to #{connection.host.inspect}"
      if tries <= max_retries
        retry
      else
        log_fatal "[#{e.class}] Cannot connect to #{connection.host.inspect} after #{tries} tries"
        raise e
      end
    else
      raise e
    end

  # Anything else is logged and re-raised unchanged.
  rescue Exception => e
    log_fatal "[#{e.class}] #{e.message} (#{connection.host.inspect if connection})"
    raise e

  end #/begin

  duration = Time.now - start

  if response.status.to_i >= 300
    __log_response    method, path, params, body, url, response, nil, 'N/A', duration
    __trace  method, path, params, connection.connection.headers, body, url, response, nil, 'N/A', duration if tracer

    # Log the failure only when `ignore` doesn't match the response status
    unless ignore.include?(response.status.to_i)
      log_fatal "[#{response.status}] #{response.body}"
    end

    __raise_transport_error response unless ignore.include?(response.status.to_i)
  end

  # The body is only parsed when it is non-empty and the content type mentions json.
  json     = serializer.load(response.body) if response.body && !response.body.empty? && response.headers && response.headers["content-type"] =~ /json/
  # Any error while reading 'took' (e.g. json is nil) results in 'n/a'.
  took     = (json['took'] ? sprintf('%.3fs', json['took']/1000.0) : 'n/a') rescue 'n/a'

  unless ignore.include?(response.status.to_i)
    __log_response   method, path, params, body, url, response, json, took, duration
  end

  # NOTE(review): this call passes nil / 'N/A' instead of `json` / `took`,
  # and for an ignored status >= 300 the request has already been traced in
  # the branch above, so it is traced twice — confirm this is intended.
  __trace  method, path, params, connection.connection.headers, body, url, response, nil, 'N/A', duration if tracer

  warnings(response.headers['warning']) if response.headers&.[]('warning')

  Response.new response.status, json || response.body, response.headers
ensure
  # Recorded for every request, successful or not; read by #get_connection.
  @last_request_at = Time.now
end

#reload_connections! ⇒ Object

Reloads and replaces the connection collection based on cluster information

See Also:



107
108
109
110
111
112
113
114
# File 'lib/opensearch/transport/transport/base.rb', line 107

# Replaces the connection collection with one built from the hosts reported
# by the sniffer.
#
# A SnifferTimeoutError is logged and swallowed, so the method returns the
# transport itself in both cases.
#
# @return [self]
def reload_connections!
  begin
    __rebuild_connections(:hosts => sniffer.hosts, :options => options)
  rescue SnifferTimeoutError
    log_error "[SnifferTimeoutError] Timeout when reloading connections."
  end

  self
end

#resurrect_dead_connections! ⇒ Object

Tries to “resurrect” all eligible dead connections



120
121
122
# File 'lib/opensearch/transport/transport/base.rb', line 120

# Calls `resurrect!` on every connection that the collection reports as dead.
#
# @return [Object] the result of iterating `connections.dead`
def resurrect_dead_connections!
  connections.dead.each(&:resurrect!)
end