Class: Chewy::Type::Import::Routine

Inherits:
Object
  • Object
show all
Defined in:
lib/chewy/type/import/routine.rb

Overview

This class performs the import routine for the options and objects given.

  1. Create target and journal indexes if needed.
  2. Iterate over all the passed objects in batches.
  3. For each batch #process method is called:
    • creates a bulk request body;
    • appends journal entries for the current batch to the request body;
    • prepends a leftovers bulk to the request body, which is calculated basing on the previous iteration errors;
    • performs the bulk request;
    • composes new leftovers bulk for the next iteration basing on the response errors if update_failover is true;
    • appends the rest of unfixable errors to the instance level errors array.
  4. Perform the request for the last leftovers bulk if present using #extract_leftovers.
  5. Return the result errors array.

At the moment, it tries to restore only from the partial document update errors in cases when the document doesn't exist only if update_failover option is true. In order to restore, it indexes such an objects completely on the next iteration.

Constant Summary collapse

BULK_OPTIONS =
%i[
  suffix bulk_size
  refresh timeout fields pipeline
  consistency replication
  wait_for_active_shards routing _source _source_exclude _source_include
].freeze
DEFAULT_OPTIONS =
{
  refresh: true,
  update_fields: [],
  update_failover: true,
  batch_size: Chewy::Type::Adapter::Base::BATCH_SIZE
}.freeze

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(type, **options) ⇒ Routine

Basically, processes passed options, extracting bulk request specific options.

Parameters:



44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
# File 'lib/chewy/type/import/routine.rb', line 44

def initialize(type, **options)
  @type = type
  @options = options
  @options.reverse_merge!(@type._default_import_options)
  @options.reverse_merge!(journal: Chewy.configuration[:journal])
  @options.reverse_merge!(DEFAULT_OPTIONS)
  @bulk_options = @options.slice(*BULK_OPTIONS)
  @parallel_options = @options.delete(:parallel)
  if @parallel_options && !@parallel_options.is_a?(Hash)
    @parallel_options = if @parallel_options.is_a?(Integer)
      {in_processes: @parallel_options}
    else
      {}
    end
  end
  @errors = []
  @stats = {}
  @leftovers = []
end

Instance Attribute Details

#errorsObject (readonly)

Returns the value of attribute errors.



39
40
41
# File 'lib/chewy/type/import/routine.rb', line 39

def errors
  @errors
end

#leftoversObject (readonly)

Returns the value of attribute leftovers.



39
40
41
# File 'lib/chewy/type/import/routine.rb', line 39

def leftovers
  @leftovers
end

#optionsObject (readonly)

Returns the value of attribute options.



39
40
41
# File 'lib/chewy/type/import/routine.rb', line 39

def options
  @options
end

#parallel_optionsObject (readonly)

Returns the value of attribute parallel_options.



39
40
41
# File 'lib/chewy/type/import/routine.rb', line 39

def parallel_options
  @parallel_options
end

#statsObject (readonly)

Returns the value of attribute stats.



39
40
41
# File 'lib/chewy/type/import/routine.rb', line 39

def stats
  @stats
end

Instance Method Details

#create_indexes!Object

Creates the journal index and the type corresponding index if necessary.

Returns:



66
67
68
69
70
# File 'lib/chewy/type/import/routine.rb', line 66

def create_indexes!
  Chewy::Stash::Journal.create if @options[:journal]
  return if Chewy.configuration[:skip_index_creation_on_import]
  @type.index.create!(**@bulk_options.slice(:suffix)) unless @type.index.exists?
end

#perform_bulk(body) {|response| ... } ⇒ true, false

Performs a bulk request for the passed body.

Parameters:

  • body (Array<Hash>)

    a standard bulk request body

Yields:

  • (response)

Returns:

  • (true, false)

    the result of the request, true if no errors



101
102
103
104
105
106
107
# File 'lib/chewy/type/import/routine.rb', line 101

def perform_bulk(body)
  response = bulk.perform(body)
  yield response if block_given?
  Chewy.wait_for_status
  @errors.concat(response)
  response.blank?
end

#process(index: [], delete: []) ⇒ true, false

The main process method. Converts passed objects to thr bulk request body, appends journal entires, performs this request and handles errors performing failover procedures if applicable.

Parameters:

  • index (Array<Object>) (defaults to: [])

    any acceptable objects for indexing

  • delete (Array<Object>) (defaults to: [])

    any acceptable objects for deleting

Returns:

  • (true, false)

    the result of the request, true if no errors



79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
# File 'lib/chewy/type/import/routine.rb', line 79

def process(index: [], delete: [])
  bulk_builder = BulkBuilder.new(@type, index: index, delete: delete, fields: @options[:update_fields])
  bulk_body = bulk_builder.bulk_body

  if @options[:journal]
    journal_builder = JournalBuilder.new(@type, index: index, delete: delete)
    bulk_body.concat(journal_builder.bulk_body)
  end

  bulk_body.unshift(*flush_leftovers)

  perform_bulk(bulk_body) do |response|
    @leftovers = extract_leftovers(response, bulk_builder.index_objects_by_id)
    @stats[:index] = @stats[:index].to_i + index.count if index.present?
    @stats[:delete] = @stats[:delete].to_i + delete.count if delete.present?
  end
end