Class: Elastomer::Client

Inherits:
Object
  • Object
Includes:
Notifications
Defined in:
lib/elastomer/notifications.rb,
lib/elastomer/client.rb,
lib/elastomer/client/bulk.rb,
lib/elastomer/client/docs.rb,
lib/elastomer/client/scan.rb,
lib/elastomer/client/index.rb,
lib/elastomer/client/nodes.rb,
lib/elastomer/client/errors.rb,
lib/elastomer/client/warmer.rb,
lib/elastomer/client/cluster.rb,
lib/elastomer/client/template.rb,
lib/elastomer/client/multi_search.rb

Overview

Injects our instrument method into the Client class.

Defined Under Namespace

Classes: Bulk, Cluster, Docs, Error, Index, MultiSearch, Nodes, Scan, Template, TimeoutError, Warmer

Constant Summary

OpaqueIdError =

Error raised when a conflict is detected between the UUID sent in the ‘X-Opaque-Id’ request header and the one received in the response header.

Class.new Client::Error

Constants included from Notifications

Notifications::NAME

Instance Attribute Summary

Instance Method Summary

Constructor Details

#initialize(opts = {}) ⇒ Client

Create a new client that can be used to make HTTP requests to the ElasticSearch server.

opts - The options Hash

:host - the host as a String
:port - the port number of the server
:url  - the URL as a String (overrides :host and :port)
:read_timeout - the timeout in seconds when reading from an HTTP connection
:open_timeout - the timeout in seconds when opening an HTTP connection
:adapter      - the Faraday adapter to use (defaults to Faraday.default_adapter)
:opaque_id    - set to `true` to use the 'X-Opaque-Id' request header


# File 'lib/elastomer/client.rb', line 24

def initialize( opts = {} )
  host = opts.fetch :host, 'localhost'
  port = opts.fetch :port, 9200
  @url = opts.fetch :url,  "http://#{host}:#{port}"

  uri = Addressable::URI.parse @url
  @host = uri.host
  @port = uri.port

  @read_timeout = opts.fetch :read_timeout, 5
  @open_timeout = opts.fetch :open_timeout, 2
  @adapter      = opts.fetch :adapter, Faraday.default_adapter
  @opaque_id    = opts.fetch :opaque_id, false
end
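
The way :url takes precedence over :host and :port can be sketched in isolation. This is a minimal sketch, not the library's code: `resolve_endpoint` is a hypothetical helper, and Ruby's stdlib `URI` is assumed in place of `Addressable::URI` so that it runs without extra gems.

```ruby
require 'uri'

# Hypothetical helper mirroring the option handling in #initialize.
# Assumption: stdlib URI stands in for Addressable::URI.
def resolve_endpoint(opts = {})
  host = opts.fetch :host, 'localhost'
  port = opts.fetch :port, 9200
  url  = opts.fetch :url,  "http://#{host}:#{port}"

  # host and port are always re-derived from the final URL
  uri = URI.parse url
  { :url => url, :host => uri.host, :port => uri.port }
end

defaults   = resolve_endpoint
overridden = resolve_endpoint(:host => 'ignored', :port => 1,
                              :url  => 'http://search.example.com:9201')
```

Because the host and port are parsed back out of the URL, a given :url wins even when :host and :port are also supplied.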

Instance Attribute Details

#host ⇒ Object (readonly)

Returns the value of attribute host.



# File 'lib/elastomer/client.rb', line 39

def host
  @host
end

#open_timeout ⇒ Object (readonly)

Returns the value of attribute open_timeout.



# File 'lib/elastomer/client.rb', line 40

def open_timeout
  @open_timeout
end

#port ⇒ Object (readonly)

Returns the value of attribute port.



# File 'lib/elastomer/client.rb', line 39

def port
  @port
end

#read_timeout ⇒ Object (readonly)

Returns the value of attribute read_timeout.



# File 'lib/elastomer/client.rb', line 40

def read_timeout
  @read_timeout
end

#url ⇒ Object (readonly)

Returns the value of attribute url.



# File 'lib/elastomer/client.rb', line 39

def url
  @url
end

Instance Method Details

#assert_param_presence(param, name = 'input value') ⇒ Object

Internal: Ensure that the parameter has a valid value. Things like `nil` and empty strings are right out. This method also performs a little formatting on the parameter:

  • leading and trailing whitespace is removed

  • arrays are flattened

  • and then joined into a String

  • numerics are converted to their string equivalents

param - The param Object to validate
name  - Optional param name as a String (used in exception messages)

Returns the validated param as a String.
Raises an ArgumentError if the param is not valid.



# File 'lib/elastomer/client.rb', line 282

def assert_param_presence( param, name = 'input value' )
  case param
  when String, Numeric
    param = param.to_s.strip
    raise ArgumentError, "#{name} cannot be blank: #{param.inspect}" if param =~ /\A\s*\z/
    param

  when Array
    param.flatten.map { |item| assert_param_presence(item, name) }.join(',')

  when nil
    raise ArgumentError, "#{name} cannot be nil"

  else
    raise ArgumentError, "#{name} is invalid: #{param.inspect}"
  end
end
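
The formatting rules above are easiest to see with a few calls. The sketch below copies the method as a top-level function so it can run outside of a client instance; the sample inputs are illustrative only.

```ruby
# Standalone copy of the method shown above, so the calls below run
# without an Elastomer::Client instance.
def assert_param_presence(param, name = 'input value')
  case param
  when String, Numeric
    param = param.to_s.strip
    raise ArgumentError, "#{name} cannot be blank: #{param.inspect}" if param =~ /\A\s*\z/
    param
  when Array
    param.flatten.map { |item| assert_param_presence(item, name) }.join(',')
  when nil
    raise ArgumentError, "#{name} cannot be nil"
  else
    raise ArgumentError, "#{name} is invalid: #{param.inspect}"
  end
end

# Whitespace is stripped from a single value.
single = assert_param_presence('  tweets ')

# Nested arrays are flattened, numerics stringified, then joined with commas.
joined = assert_param_presence(['a', ['b', 3]], 'index name')

# Blank strings are rejected with an ArgumentError.
blank_error =
  begin
    assert_param_presence('   ', 'index name')
  rescue ArgumentError => e
    e.message
  end
```

Each array element is validated recursively, so one blank or `nil` entry anywhere in a nested array rejects the whole parameter.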

#available? ⇒ Boolean

Returns true if the server is available; returns false otherwise.

Returns:

  • (Boolean)


# File 'lib/elastomer/client.rb', line 43

def available?
  response = head '/', :action => 'cluster.available'
  response.success?
rescue StandardError
  false
end

#bulk(body = nil, params = nil) ⇒ Object

The `bulk` method can be used in two ways. Without a block the method will perform an API call, and it requires a bulk request body and optional request parameters. If given a block, the method will use a Bulk instance to assemble the operations called in the block into a bulk request and dispatch it at the end of the block.

See www.elasticsearch.org/guide/reference/api/bulk/

body   - Request body as a String (required if a block is not given)
params - Optional request parameters as a Hash

:request_size - Optional maximum request size in bytes
:action_count - Optional maximum number of actions per request

block  - Passed to a Bulk instance which assembles the operations into one or more bulk requests.

Examples

bulk(request_body, :index => 'default-index')

bulk(:index => 'default-index') do |b|
  b.index(document1)
  b.index(document2, :_type => 'default-type')
  b.delete(document3)
  ...
end

Returns the response body as a Hash



# File 'lib/elastomer/client/bulk.rb', line 31

def bulk( body = nil, params = nil )
  if block_given?
    params, body = (body || {}), nil
    yield bulk_obj = Bulk.new(self, params)
    bulk_obj.call

  else
    raise 'bulk request body cannot be nil' if body.nil?
    params ||= {}

    response = self.post '{/index}{/type}/_bulk', params.merge(:body => body, :action => 'bulk')
    response.body
  end
end
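
For the non-block form, the caller has to supply the request body. The following is a rough sketch of assembling one, assuming the newline-delimited JSON layout of the Elasticsearch bulk API (an action line, optionally followed by a source line). `build_bulk_body` is a hypothetical helper and not part of Elastomer.

```ruby
require 'json'

# Hypothetical helper: turn [action, metadata, source] triples into a
# newline-delimited JSON bulk body.
def build_bulk_body(operations)
  lines = operations.flat_map do |action, meta, source|
    row = [JSON.generate({ action => meta })]
    row << JSON.generate(source) if source   # delete actions carry no source line
    row
  end
  lines.join("\n") + "\n"                    # the body ends with a newline
end

request_body = build_bulk_body([
  [:index,  { :_id => 1, :_type => 'tweet' }, { :message => 'hello' }],
  [:delete, { :_id => 2, :_type => 'tweet' }, nil]
])

# With a real client this body would be sent as:
#   client.bulk(request_body, :index => 'default-index')
```

The block form avoids this bookkeeping, since the Bulk instance assembles the operations itself.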

#cluster ⇒ Object

Returns a Cluster instance.



# File 'lib/elastomer/client/cluster.rb', line 5

def cluster
  @cluster ||= Cluster.new self
end

#connection ⇒ Object

Internal: Provides access to the Faraday::Connection used by this client for all requests to the server.

Returns a Faraday::Connection



# File 'lib/elastomer/client.rb', line 71

def connection
  @connection ||= Faraday.new(url) do |conn|
    conn.request  :encode_json
    conn.response :parse_json
    conn.request  :opaque_id if @opaque_id

    Array === @adapter ?
      conn.adapter(*@adapter) :
      conn.adapter(@adapter)

    conn.options[:timeout]      = read_timeout
    conn.options[:open_timeout] = open_timeout
  end
end

#delete(path, params = {}) ⇒ Object

Internal: Sends an HTTP DELETE request to the server.

path   - The path as a String
params - Parameters Hash

Returns a Faraday::Response
Raises an Elastomer::Client::Error on 4XX and 5XX responses



# File 'lib/elastomer/client.rb', line 136

def delete( path, params = {} )
  request :delete, path, params
end

#docs(name, type = nil) ⇒ Object

Provides access to document-level API commands.

name - The name of the index as a String
type - The document type as a String

Returns a Docs instance.



# File 'lib/elastomer/client/docs.rb', line 11

def docs( name, type = nil )
  Docs.new self, name, type
end

#expand_path(path, params) ⇒ Object

Internal: Apply path expansions to the `path` and append query parameters to the `path`. We are using an Addressable::Template to replace 'expansion' fields found in the path with the values extracted from the `params` Hash. Any remaining elements in the `params` Hash are treated as query parameters and appended to the end of the path.

path   - The path as a String
params - Parameters Hash

Examples

expand_path('/foo{/bar}', {:bar => 'hello', :q => 'what', :p => 2})
#=> '/foo/hello?q=what&p=2'

expand_path('/foo{/bar}{/baz}', {:baz => 'no bar'})
#=> '/foo/no%20bar'

Returns the expanded path as a String.



# File 'lib/elastomer/client.rb', line 216

def expand_path( path, params )
  template = Addressable::Template.new path

  expansions = {}
  query_values = params.dup
  query_values.delete :action
  query_values.delete :context

  template.keys.map(&:to_sym).each do |key|
    value = query_values.delete key
    value = assert_param_presence(value, key) unless path =~ /{\/#{key}}/ && value.nil?
    expansions[key] = value
  end

  uri = template.expand(expansions)
  uri.query_values = query_values unless query_values.empty?
  uri.to_s
end
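
The expansion rules can be approximated without Addressable. The sketch below is a simplified stand-in, not the method above: it handles only the optional `{/name}` form, skips the `assert_param_presence` validation, and uses stdlib `ERB::Util.url_encode` and `URI.encode_www_form` for escaping. `expand_path_sketch` is a hypothetical name.

```ruby
require 'erb'
require 'uri'

# Simplified stand-in for the Addressable::Template expansion.
# Only `{/name}` segments are supported; a missing value drops the segment.
def expand_path_sketch(path, params)
  query_values = params.dup
  query_values.delete :action    # instrumentation-only keys never reach the URL
  query_values.delete :context

  expanded = path.gsub(/\{\/(\w+)\}/) do
    value = query_values.delete($1.to_sym)
    value.nil? ? '' : "/#{ERB::Util.url_encode(value.to_s)}"
  end

  # whatever was not consumed by the template becomes the query string
  return expanded if query_values.empty?
  "#{expanded}?#{URI.encode_www_form(query_values)}"
end

with_query  = expand_path_sketch('/foo{/bar}', {:bar => 'hello', :q => 'what', :p => 2})
skipped_bar = expand_path_sketch('/foo{/bar}{/baz}', {:baz => 'no bar'})
no_action   = expand_path_sketch('{/index}/_bulk', {:index => 'tweets', :action => 'bulk'})
```

Working on a copy of `params` keeps the caller's Hash intact, which matches the `params.dup` in the method above.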

#get(path, params = {}) ⇒ Object

Internal: Sends an HTTP GET request to the server.

path   - The path as a String
params - Parameters Hash

Returns a Faraday::Response
Raises an Elastomer::Client::Error on 4XX and 5XX responses



# File 'lib/elastomer/client.rb', line 103

def get( path, params = {} )
  request :get, path, params
end

#handle_errors(response) ⇒ Object

Internal: Inspect the Faraday::Response and raise an error if the status is in the 5XX range or if the response body contains an ‘error’ field. In the latter case, the value of the ‘error’ field becomes our exception message. In the absence of an ‘error’ field the response body is used as the exception message.

The raised exception will contain the response object.

response - The Faraday::Response object.

Returns the response.
Raises an Elastomer::Client::Error on 5XX responses or responses containing an ‘error’ field.



# File 'lib/elastomer/client.rb', line 261

def handle_errors( response )
  raise Error, response if response.status >= 500
  raise Error, response if Hash === response.body && response.body['error']

  response
end
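
The two conditions can be exercised without a server. In this sketch `FakeResponse` and `SketchError` are stand-ins for Faraday::Response and Elastomer::Client::Error, defined only so the example runs on its own; unlike the real method, the sketch builds a message String instead of passing the response to the exception.

```ruby
# Stand-ins for Faraday::Response and Elastomer::Client::Error.
FakeResponse = Struct.new(:status, :body)
class SketchError < StandardError; end

# Same two checks as the method above, with simplified exception messages.
def handle_errors_sketch(response)
  raise SketchError, "server error (#{response.status})" if response.status >= 500
  raise SketchError, response.body['error'].to_s if Hash === response.body && response.body['error']
  response
end

# Helper for the demonstration: did the response trigger an exception?
def raised?(response)
  handle_errors_sketch(response)
  false
rescue SketchError
  true
end

server_error = raised?(FakeResponse.new(503, 'unavailable'))
error_field  = raised?(FakeResponse.new(400, { 'error' => 'ParseException', 'status' => 400 }))
plain_404    = raised?(FakeResponse.new(404, { 'exists' => false }))
```

Under these checks a 4XX response whose body has no ‘error’ field is returned to the caller rather than raised, so callers may still want to inspect the status.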

#head(path, params = {}) ⇒ Object

Internal: Sends an HTTP HEAD request to the server.

path   - The path as a String
params - Parameters Hash

Returns a Faraday::Response



# File 'lib/elastomer/client.rb', line 92

def head( path, params = {} )
  request :head, path, params
end

#index(name) ⇒ Object

Provides access to index-level API commands.

name - The name of the index as a String or an Array of names

Returns an Index instance.



# File 'lib/elastomer/client/index.rb', line 9

def index( name )
  Index.new self, name
end

#info ⇒ Object

Returns the information Hash from the attached ElasticSearch instance.



# File 'lib/elastomer/client.rb', line 62

def info
  response = get '/', :action => 'cluster.info'
  response.body
end

#instrument(path, body, params) ⇒ Object

Internal: A noop method that simply yields to the block. This method will be replaced when the ‘elastomer/notifications’ module is included.

path   - The full request path as a String
body   - The request body as a String or `nil`
params - The request params Hash
block  - The block that will be instrumented

Returns the response from the block



# File 'lib/elastomer/client.rb', line 244

def instrument( path, body, params )
  yield
end

#multi_search(body = nil, params = nil) ⇒ Object Also known as: msearch

Execute an array of searches in bulk. Results are returned in an array in the order the queries were sent.

The `multi_search` method can be used in two ways. Without a block the method will perform an API call, and it requires a multi-search request body and optional request parameters. If given a block, the method will use a MultiSearch instance to assemble the searches called in the block into a single request and dispatch it at the end of the block.

See www.elasticsearch.org/guide/reference/api/multi-search/

body   - Request body as a String (required if a block is not given)
params - Optional request parameters as a Hash
block  - Passed to a MultiSearch instance which assembles the searches into a single request.

Examples

# index and type in request body
multi_search(request_body)

# index in URI
multi_search(request_body, :index => 'default-index')

# block form
multi_search(:index => 'default-index') do |m|
  m.search({:query => {:match_all => {}}}, :search_type => :count)
  m.search({:query => {:field => {"foo" => "bar"}}}, :type => 'default-type')
  ...
end

Returns the response body as a Hash



# File 'lib/elastomer/client/multi_search.rb', line 34

def multi_search(body = nil, params = nil)
  if block_given?
    params, body = (body || {}), nil
    yield msearch_obj = MultiSearch.new(self, params)
    msearch_obj.call
  else
    raise 'multi_search request body cannot be nil' if body.nil?
    params ||= {}

    response = self.post '{/index}{/type}/_msearch', params.merge(:body => body)
    response.body
  end
end

#nodes(node_id = '_all') ⇒ Object

Provides access to node-level API commands.

node_id - The node ID as a String or an Array of node IDs

Returns a Nodes instance.



# File 'lib/elastomer/client/nodes.rb', line 10

def nodes( node_id = '_all' )
  Nodes.new self, node_id
end

#post(path, params = {}) ⇒ Object

Internal: Sends an HTTP POST request to the server.

path   - The path as a String
params - Parameters Hash

Returns a Faraday::Response
Raises an Elastomer::Client::Error on 4XX and 5XX responses



# File 'lib/elastomer/client.rb', line 125

def post( path, params = {} )
  request :post, path, params
end

#put(path, params = {}) ⇒ Object

Internal: Sends an HTTP PUT request to the server.

path   - The path as a String
params - Parameters Hash

Returns a Faraday::Response
Raises an Elastomer::Client::Error on 4XX and 5XX responses



# File 'lib/elastomer/client.rb', line 114

def put( path, params = {} )
  request :put, path, params
end

#request(method, path, params) ⇒ Object

Internal: Sends an HTTP request to the server. If the `params` Hash contains a :body key, it will be deleted from the Hash and the value will be used as the body of the request.

method - The HTTP method to send [:head, :get, :put, :post, :delete]
path   - The path as a String
params - Parameters Hash

:body         - Will be used as the request body
:read_timeout - Optional read timeout (in seconds) for the request

Returns a Faraday::Response
Raises an Elastomer::Client::Error on 4XX and 5XX responses



# File 'lib/elastomer/client.rb', line 152

def request( method, path, params )
  body = params.delete :body
  body = MultiJson.dump body if Hash === body

  read_timeout = params.delete :read_timeout

  path = expand_path path, params

  response = instrument(path, body, params) do
    case method
    when :head
      connection.head(path) { |req| req.options[:timeout] = read_timeout if read_timeout }

    when :get
      connection.get(path) { |req|
        req.body = body if body
        req.options[:timeout] = read_timeout if read_timeout
      }

    when :put
      connection.put(path, body) { |req| req.options[:timeout] = read_timeout if read_timeout }

    when :post
      connection.post(path, body) { |req| req.options[:timeout] = read_timeout if read_timeout }

    when :delete
      connection.delete(path) { |req|
        req.body = body if body
        req.options[:timeout] = read_timeout if read_timeout
      }

    else
      raise ArgumentError, "unknown HTTP request method: #{method.inspect}"
    end
  end

  handle_errors response

rescue Faraday::Error::TimeoutError => boom
  raise ::Elastomer::Client::TimeoutError.new(boom, path)

# ensure
#   # FIXME: this is here until we get a real logger in place
#   STDERR.puts "[#{response.status.inspect}] curl -X#{method.to_s.upcase} '#{url}#{path}'" unless response.nil?
end
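
The first few lines of the method decide what becomes the request body, what becomes a per-request timeout, and what is left for path expansion. A minimal sketch of that split follows, assuming stdlib `JSON` in place of MultiJson; `split_request_params` is a hypothetical helper.

```ruby
require 'json'

# Hypothetical helper mirroring the opening lines of #request.
# Assumption: stdlib JSON stands in for MultiJson.
def split_request_params(params)
  params = params.dup   # the real method deletes keys from the caller's Hash
  body = params.delete :body
  body = JSON.generate(body) if Hash === body   # String bodies pass through untouched
  read_timeout = params.delete :read_timeout
  [body, read_timeout, params]
end

body, timeout, rest = split_request_params(
  :index        => 'tweets',
  :body         => { :query => { :match_all => {} } },
  :read_timeout => 30
)

raw_body, no_timeout, _ = split_request_params(:body => "{\"index\":{}}\n{}\n")
```

Since the method shown above calls `params.delete` directly, a Hash that is reused across calls will have lost its :body and :read_timeout keys after the first request.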

#scan(query, opts = {}) ⇒ Object

Create a new Scan instance for scrolling all results from a `query`.

query - The query to scan as a Hash or a JSON encoded String
opts  - Options Hash

:index  - the name of the index to search
:type   - the document type to search
:scroll - the keep alive time of the scrolling request (5 minutes by default)
:size   - the number of documents per shard to fetch per scroll

Examples

scan = client.scan('{"query":{"match_all":{}}}', :index => 'test')
scan.each_document do |document|
  document['_id']
  document['_source']
end

Returns a new Scan instance



# File 'lib/elastomer/client/scan.rb', line 22

def scan( query, opts = {} )
  Scan.new self, query, opts
end

#scroll(scroll_id, scroll = '5m') ⇒ Object

Continue scrolling a scan query. See www.elasticsearch.org/guide/reference/api/search/scroll/

scroll_id - The current scroll ID as a String
scroll    - The keep alive time of the scrolling request (5 minutes by default)

Examples

scroll_id = client.scan('{"query":{"match_all":{}}}', :index => 'test').scroll_id

h = client.scroll scroll_id   # scroll to get the next set of results
scroll_id = h['_scroll_id']   # and store the scroll_id to use later

h = client.scroll scroll_id   # scroll again to get the next set of results
scroll_id = h['_scroll_id']   # and store the scroll_id to use later

# repeat until the results are empty

Returns the response body as a Hash.



# File 'lib/elastomer/client/scan.rb', line 45

def scroll( scroll_id, scroll = '5m' )
  response = get '/_search/scroll', :body => scroll_id, :scroll => scroll, :action => 'search.scroll'
  response.body
end
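
The "repeat until the results are empty" step from the example can be written as a loop. The sketch below runs against `StubClient`, a hypothetical stand-in for the real client, and assumes the hits are found under `h['hits']['hits']` as in a standard Elasticsearch search response.

```ruby
# Stub standing in for Elastomer::Client so the loop runs without a server.
# It hands out two pages of hits and then an empty page.
class StubClient
  def initialize
    @pages = [[{ '_id' => '1' }, { '_id' => '2' }], [{ '_id' => '3' }], []]
  end

  def scroll(scroll_id, scroll = '5m')
    { '_scroll_id' => scroll_id.next, 'hits' => { 'hits' => @pages.shift || [] } }
  end
end

client    = StubClient.new
scroll_id = 'a'   # would come from client.scan(...).scroll_id
ids       = []

loop do
  h    = client.scroll scroll_id
  hits = h['hits']['hits']
  break if hits.empty?            # an empty page means the scan is exhausted

  ids.concat hits.map { |doc| doc['_id'] }
  scroll_id = h['_scroll_id']     # always continue with the most recent scroll ID
end
```

For most uses, `Scan#each_document` from the `scan` example hides this loop entirely.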

#semantic_version ⇒ Object

Returns a Semantic::Version for the attached ElasticSearch instance. See rubygems.org/gems/semantic



# File 'lib/elastomer/client.rb', line 57

def semantic_version
  Semantic::Version.new(version)
end

#template(name) ⇒ Object

Returns a Template instance.



# File 'lib/elastomer/client/template.rb', line 6

def template( name )
  Template.new self, name
end

#version ⇒ Object

Returns the version String of the attached ElasticSearch instance.



# File 'lib/elastomer/client.rb', line 51

def version
  @version ||= info['version']['number']
end

#warmer(index_name, warmer_name) ⇒ Object

Provides access to warmer API commands. See www.elasticsearch.org/guide/reference/api/admin-indices-warmers/

index_name  - The name of the index as a String
warmer_name - The name of the warmer as a String

Returns a Warmer instance.



# File 'lib/elastomer/client/warmer.rb', line 11

def warmer(index_name, warmer_name)
  Warmer.new(self, index_name, warmer_name)
end