Class: Chewy::Type::Import::Routine
- Defined in:
- lib/chewy/type/import/routine.rb
Overview
This class performs the import routine for the options and objects given.
- Create target and journal indexes if needed.
- Iterate over all the passed objects in batches.
- For each batch, the #process method is called. It:
  - creates a bulk request body;
  - appends journal entries for the current batch to the request body;
  - prepends a leftovers bulk to the request body, calculated from the errors of the previous iteration;
  - performs the bulk request;
  - composes a new leftovers bulk for the next iteration from the response errors if update_failover is true;
  - appends the remaining unfixable errors to the instance-level errors array.
- Perform the request for the last leftovers bulk, if present, using #extract_leftovers.
- Return the resulting errors array.
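The steps above can be sketched in plain Ruby without Elasticsearch. This is a minimal illustration under stated assumptions, not the library's implementation: `FakeBulkClient`, `import_sketch`, and the action hashes (`:op`, `:id`, `:data`) are hypothetical stand-ins, and journal entries are left out for brevity.

```ruby
# Hypothetical stand-in for the bulk endpoint: it rejects partial updates
# for ids it does not know yet and accepts full index actions.
class FakeBulkClient
  attr_reader :stored

  def initialize
    @stored = {}
  end

  # Returns an array of error hashes, empty when every action succeeded.
  def perform(actions)
    actions.each_with_object([]) do |action, errors|
      id = action[:id]
      case action[:op]
      when :index
        @stored[id] = action[:data]
      when :update
        if @stored.key?(id)
          @stored[id] = @stored[id].merge(action[:data])
        else
          errors << { id: id, reason: :document_missing }
        end
      else
        errors << { id: id, reason: :unsupported }
      end
    end
  end
end

# Hypothetical import loop: batches, leftovers carried between iterations,
# and a final flush of the last leftovers.
def import_sketch(client, objects, batch_size:, update_failover: true)
  errors = []
  leftovers = []

  objects.each_slice(batch_size) do |batch|
    # Partial updates for the current batch, with the previous leftovers in front.
    updates = batch.map { |o| { op: :update, id: o[:id], data: { name: o[:name] } } }
    body = leftovers + updates
    by_id = batch.to_h { |o| [o[:id], o] }

    response = client.perform(body)
    fixable, unfixable = response.partition do |e|
      update_failover && e[:reason] == :document_missing && by_id.key?(e[:id])
    end
    # Fixable failures become full index actions for the next iteration.
    leftovers = fixable.map { |e| { op: :index, id: e[:id], data: by_id[e[:id]] } }
    errors.concat(unfixable)
  end

  # Flush whatever is left after the last batch.
  errors.concat(client.perform(leftovers)) unless leftovers.empty?
  errors
end

client = FakeBulkClient.new
client.perform([{ op: :index, id: 1, data: { id: 1, name: 'old' } }])
objects = [{ id: 1, name: 'London' }, { id: 2, name: 'Paris' }, { id: 3, name: 'Rome' }]
result = import_sketch(client, objects, batch_size: 2)
```

In this run only document 1 exists beforehand, so the updates for documents 2 and 3 fail once and are then indexed in full, one iteration later each.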
At the moment, it attempts to recover only from partial document update errors where the document does not exist, and only if the update_failover option is true. To recover, it indexes such objects completely on the next iteration.
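As a rough illustration of that recovery rule, the sketch below separates recoverable failures from the rest. It assumes error items shaped like Elasticsearch bulk response entries, where a partial update against a missing document reports `document_missing_exception`. The helper `split_recoverable` and the sample data are hypothetical, not part of this class.

```ruby
# Splits bulk error items into objects worth re-indexing in full and
# errors that have to be reported as they are.
def split_recoverable(error_items, objects_by_id)
  recoverable, unfixable = error_items.partition do |item|
    action, details = item.first
    action == 'update' &&
      details.dig('error', 'type') == 'document_missing_exception' &&
      objects_by_id.key?(details['_id'].to_s)
  end
  retry_objects = recoverable.map { |item| objects_by_id[item.values.first['_id'].to_s] }
  [retry_objects, unfixable]
end

objects_by_id = { '1' => { id: 1, name: 'London' }, '2' => { id: 2, name: 'Paris' } }
error_items = [
  { 'update' => { '_id' => '2', 'status' => 404,
                  'error' => { 'type' => 'document_missing_exception' } } },
  { 'update' => { '_id' => '1', 'status' => 400,
                  'error' => { 'type' => 'mapper_parsing_exception' } } }
]
retry_objects, unfixable = split_recoverable(error_items, objects_by_id)
```

Only the missing-document failure yields an object to retry; the mapping failure stays in the error list because indexing the same data again would not fix it.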
Constant Summary
- BULK_OPTIONS =
%i[ suffix bulk_size refresh timeout fields pipeline consistency replication wait_for_active_shards routing _source _source_exclude _source_include ].freeze
- DEFAULT_OPTIONS =
{ refresh: true, update_fields: [], update_failover: true, batch_size: Chewy::Type::Adapter::Base::BATCH_SIZE }.freeze
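A small sketch of how these two constants could interact: defaults fill in whatever the caller left out, and only the keys named in BULK_OPTIONS are forwarded to the bulk request. The module `OptionsSketch`, its `resolve` method, and the `batch_size` value of 1000 are assumptions for illustration. Plain `Hash#merge` stands in for ActiveSupport's `reverse_merge!`, and the type-level defaults and journal setting merged by the constructor are omitted.

```ruby
module OptionsSketch
  BULK_OPTIONS = %i[
    suffix bulk_size refresh timeout fields pipeline consistency replication
    wait_for_active_shards routing _source _source_exclude _source_include
  ].freeze

  # batch_size is a placeholder value; the real default comes from
  # Chewy::Type::Adapter::Base::BATCH_SIZE.
  DEFAULT_OPTIONS = {
    refresh: true, update_fields: [], update_failover: true, batch_size: 1000
  }.freeze

  # Caller-supplied values win; defaults only fill the gaps.
  # Returns the full option set and the subset meant for the bulk request.
  def self.resolve(options)
    resolved = DEFAULT_OPTIONS.merge(options)
    [resolved, resolved.slice(*BULK_OPTIONS)]
  end
end

options, bulk_options = OptionsSketch.resolve(refresh: false, suffix: '2024', update_fields: [:name])
```

Here `update_fields` and `update_failover` stay in `options` but are not part of `bulk_options`, since they steer the routine rather than the request.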
Instance Attribute Summary
- #errors ⇒ Object (readonly): Returns the value of attribute errors.
- #leftovers ⇒ Object (readonly): Returns the value of attribute leftovers.
- #options ⇒ Object (readonly): Returns the value of attribute options.
- #parallel_options ⇒ Object (readonly): Returns the value of attribute parallel_options.
- #stats ⇒ Object (readonly): Returns the value of attribute stats.
Instance Method Summary
- #create_indexes! ⇒ Object: Creates the journal index and the type's corresponding index if necessary.
- #initialize(type, **options) ⇒ Routine (constructor): Processes the passed options, extracting the bulk-request-specific ones.
- #perform_bulk(body) {|response| ... } ⇒ true, false: Performs a bulk request for the passed body.
- #process(index: [], delete: []) ⇒ true, false: The main process method.
Constructor Details
#initialize(type, **options) ⇒ Routine
Processes the passed options, extracting the bulk-request-specific ones.
    # File 'lib/chewy/type/import/routine.rb', line 44
    def initialize(type, **options)
      @type = type
      @options = options
      @options.reverse_merge!(@type.) # method name missing from the extracted listing
      @options.reverse_merge!(journal: Chewy.configuration[:journal])
      @options.reverse_merge!(DEFAULT_OPTIONS)
      @bulk_options = @options.slice(*BULK_OPTIONS)
      @parallel_options = @options.delete(:parallel)
      if @parallel_options && !@parallel_options.is_a?(Hash)
        @parallel_options = if @parallel_options.is_a?(Integer)
          {in_processes: @parallel_options}
        else
          {}
        end
      end
      @errors = []
      @stats = {}
      @leftovers = []
    end
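The handling of the :parallel option in the constructor can be isolated as a small function. This is a sketch with a hypothetical helper name, `normalize_parallel`; it mirrors only the branching shown in the listing and makes no claim about how the resulting hash is consumed later.

```ruby
# Hashes pass through untouched, integers become a process count,
# any other truthy value becomes an empty hash, and nil/false stay as they are.
def normalize_parallel(value)
  return value if value.nil? || value == false || value.is_a?(Hash)

  value.is_a?(Integer) ? { in_processes: value } : {}
end

from_integer = normalize_parallel(4)
from_true = normalize_parallel(true)
from_hash = normalize_parallel({ in_threads: 2 })
from_nil = normalize_parallel(nil)
```

Passing `parallel: 4` and `parallel: { in_processes: 4 }` therefore end up equivalent, while `parallel: true` leaves every setting unspecified.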
Instance Attribute Details
#errors ⇒ Object (readonly)
Returns the value of attribute errors.
    # File 'lib/chewy/type/import/routine.rb', line 39
    def errors
      @errors
    end
#leftovers ⇒ Object (readonly)
Returns the value of attribute leftovers.
    # File 'lib/chewy/type/import/routine.rb', line 39
    def leftovers
      @leftovers
    end
#options ⇒ Object (readonly)
Returns the value of attribute options.
    # File 'lib/chewy/type/import/routine.rb', line 39
    def options
      @options
    end
#parallel_options ⇒ Object (readonly)
Returns the value of attribute parallel_options.
    # File 'lib/chewy/type/import/routine.rb', line 39
    def parallel_options
      @parallel_options
    end
#stats ⇒ Object (readonly)
Returns the value of attribute stats.
    # File 'lib/chewy/type/import/routine.rb', line 39
    def stats
      @stats
    end
Instance Method Details
#create_indexes! ⇒ Object
Creates the journal index and the type's corresponding index if necessary.
    # File 'lib/chewy/type/import/routine.rb', line 66
    def create_indexes!
      Chewy::Stash::Journal.create if @options[:journal]
      return if Chewy.configuration[:skip_index_creation_on_import]

      @type.index.create!(**@bulk_options.slice(:suffix)) unless @type.index.exists?
    end
#perform_bulk(body) {|response| ... } ⇒ true, false
Performs a bulk request for the passed body.
    # File 'lib/chewy/type/import/routine.rb', line 101
    def perform_bulk(body)
      response = bulk.perform(body)
      yield response if block_given?
      Chewy.wait_for_status
      @errors.concat(response)
      response.blank?
    end
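One detail worth noting in the listing is the ordering: the block receives the response before it is appended to the errors array, which would allow a block to prune recoverable entries in place. The sketch below reproduces only that control flow. `BulkRunnerSketch` and its callable `transport` are hypothetical, `empty?` replaces ActiveSupport's `blank?`, and the `Chewy.wait_for_status` call is omitted.

```ruby
class BulkRunnerSketch
  attr_reader :errors

  # `transport` is any callable that returns an array of error items.
  def initialize(transport)
    @transport = transport
    @errors = []
  end

  def perform_bulk(body)
    response = @transport.call(body)
    # The block runs before errors are recorded, so it may prune the response.
    yield response if block_given?
    @errors.concat(response)
    response.empty?
  end
end

# Hypothetical transport: every action flagged with :fail comes back as an error.
runner = BulkRunnerSketch.new(->(body) { body.select { |action| action[:fail] } })
clean = runner.perform_bulk([{ id: 1 }, { id: 2 }])
dirty = runner.perform_bulk([{ id: 3, fail: true }])
pruned = runner.perform_bulk([{ id: 4, fail: true }]) { |response| response.clear }
```

The return value reflects the response after the block has run, so a call whose errors were all handled by the block still reports success.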
#process(index: [], delete: []) ⇒ true, false
The main process method. Converts the passed objects to the bulk request body, appends journal entries, performs the request, and handles errors, performing failover procedures if applicable.
    # File 'lib/chewy/type/import/routine.rb', line 79
    def process(index: [], delete: [])
      bulk_builder = BulkBuilder.new(@type, index: index, delete: delete, fields: @options[:update_fields])
      bulk_body = bulk_builder.bulk_body

      if @options[:journal]
        journal_builder = JournalBuilder.new(@type, index: index, delete: delete)
        bulk_body.concat(journal_builder.bulk_body)
      end

      bulk_body.unshift(*flush_leftovers)

      perform_bulk(bulk_body) do |response|
        @leftovers = extract_leftovers(response, bulk_builder.index_objects_by_id)
        @stats[:index] = @stats[:index].to_i + index.count if index.present?
        @stats[:delete] = @stats[:delete].to_i + delete.count if delete.present?
      end
    end
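Two mechanics in this method are easy to check in isolation: the order in which the request body is assembled, and the way the counters grow from an empty hash. The helpers `assemble_body` and `bump_stats` below are hypothetical and use symbols as placeholder actions; they follow the listing but use `empty?` instead of ActiveSupport's `present?`.

```ruby
# Assembles one request body in the order used by the listing:
# leftovers first, then the batch actions, then the journal entries.
def assemble_body(batch_actions, journal_entries, leftovers)
  body = batch_actions.dup
  body.concat(journal_entries)
  body.unshift(*leftovers)
  body
end

# nil.to_i is 0, so counters can start from an empty hash.
# A key is only created once there is something to count.
def bump_stats(stats, index:, delete:)
  stats[:index] = stats[:index].to_i + index.count unless index.empty?
  stats[:delete] = stats[:delete].to_i + delete.count unless delete.empty?
  stats
end

body = assemble_body(%i[a b], %i[j], %i[l1 l2])
stats = {}
bump_stats(stats, index: [1, 2], delete: [])
after_first = stats.dup
bump_stats(stats, index: [3], delete: [4])
```

Because the leftovers are prepended, objects that failed in the previous batch are retried before the current batch's own actions.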