Module: Elasticsearch::Model::Importing::ClassMethods
- Included in:
- Proxy::ClassMethodsProxy
- Defined in:
- lib/elasticsearch/model/importing.rb
Instance Method Summary
- #__batch_to_bulk(batch, transform) ⇒ Object
- #import(options = {}, &block) {|Hash| ... } ⇒ Fixnum, Array<Hash>
Import all model records into the index.
Instance Method Details
#__batch_to_bulk(batch, transform) ⇒ Object
    # File 'lib/elasticsearch/model/importing.rb', line 178
    def __batch_to_bulk(batch, transform)
      batch.map { |model| transform.call(model) }
    end
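To make the role of the `transform` argument concrete, here is a minimal, self-contained sketch. The `Article` struct and the `transform` lambda are hypothetical stand-ins rather than library code, and the shape of the action hash is an assumption about what a typical bulk payload looks like. Only the `map`/`call` step is taken from the method above.

```ruby
# Hypothetical stand-in for a model record; not part of the library.
Article = Struct.new(:id, :title)

# Hypothetical transform: any object responding to `call` that turns
# one record into one bulk action hash (payload shape is assumed).
transform = lambda do |article|
  { index: { _id: article.id, data: { title: article.title } } }
end

# Same body as the documented method, written as a free-standing method
# so it can run without the library loaded.
def batch_to_bulk(batch, transform)
  batch.map { |model| transform.call(model) }
end

batch     = [Article.new(1, 'Foo'), Article.new(2, 'Bar')]
bulk_body = batch_to_bulk(batch, transform)
```

Each record in the batch yields exactly one element in `bulk_body`, in the same order as the batch.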
#import(options = {}, &block) {|Hash| ... } ⇒ Fixnum, Array<Hash>
Import all model records into the index.
The method picks the correct strategy based on the `Importing` module defined in the corresponding adapter.
    # File 'lib/elasticsearch/model/importing.rb', line 136
    def import(options={}, &block)
      errors       = []
      refresh      = options.delete(:refresh)   || false
      target_index = options.delete(:index)     || index_name
      transform    = options.delete(:transform) || __transform
      pipeline     = options.delete(:pipeline)
      return_value = options.delete(:return)    || 'count'

      unless transform.respond_to?(:call)
        raise ArgumentError,
              "Pass an object responding to `call` as the :transform option, #{transform.class} given"
      end

      if options.delete(:force)
        self.create_index! force: true, index: target_index
      elsif !self.index_exists? index: target_index
        raise ArgumentError,
              "#{target_index} does not exist to be imported into. Use create_index! or the :force option to create it."
      end

      __find_in_batches(options) do |batch|
        params = {
          index: target_index,
          body:  __batch_to_bulk(batch, transform)
        }

        params[:pipeline] = pipeline if pipeline

        response = client.bulk params

        yield response if block_given?

        errors += response['items'].select { |k, v| k.values.first['error'] }
      end

      self.refresh_index! index: target_index if refresh

      case return_value
      when 'errors'
        errors
      else
        errors.size
      end
    end
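The error handling at the end of `import` can be tried in isolation. The sketch below assumes a bulk response shaped as a hash with an `'items'` array, where each item is keyed by its action name. The `response` literal and the `import_result` helper are hypothetical and exist only for illustration; the `select` filter and the return-value rule mirror the source above.

```ruby
# Hypothetical bulk response: one entry per action, keyed by action name.
response = {
  'errors' => true,
  'items'  => [
    { 'index' => { '_id' => '1', 'status' => 201 } },
    { 'index' => { '_id' => '2', 'status' => 400,
                   'error' => { 'type' => 'mapper_parsing_exception' } } }
  ]
}

# Same filter as in `import`: keep items whose single action entry
# carries an 'error' key.
errors = []
errors += response['items'].select { |item| item.values.first['error'] }

# Hypothetical helper mirroring the final `case`: 'errors' returns the
# failed items, any other value returns their count.
def import_result(errors, return_value = 'count')
  return_value == 'errors' ? errors : errors.size
end
```

With the default `return: 'count'`, a caller receives only the number of failed items. Passing `return: 'errors'` returns the failed items themselves, which is the more useful choice when the import needs to log or retry individual documents.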