Module: Chewy::Type::Import::ClassMethods
- Defined in:
- lib/chewy/type/import.rb
Instance Method Summary collapse
-
#bulk(options = {}) ⇒ Object
Wraps elasticsearch-ruby client indices bulk method.
-
#import(*args) ⇒ Object
Perform import operation for specified documents.
-
#import!(*args) ⇒ Object
Perform import operation for specified documents.
Instance Method Details
#bulk(options = {}) ⇒ Object
Wraps elasticsearch-ruby client indices bulk method. Adds ‘:suffix` option to bulk import to index with specified suffix.
65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 |
# File 'lib/chewy/type/import.rb', line 65 def bulk = {} suffix = .delete(:suffix) bulk_size = .delete(:bulk_size) body = .delete(:body) header = { index: index.build_index_name(suffix: suffix), type: type_name } bodies = if bulk_size bulk_size -= 1.kilobyte # 1 kilobyte for request header and newlines raise ArgumentError.new('Import `:bulk_size` can\'t be less then 1 kilobyte') if bulk_size <= 0 body.each_with_object(['']) do |entry, result| operation, = entry.to_a.first data = .delete(:data) entry = [{ operation => }, data].compact.map(&:to_json).join("\n") if entry.bytesize > bulk_size raise ArgumentError.new('Import `:bulk_size` seems to be less then entry size') elsif result.last.bytesize + entry.bytesize > bulk_size result.push(entry) else result[-1] = [result[-1], entry].delete_if(&:blank?).join("\n") end end else [body] end items = bodies.map do |body| result = client.bulk .merge(header).merge(body: body) result.try(:[], 'items') || [] end.flatten Chewy.wait_for_status extract_errors items end |
#import(*args) ⇒ Object
Perform import operation for specified documents. Returns true or false depending on success.
UsersIndex::User.import # imports default data set
UsersIndex::User.import User.active # imports active users
UsersIndex::User.import [1, 2, 3] # imports users with specified ids
UsersIndex::User.import users # imports users collection
UsersIndex::User.import suffix: Time.now.to_i # imports data to index with specified suffix if such is exists
UsersIndex::User.import refresh: false # to disable index refreshing after import
UsersIndex::User.import batch_size: 300 # import batch size
UsersIndex::User.import bulk_size: 10.megabytes # import ElasticSearch bulk size in bytes
UsersIndex::User.import consistency: :quorum # explicit write consistency setting for the operation (one, quorum, all)
UsersIndex::User.import replication: :async # explicitly set the replication type (sync, async)
See adapters documentation for more details.
25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 |
# File 'lib/chewy/type/import.rb', line 25 def import *args = args. .reverse_merge! = .reject { |k, _| !BULK_OPTIONS.include?(k) }.reverse_merge!(refresh: true) index.create!(.slice(:suffix)) unless index.exists? ActiveSupport::Notifications.instrument 'import_objects.chewy', type: self do |payload| adapter.import(*args, ) do |action_objects| indexed_objects = build_root.parent_id && fetch_indexed_objects(action_objects.values.flatten) body = bulk_body(action_objects, indexed_objects) errors = bulk(.merge(body: body)) if body.present? fill_payload_import payload, action_objects fill_payload_errors payload, errors if errors.present? !errors.present? end end end |
#import!(*args) ⇒ Object
Perform import operation for specified documents. Raises Chewy::ImportFailed exception in case of import errors. Options are completely the same as for ‘import` method See adapters documentation for more details.
51 52 53 54 55 56 57 58 59 60 61 |
# File 'lib/chewy/type/import.rb', line 51 def import! *args errors = nil subscriber = ActiveSupport::Notifications.subscribe('import_objects.chewy') do |*args| errors = args.last[:errors] end import *args raise Chewy::ImportFailed.new(self, errors) if errors.present? true ensure ActiveSupport::Notifications.unsubscribe(subscriber) if subscriber end |