Class: PikaQue::Processor
- Inherits:
-
Object
- Object
- PikaQue::Processor
show all
- Includes:
- Logging
- Defined in:
- lib/pika_que/processor.rb
Instance Method Summary
collapse
Methods included from Logging
init_logger, logger, #logger, logger=
Constructor Details
#initialize(opts = {}) ⇒ Processor
Returns a new instance of Processor.
9
10
11
12
13
14
15
16
|
# File 'lib/pika_que/processor.rb', line 9
def initialize(opts = {})
@opts = PikaQue.config.merge(opts)
@broker = PikaQue::Broker.new(self, @opts).tap{ |b| b.start }
@pool = Concurrent::FixedThreadPool.new(@opts[:concurrency] || 1)
proc_config = @opts.merge({ broker: @broker, worker_pool: @pool })
@workers = @opts.fetch(:workers, []).map{ |w| PikaQue::Util.constantize(w).new(proc_config) }
@thread = nil
end
|
Instance Method Details
#process ⇒ Object
23
24
25
|
# File 'lib/pika_que/processor.rb', line 23
def process
@workers.each(&:run)
end
|
#setup ⇒ Object
18
19
20
21
|
# File 'lib/pika_que/processor.rb', line 18
def setup
logger.info "setting up processor with workers: #{@workers.map(&:class)}"
@workers.each(&:prepare)
end
|
#start ⇒ Object
27
28
29
30
31
32
33
|
# File 'lib/pika_que/processor.rb', line 27
def start
@thread = Thread.new do
Thread.current['label'] = 'processor'
setup
process
end.tap{ |t| t.abort_on_exception = true }
end
|
#stop ⇒ Object
35
36
37
38
39
40
41
42
43
|
# File 'lib/pika_que/processor.rb', line 35
def stop
@workers.each(&:stop)
@pool.shutdown
@pool.wait_for_termination 12
@broker.cleanup(true)
@broker.stop
end
|