Class: ElastomerClient::Client::Bulk

Inherits:
Object
  • Object
Defined in:
lib/elastomer_client/client/bulk.rb

Overview

The Bulk class provides some abstractions and helper methods for working with the Elasticsearch bulk API command. Instances of the Bulk class accumulate indexing and delete operations and then issue a single bulk API request to Elasticsearch. Those operations are then executed by the cluster.

A maximum request size can be set. When adding an operation would bring the request body up to or past this threshold, the operations accumulated so far are sent to the search cluster in a bulk request, and the new operation starts the next batch. This check happens as operations are added.

Additionally, a maximum action count can be set. When adding an operation would push the number of accumulated actions past the action count, a bulk request is made first.

You can also use the `call` method explicitly to send a bulk request immediately.
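The accumulate-and-flush behavior described above can be modeled with a small, self-contained sketch. `MiniBulk` and `RecordingClient` are hypothetical stand-ins, not part of ElastomerClient; the standard `json` library is used in place of MultiJson, and only the action-count limit is modeled.

```ruby
require "json"

# Hypothetical stand-in client: records each bulk body instead of sending it.
class RecordingClient
  attr_reader :bodies

  def initialize
    @bodies = []
  end

  def bulk(body, _params = {})
    @bodies << body
    { "errors" => false }
  end
end

# Hypothetical, simplified model of the Bulk accumulator (action count only).
class MiniBulk
  def initialize(client, action_count:)
    @client = client
    @action_count = action_count
    @actions = []
    @count = 0
  end

  # Flush first if one more action would exceed the limit, then buffer.
  def index(document, params = {})
    response = call if @count + 1 > @action_count
    @actions << JSON.generate({ index: params }) << JSON.generate(document)
    @count += 1
    response
  end

  # Send whatever is buffered; the buffer is always cleared afterwards.
  def call
    return nil if @actions.empty?

    @client.bulk(@actions.join("\n") + "\n")
  ensure
    @actions.clear
    @count = 0
  end
end

client = RecordingClient.new
bulk = MiniBulk.new(client, action_count: 2)
3.times { |n| bulk.index({ "n" => n }, { _id: n }) } # third add flushes the first two
bulk.call                                            # explicit flush of the remainder
puts client.bodies.size
```

In this model the third `index` call triggers a request containing the first two documents, and the explicit `call` sends the one left over.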

Constant Summary

DEFAULT_REQUEST_SIZE =
2**20 * 10 # 10 MB
SPECIAL_KEYS =
%w[id type index version version_type routing parent consistency refresh retry_on_conflict]
UNPREFIXED_SPECIAL_KEYS =
%w[parent retry_on_conflict routing version version_type]

Constructor Details

#initialize(client, params = {}) ⇒ Bulk

Create a new bulk client for handling some of the details of accumulating documents to index and then formatting them properly for the bulk API command.

client - ElastomerClient::Client used for HTTP requests to the server
params - Parameters Hash to pass to the Client#bulk method
  :request_size - the maximum request size in bytes
  :action_count - the maximum number of actions


# File 'lib/elastomer_client/client/bulk.rb', line 186

def initialize(client, params = {})
  @client  = client
  @params  = params

  @actions = []
  @current_request_size = 0
  @current_action_count = 0
  self.request_size = params.delete(:request_size) || DEFAULT_REQUEST_SIZE
  self.action_count = params.delete(:action_count)
end
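As the constructor source shows, `:request_size` and `:action_count` are removed from the params Hash with `delete`, so only the remaining entries are later passed on to `Client#bulk`. The sketch below reproduces that splitting in isolation; `split_bulk_params` is a hypothetical helper written for illustration, not a method of the library.

```ruby
DEFAULT_REQUEST_SIZE = 2**20 * 10 # 10 MB, as listed in the constant summary

# Hypothetical helper mirroring how the constructor splits its params.
# Note that it mutates the Hash it is given, just as `initialize` does.
def split_bulk_params(params)
  request_size = params.delete(:request_size) || DEFAULT_REQUEST_SIZE
  action_count = params.delete(:action_count)
  [request_size, action_count, params]
end

params = { action_count: 500, refresh: true }
request_size, action_count, remaining = split_bulk_params(params)

puts request_size # the default, since no :request_size was given
puts action_count
p remaining       # only the non-limit entries are left
```

Because the Hash is modified in place, a caller who wants to reuse the same params elsewhere may prefer to pass a copy.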

Instance Attribute Details

#action_count ⇒ Object

Returns the value of attribute action_count.



# File 'lib/elastomer_client/client/bulk.rb', line 197

def action_count
  @action_count
end

#client ⇒ Object (readonly)

Returns the value of attribute client.



# File 'lib/elastomer_client/client/bulk.rb', line 197

def client
  @client
end

#request_size ⇒ Object

Returns the value of attribute request_size.



# File 'lib/elastomer_client/client/bulk.rb', line 197

def request_size
  @request_size
end

Instance Method Details

#add_to_actions(action, document = nil) ⇒ Object

Internal: Add the given `action` to the list of actions that will be performed by this bulk request. An optional `document` can also be given.

If the total size of the accumulated actions meets our desired request size, then a bulk API call will be performed. After the call the actions list is cleared and we’ll start accumulating actions again.

action - The bulk action (as a Hash) to perform
document - Optional document for the action as a Hash or JSON encoded String

Returns the response from the bulk call if one was made or nil.
Raises RequestSizeError if the given action is larger than the configured request size or the client.max_request_size.


# File 'lib/elastomer_client/client/bulk.rb', line 400

def add_to_actions(action, document = nil)
  action = MultiJson.dump(action)
  size = action.bytesize

  if document
    document = MultiJson.dump(document) unless document.is_a?(String)
    size += document.bytesize
  end

  check_action_size!(size)

  response = nil
  begin
    response = call if ready_to_send?(size)
  # rubocop:disable Lint/UselessRescue
  rescue StandardError => err
    raise err
  ensure
    @actions << action
    @actions << document unless document.nil?
    @current_request_size += size
    @current_action_count += 1
  end

  response
end
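The size bookkeeping in the listing above counts only the bytes of the JSON-encoded action and document; the newline separators added later by `call` are not included. A minimal sketch of that calculation follows, assuming the standard `json` library in place of MultiJson; `bulk_action_size` is a hypothetical helper, not part of the library.

```ruby
require "json"

# Hypothetical helper: bytes that one action (plus its optional document)
# contributes to the accumulated request size.
def bulk_action_size(action, document = nil)
  size = JSON.generate(action).bytesize
  if document
    # A document that is already a JSON String is measured as-is.
    document = JSON.generate(document) unless document.is_a?(String)
    size += document.bytesize
  end
  size
end

puts bulk_action_size({ index: { _id: 1 } }, { "foo" => "bar" }) # 19 + 13 = 32
puts bulk_action_size({ delete: { _id: 1 } })                    # 20
```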

#call ⇒ Object

Immediately execute a bulk API call with the currently accumulated actions. The accumulated actions list will be cleared after the call has been made.

If the accumulated actions list is empty then no action is taken.

Returns the response body Hash, or nil if there were no accumulated actions.



# File 'lib/elastomer_client/client/bulk.rb', line 314

def call
  return nil if @actions.empty?

  body = @actions.join("\n") + "\n"
  client.bulk(body, @params)
ensure
  @current_request_size = 0
  @current_action_count = 0
  @actions.clear
end
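The request body built by `call` is newline-delimited JSON: each accumulated entry sits on its own line and the body ends with a trailing newline. The sketch below shows the resulting shape; `bulk_body` is a hypothetical helper and the standard `json` library stands in for MultiJson.

```ruby
require "json"

# Hypothetical helper: join pre-encoded entries the same way #call does.
def bulk_body(entries)
  entries.join("\n") + "\n"
end

entries = [
  JSON.generate({ index: { _id: 1 } }),  # action line
  JSON.generate({ "foo" => "bar" }),     # document line for the index action
  JSON.generate({ delete: { _id: 2 } })  # delete actions carry no document
]

print bulk_body(entries)
```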

#check_action_size!(size) ⇒ Object

Internal: Raises a RequestSizeError if the given size is larger than the configured client.max_request_size.

Raises:

  • (RequestSizeError)



# File 'lib/elastomer_client/client/bulk.rb', line 443

def check_action_size!(size)
  return unless size > client.max_request_size
  raise RequestSizeError, "Bulk action of size `#{size}` exceeds the maximum requst size: #{client.max_request_size}"
end

#convert_special_keys(params) ⇒ Object

Internal: Convert incoming Ruby symbol keys to their special underscore versions. Maintains API compatibility with the `Docs` API for `index`, `create`, `update` and `delete`.

:id -> :_id
'id' -> '_id'

params - Hash.

Returns a new params Hash with the special keys replaced.



# File 'lib/elastomer_client/client/bulk.rb', line 354

def convert_special_keys(params)
  new_params = params.dup

  SPECIAL_KEYS.each do |key|
    omit_prefix = (
      client.version_support.es_version_8_plus? &&
      UNPREFIXED_SPECIAL_KEYS.include?(key)
    )

    prefixed_key = "_" + key
    converted_key = (omit_prefix ? "" : "_") + key

    if new_params.key?(prefixed_key)
      new_params[converted_key] = new_params.delete(prefixed_key)
    end

    if new_params.key?(prefixed_key.to_sym)
      new_params[converted_key.to_sym] = new_params.delete(prefixed_key.to_sym)
    end

    if new_params.key?(key)
      new_params[converted_key] = new_params.delete(key)
    end

    if new_params.key?(key.to_sym)
      new_params[converted_key.to_sym] = new_params.delete(key.to_sym)
    end
  end

  new_params
end
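To see the key conversion in isolation, the following sketch condenses the listing above into a standalone function. The `es8_plus` keyword is a hypothetical flag standing in for `client.version_support.es_version_8_plus?`, and `convert_special_keys_sketch` is an illustrative name, not the library method.

```ruby
SPECIAL_KEYS = %w[id type index version version_type routing parent consistency refresh retry_on_conflict].freeze
UNPREFIXED_SPECIAL_KEYS = %w[parent retry_on_conflict routing version version_type].freeze

# Simplified sketch of the conversion; String keys stay Strings and
# Symbol keys stay Symbols. The input Hash is not modified.
def convert_special_keys_sketch(params, es8_plus:)
  new_params = params.dup

  SPECIAL_KEYS.each do |key|
    omit_prefix = es8_plus && UNPREFIXED_SPECIAL_KEYS.include?(key)
    converted = (omit_prefix ? "" : "_") + key

    # Same lookup order as the original: prefixed forms first, then bare forms.
    ["_" + key, key].each do |candidate|
      [candidate, candidate.to_sym].each do |k|
        next unless new_params.key?(k)

        target = k.is_a?(Symbol) ? converted.to_sym : converted
        new_params[target] = new_params.delete(k)
      end
    end
  end

  new_params
end

p convert_special_keys_sketch({ id: 1, routing: "a" }, es8_plus: false)
p convert_special_keys_sketch({ id: 1, "_routing" => "a" }, es8_plus: true)
```

With the flag off, every special key gains an underscore. With it on, keys listed in `UNPREFIXED_SPECIAL_KEYS` lose the underscore while the others keep it.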

#create(document, params) ⇒ Object

Add a create action to the list of bulk actions to be performed when the bulk API call is made. Parameters can be provided in the parameters hash (underscore prefix optional) or in the document hash (underscore prefix required).

document - The document to create as a Hash or JSON encoded String
params - Parameters for the create action (as a Hash) (optional)

Examples

create({"foo" => "bar"}, {:_id => 1})
create({"foo" => "bar"}, {:id => 1})
create("foo" => "bar", "_id" => 1)

Returns the response from the bulk call if one was made or nil.



# File 'lib/elastomer_client/client/bulk.rb', line 269

def create(document, params)
  params = prepare_params(document, params)
  add_to_actions({create: params}, document)
end

#delete(params) ⇒ Object

Add a delete action to the list of bulk actions to be performed when the bulk API call is made.

params - Parameters for the delete action (as a Hash)

Examples

delete(:_id => 1, :_type => 'foo')

Returns the response from the bulk call if one was made or nil.



# File 'lib/elastomer_client/client/bulk.rb', line 302

def delete(params)
  params = prepare_params(nil, params)
  add_to_actions({delete: params})
end

#index(document, params = {}) ⇒ Object

Add an index action to the list of bulk actions to be performed when the bulk API call is made. Parameters can be provided in the parameters hash (underscore prefix optional) or in the document hash (underscore prefix required).

document - The document to index as a Hash or JSON encoded String
params - Parameters for the index action (as a Hash) (optional)

Examples

index({"foo" => "bar"}, {:_id => 1, :_type => "foo"})
index({"foo" => "bar"}, {:id => 1, :type => "foo"})
index("foo" => "bar", "_id" => 1, "_type" => "foo")

Returns the response from the bulk call if one was made or nil.



# File 'lib/elastomer_client/client/bulk.rb', line 250

def index(document, params = {})
  params = prepare_params(document, params)
  add_to_actions({index: params}, document)
end

#prepare_params(document, params) ⇒ Object

Internal: convert special key parameters to their wire representation and apply any override document parameters.



# File 'lib/elastomer_client/client/bulk.rb', line 330

def prepare_params(document, params)
  params = convert_special_keys(params)

  params.delete(:_id) if params[:_id].nil? || params[:_id].to_s.empty?
  params.delete("_id") if params["_id"].nil? || params["_id"].to_s.empty?

  if client.version_support.es_version_8_plus?
    params.delete(:_type)
    params.delete("_type")
  end

  params
end
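The cleanup steps in the listing above (dropping a blank `_id`, and dropping `_type` on Elasticsearch 8 and later) can be tried on their own. In this sketch `clean_bulk_params` is a hypothetical helper and `es8_plus` is a hypothetical flag replacing the `client.version_support` check; the key conversion step is assumed to have run already.

```ruby
# Hypothetical helper covering only the cleanup half of prepare_params.
def clean_bulk_params(params, es8_plus:)
  params = params.dup

  # A nil or empty id is removed from the action metadata.
  params.delete(:_id) if params[:_id].nil? || params[:_id].to_s.empty?
  params.delete("_id") if params["_id"].nil? || params["_id"].to_s.empty?

  # The _type entry is removed when the version flag is set.
  if es8_plus
    params.delete(:_type)
    params.delete("_type")
  end

  params
end

p clean_bulk_params({ _id: "", _type: "doc", _index: "i" }, es8_plus: true)
p clean_bulk_params({ _id: 7, _type: "doc" }, es8_plus: false)
```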

#ready_to_send?(size) ⇒ Boolean

Internal: Determines if adding `size` more bytes and one more action will bring the current bulk request over the `request_size` limit or the `action_count` limit. If this method returns true, then it is time to dispatch the bulk request.

Returns `true` or `false`

Returns:

  • (Boolean)


# File 'lib/elastomer_client/client/bulk.rb', line 433

def ready_to_send?(size)
  total_request_size = @current_request_size + size
  total_action_count = @current_action_count + 1

  (request_size && total_request_size >= request_size) ||
  (action_count && total_action_count >  action_count)
end
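The two comparisons in the listing above differ slightly: the size check uses `>=` while the count check uses `>`. The standalone sketch below makes that easy to experiment with; `ready_to_send_sketch?` is a hypothetical function that takes the instance state as keyword arguments instead of reading instance variables.

```ruby
# Hypothetical standalone version of the predicate. Either limit may be nil,
# in which case that check is skipped.
def ready_to_send_sketch?(size, current_size:, current_count:, request_size:, action_count:)
  total_size  = current_size + size
  total_count = current_count + 1

  (request_size && total_size >= request_size) ||
    (action_count && total_count > action_count)
end

# Reaching the size limit exactly is enough to trigger a send.
p ready_to_send_sketch?(10, current_size: 90, current_count: 1, request_size: 100, action_count: nil)

# The count limit must be exceeded, not merely reached.
p ready_to_send_sketch?(1, current_size: 0, current_count: 2, request_size: 100, action_count: 3)
p ready_to_send_sketch?(1, current_size: 0, current_count: 3, request_size: 100, action_count: 3)
```

Because of how `&&` and `||` evaluate in Ruby, an expression of this shape can yield `nil` rather than `false` when a limit is unset, so callers should rely on truthiness.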

#update(document, params) ⇒ Object

Add an update action to the list of bulk actions to be performed when the bulk API call is made. Parameters can be provided in the parameters hash (underscore prefix optional) or in the document hash (underscore prefix required).

document - The document to update as a Hash or JSON encoded String
params - Parameters for the update action (as a Hash) (optional)

Examples

update({"doc" => {"foo" => "bar"}}, {:_id => 1})
update({"doc" => {"foo" => "bar"}}, {:id => 1})
update({"doc" => {"foo" => "bar"}}, "_id" => 1)

Returns the response from the bulk call if one was made or nil.



# File 'lib/elastomer_client/client/bulk.rb', line 288

def update(document, params)
  params = prepare_params(document, params)
  add_to_actions({update: params}, document)
end