Class: Elastictastic::BulkPersistenceStrategy
- Inherits:
-
Object
- Object
- Elastictastic::BulkPersistenceStrategy
- Defined in:
- lib/elastictastic/bulk_persistence_strategy.rb
Defined Under Namespace
Classes: Operation
Constant Summary collapse
- DEFAULT_HANDLER =
proc { |e| raise(e) if e }
Instance Method Summary collapse
- #create(instance, params = {}, &block) ⇒ Object
- #destroy(instance, &block) ⇒ Object
- #destroy!(index, type, id, routing, parent) ⇒ Object
- #flush ⇒ Object
-
#initialize(options) ⇒ BulkPersistenceStrategy
constructor
A new instance of BulkPersistenceStrategy.
- #update(instance, &block) ⇒ Object
Constructor Details
#initialize(options) ⇒ BulkPersistenceStrategy
Returns a new instance of BulkPersistenceStrategy.
8 9 10 11 12 |
# File 'lib/elastictastic/bulk_persistence_strategy.rb', line 8

# Builds a strategy with an empty operation queue.
#
# @param options [Hash] strategy options. The +:auto_flush+ entry is removed
#   from this hash (so the caller's hash is mutated) and kept in @auto_flush.
#   How @auto_flush is interpreted is decided outside this method.
def initialize(options)
  @operations = []
  @operations_by_id = {}
  @auto_flush = options.delete(:auto_flush)
end
Instance Method Details
#create(instance, params = {}, &block) ⇒ Object
14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 |
# File 'lib/elastictastic/bulk_persistence_strategy.rb', line 14

# Queues a bulk 'create' action for +instance+.
#
# The instance is marked as pending-save before the action is queued. The
# block handed to +add+ receives the per-operation response: on an 'error'
# entry the handler is called with ServerError[error]; otherwise the instance
# takes the returned '_id' and '_version', is marked persisted, and the
# handler is called with no arguments.
#
# @param instance the document to create
# @param params [Hash] accepted for interface compatibility; not read here
# @param block optional completion handler; DEFAULT_HANDLER is used when absent
# @raise [Elastictastic::OperationNotAllowed] if the instance already has a
#   save pending
def create(instance, params = {}, &block)
  on_complete = block || DEFAULT_HANDLER

  if instance.pending_save?
    raise Elastictastic::OperationNotAllowed,
          "Can't re-save transient document with pending save in bulk operation"
  end
  instance.pending_save!

  # Arguments are gathered in the same order the original call evaluated them.
  index = instance.index
  id = instance.id
  action = { 'create' => bulk_identifier_for_instance(instance) }
  document = instance.elasticsearch_doc

  add(index, id, action, document) do |response|
    outcome = response['create']
    next on_complete.call(ServerError[outcome['error']]) if outcome['error']

    instance.id = outcome['_id']
    instance.version = outcome['_version']
    instance.persisted!
    on_complete.call
  end
end
#destroy(instance, &block) ⇒ Object
56 57 58 59 60 61 62 63 64 65 66 67 68 |
# File 'lib/elastictastic/bulk_persistence_strategy.rb', line 56

# Queues a bulk delete action for +instance+.
#
# The instance is marked as pending-destroy before the action is queued. The
# block handed to +add+ receives the per-operation response: on an 'error'
# entry the handler is called with ServerError[error]; otherwise the instance
# is marked transient, takes the returned '_version', and the handler is
# called with no arguments.
#
# @param instance the document to delete
# @param block optional completion handler; DEFAULT_HANDLER is used when absent
def destroy(instance, &block)
  on_complete = block || DEFAULT_HANDLER
  instance.pending_destroy!

  index = instance.index
  id = instance.id

  add(index, id, :delete => bulk_identifier_for_instance(instance)) do |response|
    outcome = response['delete']
    next on_complete.call(ServerError[outcome['error']]) if outcome['error']

    instance.transient!
    instance.version = outcome['_version']
    on_complete.call
  end
end
#destroy!(index, type, id, routing, parent) ⇒ Object
70 71 72 73 74 75 |
# File 'lib/elastictastic/bulk_persistence_strategy.rb', line 70

# Queues a bulk delete addressed by raw coordinates instead of a document
# instance. No completion block is passed to +add+.
#
# @param index index name, also passed to +add+
# @param type forwarded to +bulk_identifier+
# @param id document id, also passed to +add+
# @param routing forwarded to +bulk_identifier+
# @param parent forwarded to +bulk_identifier+
def destroy!(index, type, id, routing, parent)
  # The trailing nil fills bulk_identifier's sixth argument.
  identifier = bulk_identifier(index, type, id, routing, parent, nil)
  add(index, id, :delete => identifier)
end
#flush ⇒ Object
77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 |
# File 'lib/elastictastic/bulk_persistence_strategy.rb', line 77

# Sends every queued, non-skipped operation to the client in one bulk request
# and hands each response item to the matching operation's handler.
#
# The queue is always emptied. No request is made when the queue is empty or
# when every queued operation is flagged +skip+; previously the all-skipped
# case still sent an empty bulk body to the client.
#
# @return [Hash, nil] the client's bulk response, or nil when nothing was sent
def flush
  return if @operations.empty?

  pending = @operations.reject(&:skip)
  @operations.clear
  # Everything queued was skipped: there is nothing to put in the request body.
  return if pending.empty?

  params = {}
  params[:refresh] = true if Elastictastic.config.auto_refresh

  # One encoded command per line, in queue order.
  payload = StringIO.new
  pending.each do |operation|
    operation.commands.each do |command|
      payload.puts Elastictastic.json_encode(command)
    end
  end

  response = Elastictastic.client.bulk(payload.string, params)

  # Response items are matched to the sent operations by position.
  response['items'].zip(pending) do |op_response, operation|
    operation.handler.call(op_response) if operation.handler
  end
  response
end
#update(instance, &block) ⇒ Object
38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 |
# File 'lib/elastictastic/bulk_persistence_strategy.rb', line 38

# Queues a bulk 'index' action for +instance+.
#
# The instance is marked as pending-save before the action is queued. The
# block handed to +add+ receives the per-operation response: on an 'error'
# entry the handler is called with ServerError[error]; otherwise the instance
# takes the returned '_version' and the handler is called with no arguments.
#
# @param instance the document to index
# @param block optional completion handler; DEFAULT_HANDLER is used when absent
def update(instance, &block)
  on_complete = block || DEFAULT_HANDLER
  instance.pending_save!

  # Arguments are gathered in the same order the original call evaluated them.
  index = instance.index
  id = instance.id
  action = { 'index' => bulk_identifier_for_instance(instance) }
  document = instance.elasticsearch_doc

  add(index, id, action, document) do |response|
    outcome = response['index']
    next on_complete.call(ServerError[outcome['error']]) if outcome['error']

    instance.version = outcome['_version']
    on_complete.call
  end
end