Module: Elasticsearch::Model::Importing::ClassMethods

Included in:
Proxy::ClassMethodsProxy
Defined in:
lib/elasticsearch/model/importing.rb

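Instance Method Summary

  • #__batch_to_bulk(batch, transform) ⇒ Object

  • #import(options = {}, &block) {|Hash| ... } ⇒ Fixnum, Array<Hash>

    Import all model records into the index.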

Instance Method Details

#__batch_to_bulk(batch, transform) ⇒ Object



# File 'lib/elasticsearch/model/importing.rb', line 178

def __batch_to_bulk(batch, transform)
  batch.map { |model| transform.call(model) }
end
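`__batch_to_bulk` maps every record in a batch through the transform callable and returns the resulting array, which `import` sends as the bulk request body. The standalone sketch below shows that mapping. It assumes a hypothetical `Article` struct and a hand-written transform in place of the module's default `__transform`, so it runs without Elasticsearch or a database.

```ruby
# Hypothetical stand-in for a model record; not part of elasticsearch-model.
Article = Struct.new(:id, :title)

# Same mapping logic as __batch_to_bulk: one transform call per record.
def batch_to_bulk(batch, transform)
  batch.map { |model| transform.call(model) }
end

# Hypothetical transform producing one bulk "index" action per record.
transform = lambda do |article|
  { index: { _id: article.id, data: { title: article.title } } }
end

batch = [Article.new(1, 'First'), Article.new(2, 'Second')]
body  = batch_to_bulk(batch, transform)

# body now holds one { index: { _id: ..., data: ... } } action per article.
```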

#import(options = {}, &block) {|Hash| ... } ⇒ Fixnum, Array<Hash>

Import all model records into the index

The method picks the correct strategy based on the `Importing` module defined in the corresponding adapter.

Examples:

Import all records into the index


Article.import

Set the batch size to 100


Article.import batch_size: 100

Process the response from Elasticsearch


Article.import do |response|
  errors = response['items'].count { |i| i['index']['error'] }
  puts "Got #{errors} errors"
end
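The block above assumes every item in the bulk response uses the `index` action. A slightly more defensive sketch mirrors the `item.values.first['error']` check that `import` itself applies, so it counts failures whatever the action key (`index`, `create`, `update` or `delete`). The `response` hash is a hand-written sample in the bulk response shape, not real server output, and `count_bulk_errors` is a hypothetical helper name.

```ruby
# Count failed items in a bulk response, whatever action key each item uses.
def count_bulk_errors(response)
  response['items'].count { |item| item.values.first['error'] }
end

# Hand-written sample shaped like a bulk response; not real server output.
response = {
  'errors' => true,
  'items'  => [
    { 'index'  => { '_id' => '1', 'status' => 201 } },
    { 'index'  => { '_id' => '2', 'status' => 400,
                    'error' => { 'type' => 'mapper_parsing_exception' } } },
    { 'create' => { '_id' => '3', 'status' => 409,
                    'error' => { 'type' => 'version_conflict_engine_exception' } } }
  ]
}

puts "Got #{count_bulk_errors(response)} errors"
```

Inside an `import` block, the same helper would be called as `count_bulk_errors(response)` for each batch.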

Delete and create the index with appropriate settings and mappings


Article.import force: true

Refresh the index after importing all batches


Article.import refresh: true

Import the records into a different index/type than the default one


Article.import index: 'my-new-index', type: 'my-other-type'

Pass an ActiveRecord scope to limit the imported records


Article.import scope: 'published'

Pass an ActiveRecord query to limit the imported records


Article.import query: -> { where(author_id: author_id) }

Transform records during the import with a lambda


transform = lambda do |a|
  {index: {_id: a.id, _parent: a.author_id, data: a.__elasticsearch__.as_indexed_json}}
end

Article.import transform: transform
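Because `import` only checks that the transform responds to `call` (it raises `ArgumentError` otherwise), a plain object with a `call` method or a `Method` object works as well as a lambda. A sketch under that assumption: `ArticleTransform`, `article_action` and the `Record` struct are hypothetical, and `Record#to_doc` stands in for `__elasticsearch__.as_indexed_json` since no model is loaded here.

```ruby
# Hypothetical record stand-in; a real model would expose
# __elasticsearch__.as_indexed_json instead of #to_doc.
Record = Struct.new(:id, :title) do
  def to_doc
    { title: title }
  end
end

# Any object responding to #call is accepted as the :transform option.
class ArticleTransform
  def call(record)
    { index: { _id: record.id, data: record.to_doc } }
  end
end

# A Method object also responds to #call.
def article_action(record)
  { index: { _id: record.id, data: record.to_doc } }
end

transform        = ArticleTransform.new
method_transform = method(:article_action)
action           = transform.call(Record.new(7, 'Hello'))
```

With a real model, this would be passed as `Article.import transform: ArticleTransform.new`.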

Update the batch before yielding it


class Article
  # ...
  def self.enrich(batch)
    batch.each do |item|
      item.metadata = MyAPI.get_metadata(item.id) # MyAPI: your own external service
    end
    batch
  end
end

Article.import preprocess: :enrich
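The `preprocess` option names a class method that receives each batch and must return it, as `Article.enrich` does above. This page does not show where the adapter invokes it. The sketch assumes that the method is called by name on every batch before the batch is transformed. `Item`, `MetadataAPI`, `ItemIndexer` and `each_preprocessed_batch` are all hypothetical, standing in for the model, `MyAPI` and the adapter's batching.

```ruby
# Hypothetical stand-ins for the model and the external API.
Item = Struct.new(:id, :metadata)

module MetadataAPI
  DATA = { 1 => 'alpha', 2 => 'beta' }.freeze

  def self.lookup(id)
    DATA.fetch(id, 'unknown')
  end
end

class ItemIndexer
  # Like Article.enrich: mutate each record, then return the batch.
  def self.enrich(batch)
    batch.each { |item| item.metadata = MetadataAPI.lookup(item.id) }
    batch
  end

  # Assumed invocation: call the named class method on every batch.
  def self.each_preprocessed_batch(records, batch_size:, preprocess: nil)
    records.each_slice(batch_size) do |batch|
      batch = public_send(preprocess, batch) if preprocess
      yield batch
    end
  end
end

records = [Item.new(1), Item.new(2), Item.new(3)]
seen = []
ItemIndexer.each_preprocessed_batch(records, batch_size: 2, preprocess: :enrich) do |batch|
  seen << batch.map(&:metadata)
end
```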

Return an array of the failed items instead of the number of errors, e.g. to retry importing those records


Article.import return: 'errors'

Parameters:

  • options (Hash) (defaults to: {})

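    Options passed to the underlying `__find_in_batches` method, after `import` has removed its own keys (:refresh, :index, :transform, :pipeline, :return and :force)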

  • block (Proc)

    Optional block called with the Elasticsearch response for each batch

Yields:

  • (Hash)

    Yields the Hash containing the Elasticsearch bulk response for each batch

Returns:

  • (Fixnum)

    By default, the number of errors encountered during importing

  • (Array<Hash>)

    If the :return option is set to 'errors', contains only the failed items from the response's `items` key, e.g.:

    [
      {
        "index" => {
          "error" => 'FAILED',
          "_index" => "test",
          "_id" => '1',
          "_version" => 1,
          "result" => "foo",
          "_shards" => {
            "total" => 1,
            "successful" => 0,
            "failed" => 1
          },
          "status" => 400
        }
      }
    ]
    
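With `return: 'errors'`, each returned element has the shape shown above, keyed by its bulk action name. That means the failed document IDs can be extracted and used for a narrower retry. A sketch over a hand-written errors array follows. `failed_ids` is a hypothetical helper, and the sample data is illustrative, not real server output.

```ruby
# Collect the _id of every failed item from an errors array returned by
# import(return: 'errors'). Each element is keyed by its bulk action name.
def failed_ids(errors)
  errors.map { |item| item.values.first['_id'] }.compact.uniq
end

# Hand-written sample in the documented shape; not real server output.
errors = [
  { 'index'  => { '_id' => '1', 'status' => 400, 'error' => 'FAILED' } },
  { 'index'  => { '_id' => '5', 'status' => 429, 'error' => 'REJECTED' } },
  { 'create' => { '_id' => '5', 'status' => 429, 'error' => 'REJECTED' } }
]

ids = failed_ids(errors)
```

With the ActiveRecord adapter, the IDs could then be fed back through the `query:` option, for example `Article.import query: -> { where(id: ids) }`, though whether that suits a given setup is an assumption to verify.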


# File 'lib/elasticsearch/model/importing.rb', line 136

def import(options={}, &block)
  errors       = []
  refresh      = options.delete(:refresh)   || false
  target_index = options.delete(:index)     || index_name
  transform    = options.delete(:transform) || __transform
  pipeline     = options.delete(:pipeline)
  return_value = options.delete(:return)    || 'count'

  unless transform.respond_to?(:call)
    raise ArgumentError,
          "Pass an object responding to `call` as the :transform option, #{transform.class} given"
  end

  if options.delete(:force)
    self.create_index! force: true, index: target_index
  elsif !self.index_exists? index: target_index
    raise ArgumentError,
          "#{target_index} does not exist to be imported into. Use create_index! or the :force option to create it."
  end

  __find_in_batches(options) do |batch|
    params = {
      index: target_index,
      body:  __batch_to_bulk(batch, transform)
    }
    params[:pipeline] = pipeline if pipeline
    response = client.bulk params
    yield response if block_given?

    errors += response['items'].select { |item| item.values.first['error'] }
  end

  self.refresh_index! index: target_index if refresh

  case return_value
    when 'errors'
      errors
    else
      errors.size
  end
end
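To make the control flow of `import` concrete without a cluster, here is a reduced sketch of the same loop. `each_slice` stands in for `__find_in_batches`, and a hypothetical `FakeClient#bulk` stands in for `client.bulk`, pretending that every odd ID fails. The `return_value` argument mimics the `return` option. Index creation, refresh and pipelines are left out, and `import_sketch` is an illustrative name, not part of the library.

```ruby
# Hypothetical client: pretends every record with an odd id fails.
class FakeClient
  def bulk(params)
    items = params[:body].map do |action|
      id = action[:index][:_id]
      result = if id.odd?
                 { '_id' => id, 'status' => 400, 'error' => 'FAILED' }
               else
                 { '_id' => id, 'status' => 201 }
               end
      { 'index' => result }
    end
    { 'errors' => items.any? { |i| i['index']['error'] }, 'items' => items }
  end
end

# Reduced version of the import loop above: batch, bulk, collect errors.
def import_sketch(records, client:, batch_size: 2, return_value: 'count',
                  transform: ->(r) { { index: { _id: r[:id], data: r } } })
  errors = []
  records.each_slice(batch_size) do |batch|
    response = client.bulk(index: 'articles', body: batch.map { |r| transform.call(r) })
    yield response if block_given?
    errors += response['items'].select { |item| item.values.first['error'] }
  end
  return_value == 'errors' ? errors : errors.size
end

records = (1..5).map { |i| { id: i, title: "Article #{i}" } }
batches = 0
count  = import_sketch(records, client: FakeClient.new) { |_response| batches += 1 }
failed = import_sketch(records, client: FakeClient.new, return_value: 'errors')
```

Accumulating errors across batches and deciding the return shape only at the end is what allows the same loop to serve both the default count and the `'errors'` list.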