Class: PikaQue::Processor
- Inherits:
-
Object
- Object
- PikaQue::Processor
show all
- Includes:
- Logging
- Defined in:
- lib/pika_que/processor.rb
Instance Method Summary
collapse
Methods included from Logging
init_logger, logger, #logger, logger=
Constructor Details
#initialize(opts = {}) ⇒ Processor
Returns a new instance of Processor.
9
10
11
12
13
14
15
16
|
# File 'lib/pika_que/processor.rb', line 9
# Builds a processor: merges configuration, starts a broker, creates the
# thread pool and instantiates the configured workers.
#
# @param opts [Hash] overrides merged on top of PikaQue.config. Keys read
#   here: :concurrency (pool size, falls back to 1) and :workers (list of
#   worker identifiers, falls back to an empty list).
def initialize(opts = {})
  @opts = PikaQue.config.merge(opts)

  # The broker is started before it is stored, so @broker only ever refers
  # to a broker whose #start has already returned.
  broker = PikaQue::Broker.new(self, @opts)
  broker.start
  @broker = broker

  pool_size = @opts[:concurrency] || 1
  @pool = Concurrent::FixedThreadPool.new(pool_size)

  # Every worker receives the merged options plus the shared broker and pool.
  proc_config = @opts.merge({ broker: @broker, worker_pool: @pool })
  worker_names = @opts.fetch(:workers, [])
  @workers = worker_names.map do |worker_name|
    worker_class = PikaQue::Util.constantize(worker_name)
    worker_class.new(proc_config)
  end

  @thread = nil
end
|
Instance Method Details
#process ⇒ Object
22
23
24
|
# File 'lib/pika_que/processor.rb', line 22
# Calls #run on every object in @workers, in collection order.
#
# @return [Object] whatever @workers.each returns (the collection itself
#   when @workers is an Array)
def process
  @workers.each do |worker|
    worker.run
  end
end
|
#setup ⇒ Object
18
19
20
|
# File 'lib/pika_que/processor.rb', line 18
# Calls #prepare on every object in @workers, in collection order.
#
# @return [Object] whatever @workers.each returns (the collection itself
#   when @workers is an Array)
def setup
  @workers.each do |worker|
    worker.prepare
  end
end
|
#start ⇒ Object
26
27
28
29
30
31
32
|
# File 'lib/pika_que/processor.rb', line 26
# Runs #setup followed by #process on a background thread and keeps the
# thread handle in @thread.
#
# The previous form, `@thread = Thread.new do ... end.abort_on_exception = true`,
# parsed as `@thread = (thread.abort_on_exception = true)`, so @thread ended
# up holding `true` instead of the Thread. The handle is now stored properly.
#
# @return [true] the same value the method returned before the fix
def start
  @thread = Thread.new do
    # Set the flag from inside the thread, before any work runs, so a
    # failure in setup/process cannot happen while the flag is still unset.
    Thread.current.abort_on_exception = true
    Thread.current['label'] = 'processor'
    setup
    process
  end
  true
end
|
#stop ⇒ Object
34
35
36
37
38
39
40
41
42
|
# File 'lib/pika_que/processor.rb', line 34
# Shuts the processor down in a fixed order: workers first, then the thread
# pool, then the broker.
#
# @return [Object] whatever @broker.stop returns
def stop
  @workers.each { |worker| worker.stop }

  # Stop accepting new work, then wait on the pool with a timeout argument of 12.
  pool = @pool
  pool.shutdown
  pool.wait_for_termination(12)

  # Broker cleanup is invoked with `true` before the broker itself is stopped.
  broker = @broker
  broker.cleanup(true)
  broker.stop
end
|