Class: Sqskiq::BatchProcessor

Inherits:
Object
  • Object
show all
Includes:
Celluloid, SignalHandler
Defined in:
lib/sqskiq/batch_process.rb

Instance Method Summary

Methods included from SignalHandler

#shutting_down, #subscribe_for_shutdown

Constructor Details

#initialize ⇒ BatchProcessor

Returns a new instance of BatchProcessor.



10
11
12
13
14
# File 'lib/sqskiq/batch_process.rb', line 10

# Caches the actors this batch processor talks to and registers for
# shutdown notifications.
#
# The :manager and :processor entries are read from the Celluloid::Actor
# registry at construction time, then subscribe_for_shutdown (listed among
# the methods included from SignalHandler) is invoked.
def initialize
  actors = Celluloid::Actor
  @manager = actors[:manager]
  @processor = actors[:processor]
  subscribe_for_shutdown
end

Instance Method Details

#process(messages) ⇒ Object



16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
# File 'lib/sqskiq/batch_process.rb', line 16

# Processes a batch of messages and reports the successful ones.
#
# Every message is first submitted through @processor.future, so all
# futures exist before any result is awaited. Each future's value is then
# read in submission order; a value is expected to respond to [] with
# :success and :message keys. Messages whose value has a truthy :success
# are collected and passed to @manager.async.batch_done.
#
# @param messages [Enumerable] the messages to process
# @return [Object] whatever @manager.async.batch_done returns
def process(messages)
  pending = messages.map { |message| @processor.future.process(message) }

  succeeded = pending.each_with_object([]) do |future, done|
    # Re-checked on every iteration: once the flag is set, the remaining
    # futures are not awaited at all.
    # NOTE(review): presumably set by SignalHandler#shutting_down - confirm.
    next if @shutting_down

    outcome = future.value
    done << outcome[:message] if outcome[:success]
  end

  @manager.async.batch_done(succeeded)
end