Class: ElastomerClient::Client::Bulk
- Inherits: Object
- Defined in: lib/elastomer_client/client/bulk.rb
Overview
The Bulk class provides abstractions and helper methods for working with the Elasticsearch bulk API. An instance of the Bulk class accumulates index, create, update, and delete operations and then sends them to Elasticsearch in a single bulk API request, where the cluster executes them.
A maximum request size can be set. Operations are checked as they are added: if adding an operation would bring the accumulated request body to or above this threshold, the operations already queued are sent as a bulk request first, and the new operation starts the next batch.
Additionally, a maximum action count can be set. If adding an operation would push the number of queued actions above this count, the queued actions are sent first.
You can also invoke `call` explicitly to send the accumulated operations immediately.
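The flush rule can be modeled with a small stand-alone class. This is a simplified sketch, not the gem's implementation: `FakeClient` and `MiniBulk` are hypothetical names, `JSON.generate` stands in for `MultiJson.dump`, and only the request-size and action-count checks are modeled.

```ruby
require "json"

# Hypothetical stand-in for ElastomerClient::Client that just records bodies.
class FakeClient
  attr_reader :bodies

  def initialize
    @bodies = []
  end

  def bulk(body, _params)
    @bodies << body
    :ok
  end
end

# Hypothetical, simplified model of the accumulate-then-flush behavior.
class MiniBulk
  def initialize(client, request_size: nil, action_count: nil)
    @client = client
    @request_size = request_size
    @action_count = action_count
    @actions = []
    @current_size = 0
    @current_count = 0
  end

  # Send the queued lines first if this action would hit a limit,
  # then queue the new action line (and its document line, if any).
  def add(action, document = nil)
    lines = [JSON.generate(action)]
    lines << JSON.generate(document) if document
    size = lines.sum(&:bytesize)
    response = call if ready_to_send?(size)
    @actions.concat(lines)
    @current_size += size
    @current_count += 1
    response
  end

  # Send everything queued as one newline-delimited body, then reset.
  def call
    return nil if @actions.empty?

    @client.bulk(@actions.join("\n") + "\n", {})
  ensure
    @actions.clear
    @current_size = 0
    @current_count = 0
  end

  private

  def ready_to_send?(size)
    (@request_size && @current_size + size >= @request_size) ||
      (@action_count && @current_count + 1 > @action_count)
  end
end

client = FakeClient.new
bulk = MiniBulk.new(client, action_count: 2)
3.times { |i| bulk.add({ index: { _id: i } }, { n: i }) }
bulk.call # flush the remainder explicitly
```

With `action_count: 2`, the third `add` sends the first two actions before queuing itself, and the explicit `call` sends the rest. Unlike the real method, this model does not use `ensure` to queue the action when the flush raises.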
Constant Summary
- DEFAULT_REQUEST_SIZE = 2**20 * 10
  10 MB
- SPECIAL_KEYS = %w[id type index version version_type routing parent consistency refresh retry_on_conflict]
- UNPREFIXED_SPECIAL_KEYS = %w[parent retry_on_conflict routing version version_type]
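To make the constants concrete, the snippet below re-declares them exactly as listed and evaluates two derived values: the default size in bytes, and the special keys that `convert_special_keys` always writes with an underscore prefix (those not in UNPREFIXED_SPECIAL_KEYS). The local variable names are illustrative only.

```ruby
# Re-declared from the constant summary; nothing here talks to Elasticsearch.
DEFAULT_REQUEST_SIZE = 2**20 * 10
SPECIAL_KEYS = %w[id type index version version_type routing parent consistency refresh retry_on_conflict]
UNPREFIXED_SPECIAL_KEYS = %w[parent retry_on_conflict routing version version_type]

# 2**20 bytes is one mebibyte, so "10 MB" here means 10 MiB.
default_request_bytes = DEFAULT_REQUEST_SIZE

# Keys that keep the leading underscore on every Elasticsearch version.
always_prefixed = SPECIAL_KEYS - UNPREFIXED_SPECIAL_KEYS
```

`DEFAULT_REQUEST_SIZE` is therefore 10,485,760 bytes, and `always_prefixed` is `%w[id type index consistency refresh]`.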
Instance Attribute Summary
- #action_count ⇒ Object
  Returns the value of attribute action_count.
- #client ⇒ Object (readonly)
  Returns the value of attribute client.
- #request_size ⇒ Object
  Returns the value of attribute request_size.
Instance Method Summary
- #add_to_actions(action, document = nil) ⇒ Object
  Internal: Add the given `action` to the list of actions that will be performed by this bulk request.
- #call ⇒ Object
  Immediately execute a bulk API call with the currently accumulated actions.
- #check_action_size!(size) ⇒ Object
  Internal: Raises a RequestSizeError if the given size is larger than the configured client.max_request_size.
- #convert_special_keys(params) ⇒ Object
  Internal: Convert incoming parameter keys (String or Symbol) to their special underscore versions.
- #create(document, params) ⇒ Object
  Add a create action to the list of bulk actions to be performed when the bulk API call is made.
- #delete(params) ⇒ Object
  Add a delete action to the list of bulk actions to be performed when the bulk API call is made.
- #index(document, params = {}) ⇒ Object
  Add an index action to the list of bulk actions to be performed when the bulk API call is made.
- #initialize(client, params = {}) ⇒ Bulk (constructor)
  Create a new bulk client for handling some of the details of accumulating documents to index and then formatting them properly for the bulk API command.
- #prepare_params(document, params) ⇒ Object
  Internal: Convert special key parameters to their wire representation, dropping a blank `_id` and, on Elasticsearch 8+, `_type`.
- #ready_to_send?(size) ⇒ Boolean
  Internal: Determines whether adding `size` more bytes and one more action would reach the `request_size` limit or exceed the `action_count` limit.
- #update(document, params) ⇒ Object
  Add an update action to the list of bulk actions to be performed when the bulk API call is made.
Constructor Details
#initialize(client, params = {}) ⇒ Bulk
Create a new bulk client for handling some of the details of accumulating documents to index and then formatting them properly for the bulk API command.
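client - ElastomerClient::Client used for HTTP requests to the server
params - Parameters Hash to pass to the Client#bulk method. Two keys are removed from this Hash (the caller's Hash is modified in place) and used to configure the Bulk instance:
  :request_size - the maximum request size in bytes (defaults to DEFAULT_REQUEST_SIZE)
  :action_count - the maximum number of actions per request (defaults to nil, meaning no limit)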
# File 'lib/elastomer_client/client/bulk.rb', line 186
def initialize(client, params = {})
  @client = client
  @params = params

  @actions = []
  @current_request_size = 0
  @current_action_count = 0
  self.request_size = params.delete(:request_size) || DEFAULT_REQUEST_SIZE
  self.action_count = params.delete(:action_count)
end
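Because the constructor calls `params.delete` on the Hash it receives and keeps that same Hash as `@params`, the `:request_size` and `:action_count` keys disappear from the caller's Hash. The sketch below reproduces only that extraction step with a hypothetical `split_bulk_params` helper, so the effect can be seen without a client or cluster.

```ruby
DEFAULT_REQUEST_SIZE = 2**20 * 10

# Hypothetical helper mirroring how the constructor reads its limits.
# Hash#delete removes the keys from the very Hash that was passed in.
def split_bulk_params(params)
  request_size = params.delete(:request_size) || DEFAULT_REQUEST_SIZE
  action_count = params.delete(:action_count) # nil means no action limit
  [request_size, action_count, params]
end

opts = { request_size: 5_000, refresh: true }
request_size, action_count, bulk_params = split_bulk_params(opts)
```

Passing `opts.dup` instead keeps the caller's Hash intact if it is reused for other requests.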
Instance Attribute Details
#action_count ⇒ Object
Returns the value of attribute action_count.
# File 'lib/elastomer_client/client/bulk.rb', line 197
def action_count
  @action_count
end
#client ⇒ Object (readonly)
Returns the value of attribute client.
# File 'lib/elastomer_client/client/bulk.rb', line 197
def client
  @client
end
#request_size ⇒ Object
Returns the value of attribute request_size.
# File 'lib/elastomer_client/client/bulk.rb', line 197
def request_size
  @request_size
end
Instance Method Details
#add_to_actions(action, document = nil) ⇒ Object
Internal: Add the given `action` to the list of actions that will be performed by this bulk request. An optional `document` can also be given.
If adding this action would reach the request size limit or exceed the action count limit (see ready_to_send?), the actions accumulated so far are sent in a bulk API call first. The actions list is then cleared, and this action becomes the first entry of the next batch. The action is queued even if that bulk call raises.
action   - The bulk action (as a Hash) to perform
document - Optional document for the action as a Hash or JSON encoded String
Returns the response from the bulk call if one was made, or nil.
Raises RequestSizeError if the serialized action and document together are larger than client.max_request_size.
# File 'lib/elastomer_client/client/bulk.rb', line 400
def add_to_actions(action, document = nil)
  action = MultiJson.dump(action)
  size = action.bytesize

  if document
    document = MultiJson.dump(document) unless document.is_a?(String)
    size += document.bytesize
  end

  check_action_size!(size)

  response = nil
  begin
    response = call if ready_to_send?(size)
  # rubocop:disable Lint/UselessRescue
  rescue StandardError => err
    raise err
  ensure
    @actions << action
    @actions << document unless document.nil?
    @current_request_size += size
    @current_action_count += 1
  end

  response
end
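The size that `add_to_actions` charges against the limits is the byte size of the serialized action plus the serialized document; the newline separators added later by `call` are not counted. A minimal sketch with a hypothetical `action_size` helper, using the standard `json` library in place of `MultiJson` (assumed here to produce the same compact output for these inputs):

```ruby
require "json"

# Hypothetical helper mirroring the size calculation in add_to_actions.
def action_size(action, document = nil)
  size = JSON.generate(action).bytesize
  if document
    # Strings are assumed to be pre-encoded JSON and are measured as-is.
    document = JSON.generate(document) unless document.is_a?(String)
    size += document.bytesize
  end
  size
end

meta = { index: { _id: 1 } } # serializes to {"index":{"_id":1}} (19 bytes)
doc  = { "foo" => "bar" }    # serializes to {"foo":"bar"} (13 bytes)
```

Here `action_size(meta, doc)` is 32, while the two lines occupy 34 bytes in the request body once `call` appends a newline to each.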
#call ⇒ Object
Immediately execute a bulk API call with the currently accumulated actions. The accumulated actions list and the size and action counters are reset after the call, even if the call raises an error.
If the accumulated actions list is empty, no request is made.
Returns the response body Hash, or nil if there were no actions to send.
# File 'lib/elastomer_client/client/bulk.rb', line 314
def call
  return nil if @actions.empty?

  body = @actions.join("\n") + "\n"
  client.bulk(body, @params)
ensure
  @current_request_size = 0
  @current_action_count = 0
  @actions.clear
end
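`call` sends the accumulated lines as newline-delimited JSON with a newline after the last line, which the bulk API requires. The sketch below builds such a body from a hypothetical list of already-serialized lines, as `add_to_actions` would have stored them; it performs no HTTP request.

```ruby
# Hypothetical pre-serialized lines: an index action, its document, a delete.
actions = [
  '{"index":{"_id":1}}',
  '{"foo":"bar"}',
  '{"delete":{"_id":2}}'
]

# Same join as in #call: newline between entries plus a trailing newline.
body = actions.join("\n") + "\n"
```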
#check_action_size!(size) ⇒ Object
Internal: Raises a RequestSizeError if the given size is larger than the configured client.max_request_size.
# File 'lib/elastomer_client/client/bulk.rb', line 443
def check_action_size!(size)
  return unless size > client.max_request_size
  raise RequestSizeError, "Bulk action of size `#{size}` exceeds the maximum requst size: #{client.max_request_size}"
end
#convert_special_keys(params) ⇒ Object
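Internal: Convert incoming parameter keys to their special underscore versions. Both String and Symbol keys are accepted, with or without the leading underscore. Maintains API compatibility with the `Docs` API for `index`, `create`, `update` and `delete`.
:id -> :_id
'id' -> '_id'
On Elasticsearch 8 and later, the keys listed in UNPREFIXED_SPECIAL_KEYS are written without the underscore instead (for example :_routing -> :routing).
params - Hash.
Returns a new params Hash with the special keys replaced.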
# File 'lib/elastomer_client/client/bulk.rb', line 354
def convert_special_keys(params)
  new_params = params.dup

  SPECIAL_KEYS.each do |key|
    omit_prefix = (
      client.version_support.es_version_8_plus? &&
      UNPREFIXED_SPECIAL_KEYS.include?(key)
    )

    prefixed_key = "_" + key
    converted_key = (omit_prefix ? "" : "_") + key

    if new_params.key?(prefixed_key)
      new_params[converted_key] = new_params.delete(prefixed_key)
    end

    if new_params.key?(prefixed_key.to_sym)
      new_params[converted_key.to_sym] = new_params.delete(prefixed_key.to_sym)
    end

    if new_params.key?(key)
      new_params[converted_key] = new_params.delete(key)
    end

    if new_params.key?(key.to_sym)
      new_params[converted_key.to_sym] = new_params.delete(key.to_sym)
    end
  end

  new_params
end
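The renaming can be tried without a client by passing the Elasticsearch 8+ check in as a boolean. In this hypothetical `convert_keys` sketch, the `es8:` keyword replaces `client.version_support.es_version_8_plus?`, and a small inner loop checks the same four key spellings in the same order as the method above.

```ruby
SPECIAL_KEYS = %w[id type index version version_type routing parent consistency refresh retry_on_conflict]
UNPREFIXED_SPECIAL_KEYS = %w[parent retry_on_conflict routing version version_type]

# Hypothetical standalone version of convert_special_keys; `es8` stands in
# for client.version_support.es_version_8_plus?.
def convert_keys(params, es8:)
  new_params = params.dup # the caller's Hash is left untouched
  SPECIAL_KEYS.each do |key|
    prefix = (es8 && UNPREFIXED_SPECIAL_KEYS.include?(key)) ? "" : "_"
    converted = prefix + key
    # Check "_key" then "key", each as String and then as Symbol,
    # keeping the original key type in the result.
    ["_" + key, key].each do |candidate|
      new_params[converted] = new_params.delete(candidate) if new_params.key?(candidate)
      sym = candidate.to_sym
      new_params[converted.to_sym] = new_params.delete(sym) if new_params.key?(sym)
    end
  end
  new_params
end

original = { id: 1, routing: "r", foo: "bar" }
before_8 = convert_keys(original, es8: false)
from_8   = convert_keys(original, es8: true)
```

Keys outside SPECIAL_KEYS, such as `:foo`, pass through unchanged.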
#create(document, params) ⇒ Object
Add a create action to the list of bulk actions to be performed when the bulk API call is made. Parameters can be provided in the parameters hash (underscore prefix optional) or in the document hash (underscore prefix required).
document - The document to create as a Hash or JSON encoded String
params   - Parameters for the create action (as a Hash)
Examples
create({"foo" => "bar"}, {:_id => 1})
create({"foo" => "bar"}, {:id => 1})
create("foo" => "bar", "_id" => 1)
Returns the response from the bulk call if one was made or nil.
# File 'lib/elastomer_client/client/bulk.rb', line 269
def create(document, params)
  params = prepare_params(document, params)
  add_to_actions({create: params}, document)
end
#delete(params) ⇒ Object
Add a delete action to the list of bulk actions to be performed when the bulk API call is made.
params - Parameters for the delete action (as a Hash)
Examples
delete(:_id => 1, :_type => 'foo')
Returns the response from the bulk call if one was made or nil.
# File 'lib/elastomer_client/client/bulk.rb', line 302
def delete(params)
  params = prepare_params(nil, params)
  add_to_actions({delete: params})
end
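A delete action has no document, so it contributes a single metadata line to the request body. A sketch of that line, assuming the parameters have already passed through `prepare_params` (so `:id` has become `:_id`) and using `JSON.generate` in place of `MultiJson.dump`:

```ruby
require "json"

# Assumed already-converted parameters, e.g. from delete(:id => 1).
params = { _id: 1 }

# Only the action line is queued; deletes have no document line.
lines = [JSON.generate({ delete: params })]
body = lines.join("\n") + "\n"
```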
#index(document, params = {}) ⇒ Object
Add an index action to the list of bulk actions to be performed when the bulk API call is made. Parameters can be provided in the parameters hash (underscore prefix optional) or in the document hash (underscore prefix required).
document - The document to index as a Hash or JSON encoded String
params   - Parameters for the index action (as a Hash) (optional)
Examples
index({"foo" => "bar"}, {:_id => 1, :_type => "foo"})
index({"foo" => "bar"}, {:id => 1, :type => "foo"})
index("foo" => "bar", "_id" => 1, "_type" => "foo")
Returns the response from the bulk call if one was made or nil.
# File 'lib/elastomer_client/client/bulk.rb', line 250
def index(document, params = {})
  params = prepare_params(document, params)
  add_to_actions({index: params}, document)
end
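For `index`, the metadata line and the document line are queued together. The document may be a Hash, which is serialized first, or a String, which is assumed to already be JSON and is queued verbatim. A sketch with a hypothetical `index_lines` helper and the standard `json` library standing in for `MultiJson`:

```ruby
require "json"

# Hypothetical helper: the two NDJSON lines an index action contributes.
def index_lines(document, params)
  action = JSON.generate({ index: params })
  doc = document.is_a?(String) ? document : JSON.generate(document)
  [action, doc]
end

from_hash   = index_lines({ "foo" => "bar" }, { _id: 1 })
from_string = index_lines('{"foo":"bar"}', { _id: 1 })
```

Both calls yield the same pair of lines, which is why pre-encoded JSON strings can be passed directly when they are already available.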
#prepare_params(document, params) ⇒ Object
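Internal: Convert special key parameters to their wire representation (see convert_special_keys), remove `_id` when it is nil or empty, and on Elasticsearch 8 and later remove `_type`. The `document` argument is not used by this implementation.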
# File 'lib/elastomer_client/client/bulk.rb', line 330
def prepare_params(document, params)
  params = convert_special_keys(params)

  params.delete(:_id) if params[:_id].nil? || params[:_id].to_s.empty?
  params.delete("_id") if params["_id"].nil? || params["_id"].to_s.empty?

  if client.version_support.es_version_8_plus?
    params.delete(:_type)
    params.delete("_type")
  end

  params
end
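The clean-up steps in `prepare_params` can be sketched on a plain Hash. The hypothetical `clean_params` below skips the key conversion and takes an `es8:` boolean in place of `client.version_support.es_version_8_plus?`. It shows that a blank `_id` is dropped, so Elasticsearch can generate an ID when indexing, and that `_type` is removed on 8+, which means the `_type` values in the `index` and `delete` examples are discarded there.

```ruby
# Hypothetical version of prepare_params without the special-key conversion.
def clean_params(params, es8:)
  params = params.dup # the real method works on the copy made by convert_special_keys
  params.delete(:_id) if params[:_id].nil? || params[:_id].to_s.empty?
  params.delete("_id") if params["_id"].nil? || params["_id"].to_s.empty?
  if es8
    params.delete(:_type)
    params.delete("_type")
  end
  params
end

blank_id  = clean_params({ _id: "", _type: "doc" }, es8: false)
typed_es8 = clean_params({ _id: 5, _type: "doc" }, es8: true)
```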
#ready_to_send?(size) ⇒ Boolean
Internal: Determines whether adding `size` more bytes and one more action would bring the current bulk request to or over the `request_size` limit, or over the `action_count` limit. If this method returns true, then it is time to dispatch the bulk request.
Returns `true` when the request should be sent, otherwise `false` or `nil`.
# File 'lib/elastomer_client/client/bulk.rb', line 433
def ready_to_send?(size)
  total_request_size = @current_request_size + size
  total_action_count = @current_action_count + 1

  (request_size && total_request_size >= request_size) ||
    (action_count && total_action_count > action_count)
end
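The two checks are asymmetric: the size limit triggers when the new total reaches `request_size` (`>=`), while the count limit triggers only when the new total would exceed `action_count` (`>`). A standalone sketch of the predicate, with the instance state passed as hypothetical keyword arguments and the same expression as above:

```ruby
# Hypothetical standalone version of ready_to_send?; a nil limit is disabled.
def ready_to_send?(size, current_size:, current_count:, request_size:, action_count:)
  total_size = current_size + size
  total_count = current_count + 1
  (request_size && total_size >= request_size) ||
    (action_count && total_count > action_count)
end
```

The predicate only decides when to flush the actions already queued; in `add_to_actions`, the action being added always goes into the next batch.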
#update(document, params) ⇒ Object
Add an update action to the list of bulk actions to be performed when the bulk API call is made. Parameters can be provided in the parameters hash (underscore prefix optional) or in the document hash (underscore prefix required).
document - The document to update as a Hash or JSON encoded String
params   - Parameters for the update action (as a Hash)
Examples
update({"doc" => {"foo" => "bar"}}, {:_id => 1})
update({"doc" => {"foo" => "bar"}}, {:id => 1})
update({"doc" => {"foo" => "bar"}}, "_id" => 1)
Returns the response from the bulk call if one was made or nil.
# File 'lib/elastomer_client/client/bulk.rb', line 288
def update(document, params)
  params = prepare_params(document, params)
  add_to_actions({update: params}, document)
end
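For `update`, the document argument is the body of the update request itself (here a partial document wrapped in `"doc"`), not the bare source. A sketch of the two lines produced for the first example above, assuming `:id` has already been converted to `:_id` and using `JSON.generate` in place of `MultiJson.dump`:

```ruby
require "json"

params   = { _id: 1 }                      # after prepare_params
document = { "doc" => { "foo" => "bar" } } # partial-document update body

lines = [JSON.generate({ update: params }), JSON.generate(document)]
body  = lines.join("\n") + "\n"
```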