Class: Elasticsearch::Helpers::BulkHelper

Inherits:
Object
Defined in:
lib/elasticsearch/helpers/bulk_helper.rb

Overview

Elasticsearch Client Helper for the Bulk API

Constructor Details

#initialize(client, index, params = {}) ⇒ BulkHelper

Create a BulkHelper

Parameters:

  • client (Elasticsearch::Client)

    Instance of Elasticsearch client to use.

  • index (String)

    Index on which to perform the Bulk actions.

  • params (Hash) (defaults to: {})

    Parameters to reuse in every bulk call.



# File 'lib/elasticsearch/helpers/bulk_helper.rb', line 33

def initialize(client, index, params = {})
  @client = client
  @index = index
  @params = params
end

Instance Attribute Details

#index ⇒ Object

Returns the value of attribute index.



# File 'lib/elasticsearch/helpers/bulk_helper.rb', line 25

def index
  @index
end

Instance Method Details

#delete(ids, params = {}, body = {}) ⇒ Object

Delete documents using the Bulk API

Parameters:

  • ids (Array)

    Array of IDs of the documents to delete.

  • params (Hash) (defaults to: {})

    Parameters to send to bulk delete.



# File 'lib/elasticsearch/helpers/bulk_helper.rb', line 64

def delete(ids, params = {}, body = {})
  delete_docs = ids.map { |id| { delete: { _index: @index, _id: id} } }
  @client.bulk({ body: delete_docs }.merge(params.merge(@params)))
end
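
As a rough illustration of the request this method builds, the sketch below reproduces the two steps of the listing above in plain Ruby, without a client. The index name, IDs, and parameter values are made up for the example. Note the merge order: the per-call params are merged with the constructor's @params last, so a key present in both takes the constructor's value.

```ruby
# Plain-Ruby sketch of the payload built by #delete; no Elasticsearch client is used.
index = 'my_index'                               # hypothetical index name
ids = %w[1 2 3]                                  # hypothetical document IDs
helper_params = { refresh: true }                # stands in for @params from the constructor
call_params = { refresh: false, timeout: '1m' }  # stands in for the params argument

# One delete action per ID, as in the listing above.
delete_docs = ids.map { |id| { delete: { _index: index, _id: id } } }

# Same merge order as the listing: constructor params win on conflicts.
request = { body: delete_docs }.merge(call_params.merge(helper_params))
```

Here request[:refresh] ends up as true (the constructor's value), while request[:timeout] keeps the per-call value because the constructor did not set it.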

#ingest(docs, params = {}, body = {}, &block) {|response, ingest_docs| ... } ⇒ Object

Index documents using the Bulk API.

Parameters:

  • docs (Array<Hash>)

    The documents to be indexed.

  • params (Hash) (defaults to: {})

    Parameters to use in the bulk ingestion. See the official Elastic documentation for the Bulk API for the supported parameters.

  • block (Block)

    Optional block to run after ingesting a batch of documents.

Options Hash (params):

  • slice (Integer)

    Number of documents to send to the Bulk API in each ingestion batch.

Yield Parameters:

  • response (Elasticsearch::Transport::Response)

    The response object from calling the Bulk API.

  • ingest_docs (Array<Hash>)

    The collection of documents sent in the bulk request.



# File 'lib/elasticsearch/helpers/bulk_helper.rb', line 48

def ingest(docs, params = {}, body = {}, &block)
  ingest_docs = docs.map { |doc| { index: { _index: @index, data: doc} } }
  if (slice = params.delete(:slice))
    ingest_docs.each_slice(slice) do |items|
      ingest(items.map { |item| item[:index][:data] }, params, &block)
    end
  else
    bulk_request(ingest_docs, params, &block)
  end
end
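
The batching done by the slice option can be sketched in plain Ruby, without a client. The documents and parameter values below are made up for the example. As in the listing above, slice is removed from params with Hash#delete, so it is not forwarded to the Bulk API, and the hash passed by the caller is modified.

```ruby
# Plain-Ruby sketch of how the :slice option batches documents; no client is used.
docs = (1..5).map { |i| { 'name' => "doc #{i}" } }  # hypothetical documents
params = { slice: 2, refresh: true }                # hypothetical parameters

# :slice is removed from params, so it is never sent along with the request.
slice = params.delete(:slice)

# Each batch would become one bulk request (and one call to the block, if given).
batches = docs.each_slice(slice).to_a
batch_sizes = batches.map(&:length)
```

With five documents and a slice of 2, this gives batches of 2, 2, and 1 documents.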

#ingest_json(file, params = {}) {|response, ingest_docs| ... } ⇒ Object

Ingest data directly from a JSON file

Parameters:

  • file (String)

    (Required) The file path.

  • params (Hash) (defaults to: {})

    Parameters to use in the bulk ingestion.

Options Hash (params):

  • slice (Integer)

    Number of documents to send to the Bulk API in each ingestion batch.

  • keys (Array|String)

    If the documents are nested inside the parsed JSON, pass the keys that lead to them with this parameter.

    E.g.: If the data in the parsed JSON Hash is found in json_parsed['data']['items'], keys would be passed like this (as an Array):

    bulk_helper.ingest_json(file, { keys: ['data', 'items'] })

    or as a comma-separated String (without spaces, since the String is split on commas only):

    bulk_helper.ingest_json(file, { keys: 'data,items' })

Yield Parameters:

  • response (Elasticsearch::Transport::Response)

    The response object from calling the Bulk API.

  • ingest_docs (Array<Hash>)

    The collection of documents sent in the bulk request.



# File 'lib/elasticsearch/helpers/bulk_helper.rb', line 110

def ingest_json(file, params = {}, &block)
  data = JSON.parse(File.read(file))
  if (keys = params.delete(:keys))
    keys = keys.split(',') if keys.is_a?(String)
    data = data.dig(*keys)
  end

  ingest(data, params, &block)
end
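
The keys lookup can be tried in plain Ruby with only the json standard library, without a file or a client. The JSON content below is a made-up example. The sketch follows the listing above: a String is split on commas and the resulting keys are passed to Hash#dig.

```ruby
require 'json'

# Plain-Ruby sketch of the :keys lookup in #ingest_json; no file or client is used.
# The JSON content below is a made-up example.
json = '{"data": {"items": [{"name": "a"}, {"name": "b"}]}}'
data = JSON.parse(json)

# Keys given as an Array are passed straight to Hash#dig.
from_array = data.dig(*%w[data items])

# Keys given as a String are split on commas only.
from_string = data.dig(*'data,items'.split(','))

# With a space after the comma, the second key becomes " items" and nothing is found.
with_space = data.dig(*'data, items'.split(','))
```

Both from_array and from_string hold the two documents, while with_space is nil, which is why a String value for keys should not contain spaces around the commas.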

#update(docs, params = {}, body = {}, &block) {|response, ingest_docs| ... } ⇒ Object

Update documents using the Bulk API

Parameters:

  • docs (Array<Hash>)

    (Required) The documents to be updated. Each document must carry its ID under the String key 'id'.

  • block (Block)

    Optional block to run after updating a batch of documents.

  • params (Hash) (defaults to: {})

    Parameters to use in the bulk update.

Options Hash (params):

  • slice (Integer)

    Number of documents to send to the Bulk API in each batch of updates.

Yield Parameters:

  • response (Elasticsearch::Transport::Response)

    The response object from calling the Bulk API.

  • ingest_docs (Array<Hash>)

    The collection of documents sent in the bulk request.



# File 'lib/elasticsearch/helpers/bulk_helper.rb', line 78

def update(docs, params = {}, body = {}, &block)
  ingest_docs = docs.map do |doc|
    { update: { _index: @index, _id: doc.delete('id'), data: { doc: doc } } }
  end
  if (slice = params.delete(:slice))
    ingest_docs.each_slice(slice) { |items| update(items, params, &block) }
  else
    bulk_request(ingest_docs, params, &block)
  end
end
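
The action built for each document can be sketched in plain Ruby, without a client. The index name and document below are made up for the example. As in the listing above, the ID is taken with doc.delete('id'), so it must be stored under the String key 'id', and it is removed from the document that was passed in.

```ruby
# Plain-Ruby sketch of the action built for each document in #update; no client is used.
index = 'my_index'                                # hypothetical index name
docs = [{ 'id' => '1', 'title' => 'New title' }]  # hypothetical document

actions = docs.map do |doc|
  # Hash#delete removes 'id' from the document and returns its value.
  { update: { _index: index, _id: doc.delete('id'), data: { doc: doc } } }
end
```

After this runs, the remaining fields ('title' here) form the partial document, and docs[0] no longer contains 'id'. Callers that need the original hashes afterwards may want to pass copies.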