Module: Chewy::Type::Import::ClassMethods

Defined in:
lib/chewy/type/import.rb

Instance Method Summary

Instance Method Details

#bulk(options = {}) ⇒ Object

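Wraps the elasticsearch-ruby client's `bulk` method. Adds a `:suffix` option to import into the index with the specified suffix, and a `:bulk_size` option to split the request body into several requests of at most that many bytes.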



(Source lines 65–98.)
# File 'lib/chewy/type/import.rb', line 65

# Wraps the elasticsearch-ruby client's +bulk+ call for this type.
#
# Recognized options (the caller's hash is left untouched):
#   :suffix    - passed to +index.build_index_name+ to target a suffixed index.
#   :bulk_size - maximum request size in bytes. When given, +:body+ is
#                serialized and split across several +client.bulk+ calls;
#                1 kilobyte of it is reserved for the request header.
#   :body      - the bulk entries to send.
# All other options are forwarded to +client.bulk+ together with the
# +index+/+type+ header.
#
# After the requests it calls +Chewy.wait_for_status+ and returns the result of
# +extract_errors+ applied to the collected response items.
#
# Raises ArgumentError if +:bulk_size+ is not larger than 1 kilobyte or if a
# single serialized entry does not fit into it.
def bulk(options = {})
  options = options.dup # keep the caller's hash intact
  suffix = options.delete(:suffix)
  bulk_size = options.delete(:bulk_size)
  body = options.delete(:body)
  header = { index: index.build_index_name(suffix: suffix), type: type_name }

  bodies = bulk_size ? bulk_bodies(body, bulk_size) : [body]

  items = bodies.flat_map do |chunk|
    response = client.bulk(options.merge(header).merge(body: chunk))
    response.try(:[], 'items') || []
  end
  Chewy.wait_for_status

  extract_errors(items)
end

# Internal helper for #bulk: serializes bulk +entries+ into newline-delimited
# JSON request bodies. Each entry is a hash whose first pair is
# +operation => meta+; an optional +:data+ key in +meta+ becomes a second line.
# Each body, including the newlines joining its entries, is at most
# +bulk_size+ minus 1 kilobyte. Empty +entries+ yield no bodies.
# Neither +entries+ nor their +meta+ hashes are modified.
def bulk_bodies(entries, bulk_size)
  limit = bulk_size - 1.kilobyte # 1 kilobyte for request header and newlines
  raise ArgumentError, 'Import `:bulk_size` can\'t be less then 1 kilobyte' if limit <= 0

  entries.each_with_object([]) do |entry, chunks|
    operation, meta = entry.to_a.first
    data = meta[:data]
    action = { operation => meta.reject { |key, _| key == :data } }
    line = [action, data].compact.map(&:to_json).join("\n")
    raise ArgumentError, 'Import `:bulk_size` seems to be less then entry size' if line.bytesize > limit

    # The +1 counts the "\n" that would join this entry to the current chunk.
    if chunks.empty? || chunks.last.bytesize + 1 + line.bytesize > limit
      chunks << line
    else
      chunks.last << "\n" << line # in-place append avoids re-copying the chunk
    end
  end
end
private :bulk_bodies

#import(*args) ⇒ Object

Performs an import of the specified documents. Returns true on success and false otherwise.

UsersIndex::User.import                          # imports default data set
UsersIndex::User.import User.active              # imports active users
UsersIndex::User.import [1, 2, 3]                # imports users with specified ids
UsersIndex::User.import users                    # imports users collection
UsersIndex::User.import suffix: Time.now.to_i    # imports data into the index with the specified suffix, if such an index exists
UsersIndex::User.import refresh: false           # to disable index refreshing after import
UsersIndex::User.import batch_size: 300          # import batch size
UsersIndex::User.import bulk_size: 10.megabytes  # maximum size of a single Elasticsearch bulk request, in bytes
UsersIndex::User.import consistency: :quorum     # explicit write consistency setting for the operation (one, quorum, all)
UsersIndex::User.import replication: :async      # explicitly set the replication type (sync, async)

See the adapters documentation for more details.



(Source lines 25–44.)
# File 'lib/chewy/type/import.rb', line 25

# Imports the given documents (or the adapter's default data set when none are
# given). Returns whatever +adapter.import+ returns; the block it runs for each
# batch returns true when that batch's bulk request reported no errors.
#
# The trailing options hash is used in two ways:
# * the whole set, with +_default_import_options+ filled in, goes to +adapter.import+;
# * the keys listed in BULK_OPTIONS, with +refresh: true+ by default, go to +bulk+.
# The caller's options hash is not modified.
#
# Creates the index (honouring a +:suffix+ option) if it does not exist yet.
# The whole import runs inside an 'import_objects.chewy' notification whose
# payload is filled with the imported objects and any bulk errors.
def import(*args)
  # Non-bang reverse_merge: extract_options! hands back the caller's own hash,
  # which may be reused for further imports, so it must not receive our defaults.
  import_options = args.extract_options!.reverse_merge(_default_import_options)
  bulk_options = import_options.slice(*BULK_OPTIONS).reverse_merge!(refresh: true)

  index.create!(bulk_options.slice(:suffix)) unless index.exists?

  ActiveSupport::Notifications.instrument('import_objects.chewy', type: self) do |payload|
    adapter.import(*args, import_options) do |action_objects|
      # fetch_indexed_objects is only consulted when the root defines a parent_id.
      indexed_objects = build_root.parent_id && fetch_indexed_objects(action_objects.values.flatten)
      body = bulk_body(action_objects, indexed_objects)

      errors = bulk(bulk_options.merge(body: body)) if body.present?

      fill_payload_import(payload, action_objects)
      fill_payload_errors(payload, errors) if errors.present?
      errors.blank?
    end
  end
end

#import!(*args) ⇒ Object

Performs an import of the specified documents. Raises a Chewy::ImportFailed exception if the import reports errors. Options are exactly the same as for the `import` method. See the adapters documentation for more details.



(Source lines 51–61.)
# File 'lib/chewy/type/import.rb', line 51

# Same as #import, but raises Chewy::ImportFailed (carrying the collected
# errors) instead of returning false. Returns true on success.
#
# Errors are read from the 'import_objects.chewy' notification emitted by
# #import. That subscription is process-global, so only events for this type
# on the calling thread are considered. ActiveSupport delivers notifications
# synchronously in the instrumenting thread, so this thread's own import is
# still seen. Otherwise concurrent imports could inject or reset the errors.
# The subscription is always removed afterwards.
def import!(*args)
  errors = nil
  thread = Thread.current
  subscriber = ActiveSupport::Notifications.subscribe('import_objects.chewy') do |*event|
    payload = event.last
    errors = payload[:errors] if Thread.current.equal?(thread) && payload[:type] == self
  end
  import(*args)
  raise Chewy::ImportFailed.new(self, errors) if errors.present?
  true
ensure
  ActiveSupport::Notifications.unsubscribe(subscriber) if subscriber
end