Class: Elastic::Transport::Client

Inherits:
Object
  • Object
show all
Includes:
MetaHeader
Defined in:
lib/elastic/transport/client.rb

Overview

Handles communication with an Elastic cluster.

See README for usage and code examples.

Constant Summary collapse

DEFAULT_TRANSPORT_CLASS =
Transport::HTTP::Faraday
DEFAULT_LOGGER =
lambda do
  require 'logger'
  logger = Logger.new(STDERR)
  logger.progname = 'elastic'
  logger.formatter = proc { |severity, datetime, progname, msg| "#{datetime}: #{msg}\n" }
  logger
end
DEFAULT_TRACER =
lambda do
  require 'logger'
  logger = Logger.new(STDERR)
  logger.progname = 'elastic.tracer'
  logger.formatter = proc { |severity, datetime, progname, msg| "#{msg}\n" }
  logger
end
DEFAULT_HOST =

The default host and port to use if not otherwise specified.

Since:

  • 7.0.0

'localhost:9200'.freeze
DEFAULT_PORT =

The default port to use if not otherwise specified.

Since:

  • 7.2.0

9200

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods included from MetaHeader

#called_from?, #client_meta_version, #elasticsearch?, #enterprise_search?, #meta_header_adapter, #meta_header_engine, #meta_header_service_version, #serverless?, #set_meta_header

Constructor Details

#initialize(arguments = {}) {|faraday| ... } ⇒ Client

Create a client connected to an Elastic cluster.

Specify the URL via arguments or set the ‘ELASTICSEARCH_URL` environment variable.

Parameters:

  • arguments (Hash) (defaults to: {})

    a customizable set of options

  • enable_meta_header (Hash)

    a customizable set of options

  • ca_fingerprint (Hash)

    a customizable set of options

Options Hash (arguments):

  • :hosts (String, Array)

    Single host passed as a String or Hash, or multiple hosts passed as an Array; ‘host` or `url` keys are also valid

  • :log (Boolean)

    Use the default logger (disabled by default)

  • :trace (Boolean)

    Use the default tracer (disabled by default)

  • :logger (Object)

    An instance of a Logger-compatible object

  • :tracer (Object)

    An instance of a Logger-compatible object

  • :resurrect_after (Number)

    After how many seconds a dead connection should be tried again

  • :reload_connections (Boolean, Number)

    Reload connections after X requests (false by default)

  • :randomize_hosts (Boolean)

    Shuffle connections on initialization and reload (false by default)

  • :sniffer_timeout (Integer)

    Timeout for reloading connections in seconds (1 by default)

  • :retry_on_failure (Boolean, Number)

    Retry X times when request fails before raising an exception (false by default)

  • :delay_on_retry (Number)

    Delay in milliseconds between each retry (0 by default)

  • Array (Number)

    :retry_on_status Retry when specific status codes are returned

  • :reload_on_failure (Boolean)

    Reload connections after failure (false by default)

  • :request_timeout (Integer)

    The request timeout to be passed to transport in options in seconds (the default value is taken from the transport)

  • :adapter (Symbol)

    A specific adapter for Faraday (e.g. ‘:patron`)

  • :transport_options (Hash)

    Options to be passed to the ‘Faraday::Connection` constructor

  • :transport_class (Constant)

    A specific transport class to use, will be initialized by the client and passed hosts and all arguments

  • :transport (Object)

    A specific transport instance

  • :serializer_class (Constant)

    A specific serializer class to use, will be initialized by the transport and passed the transport instance

  • :selector (Constant)

    An instance of selector strategy implemented with Transport::Connections::Selector::Base.

  • :send_get_body_as (String)

    Specify the HTTP method to use for GET requests with a body. (Default: GET)

  • :compression (Boolean)

    Whether to compress requests. Gzip compression will be used. The default is false. Responses will automatically be inflated if they are compressed. If a custom transport object is used, it must handle the request compression and response inflation.

Yields:

  • (faraday)

    Access and configure the ‘Faraday::Connection` instance directly with a block



124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
# File 'lib/elastic/transport/client.rb', line 124

def initialize(arguments = {}, &block)
  @arguments = arguments.transform_keys(&:to_sym)
  @arguments[:logger] ||= @arguments[:log]   ? DEFAULT_LOGGER.call() : nil
  @arguments[:tracer] ||= @arguments[:trace] ? DEFAULT_TRACER.call() : nil
  @arguments[:reload_connections] ||= false
  @arguments[:retry_on_failure]   ||= false
  @arguments[:delay_on_retry]     ||= 0
  @arguments[:reload_on_failure]  ||= false
  @arguments[:randomize_hosts]    ||= false
  @arguments[:transport_options]  ||= {}
  @arguments[:http]               ||= {}
  @arguments[:enable_meta_header] = arguments.fetch(:enable_meta_header, true)

  @hosts ||= __extract_hosts(@arguments[:hosts] ||
                             @arguments[:host] ||
                             @arguments[:url] ||
                             @arguments[:urls] ||
                             ENV['ELASTICSEARCH_URL'] ||
                             DEFAULT_HOST)

  @send_get_body_as = @arguments[:send_get_body_as] || 'GET'
  @ca_fingerprint = @arguments.delete(:ca_fingerprint)

  if @arguments[:request_timeout]
    @arguments[:transport_options][:request] = { timeout: @arguments[:request_timeout] }
  end

  if @arguments[:transport]
    @transport = @arguments[:transport]
  else
    @transport_class = @arguments[:transport_class] || DEFAULT_TRANSPORT_CLASS
    @transport = if @transport_class == Transport::HTTP::Faraday
                   @arguments[:adapter] ||= __auto_detect_adapter
                   set_meta_header # from include MetaHeader
                   @transport_class.new(hosts: @hosts, options: @arguments) do |faraday|
                     faraday.adapter(@arguments[:adapter])
                     block&.call faraday
                   end
                 else
                   set_meta_header # from include MetaHeader
                   @transport_class.new(hosts: @hosts, options: @arguments)
                 end
  end

  if defined?(::OpenTelemetry) && ENV[OpenTelemetry::ENV_VARIABLE_ENABLED] != 'false'
    @otel = OpenTelemetry.new(@arguments)
  end
end

Instance Attribute Details

#transportObject

Returns the transport object.



61
62
63
# File 'lib/elastic/transport/client.rb', line 61

def transport
  @transport
end

Instance Method Details

#perform_request(method, path, params = {}, body = nil, headers = nil, opts = {}) ⇒ Object

Performs a request through delegation to #transport.



175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
# File 'lib/elastic/transport/client.rb', line 175

def perform_request(method, path, params = {}, body = nil, headers = nil, opts = {})
  method = @send_get_body_as if method == 'GET' && body
  validate_ca_fingerprints if @ca_fingerprint
  if @otel
    # If no endpoint is specified in the opts, use the HTTP method name
    span_name = opts[:endpoint] || method
    @otel.tracer.in_span(span_name) do |span|
      span['http.request.method'] = method
      span['db.system'] = 'elasticsearch'
      opts[:defined_params]&.each do |k, v|
        if v.respond_to?(:join)
          span["db.elasticsearch.path_parts.#{k}"] = v.join(',')
        else
          span["db.elasticsearch.path_parts.#{k}"] = v
        end
      end
      if body_as_json = @otel.process_body(body, opts[:endpoint])
        span['db.statement'] = body_as_json
      end
      span['db.operation'] = opts[:endpoint] if opts[:endpoint]
      transport.perform_request(method, path, params || {}, body, headers)
    end
  else
    transport.perform_request(method, path, params || {}, body, headers)
  end
end