Class: PikaQue::Processor

Inherits:
Object
  • Object
show all
Includes:
Logging
Defined in:
lib/pika_que/processor.rb

Instance Method Summary collapse

Methods included from Logging

init_logger, logger, #logger, logger=

Constructor Details

#initialize(opts = {}) ⇒ Processor

Returns a new instance of Processor.



9
10
11
12
13
14
15
16
# File 'lib/pika_que/processor.rb', line 9

# Builds a processor: merges the given options over the global PikaQue
# configuration, starts a broker, creates the worker thread pool and
# instantiates every configured worker.
#
# @param opts [Hash] overrides merged on top of PikaQue.config; this method
#   reads :concurrency (pool size, 1 when absent) and :workers (list passed
#   to PikaQue::Util.constantize, empty when absent)
# @return [Processor]
def initialize(opts = {})
  @opts = PikaQue.config.merge(opts)

  # The broker receives this processor and is started before it is stored.
  @broker = PikaQue::Broker.new(self, @opts).tap(&:start)

  pool_size = @opts[:concurrency] || 1
  @pool = Concurrent::FixedThreadPool.new(pool_size)

  # Every worker gets the merged options plus the shared broker and pool.
  shared = { broker: @broker, worker_pool: @pool }
  proc_config = @opts.merge(shared)
  @workers = @opts.fetch(:workers, []).map do |worker_name|
    PikaQue::Util.constantize(worker_name).new(proc_config)
  end

  # Populated by #start.
  @thread = nil
end

Instance Method Details

#process ⇒ Object



23
24
25
# File 'lib/pika_que/processor.rb', line 23

# Calls #run on every worker, in the order the workers were created.
#
# @return [Array] the worker list
def process
  @workers.each { |worker| worker.run }
end

#setup ⇒ Object



18
19
20
21
# File 'lib/pika_que/processor.rb', line 18

# Logs the classes of the configured workers, then calls #prepare on each
# worker in order.
#
# @return [Array] the worker list
def setup
  worker_classes = @workers.map(&:class)
  logger.info("setting up processor with workers: #{worker_classes}")

  @workers.each { |worker| worker.prepare }
end

#start ⇒ Object



27
28
29
30
31
32
33
# File 'lib/pika_que/processor.rb', line 27

# Runs #setup followed by #process on a new background thread and stores
# that thread in @thread.
#
# The thread is labelled 'processor' and has abort_on_exception enabled.
#
# @return [Thread] the thread that was started
def start
  @thread = Thread.new do
    # Enable the flag from inside the thread, before any work runs. Setting
    # it from the spawning thread after Thread.new returns leaves a window
    # in which an early failure in setup/process would not abort the process.
    Thread.current.abort_on_exception = true
    Thread.current['label'] = 'processor'
    setup
    process
  end
end

#stop ⇒ Object



35
36
37
38
39
40
41
42
43
# File 'lib/pika_que/processor.rb', line 35

# Shuts the processor down in a fixed order: workers, then the thread pool,
# then the broker.
#
# @return [Object] whatever the broker's #stop returns
def stop
  # 1. Tell each worker to stop.
  @workers.each { |worker| worker.stop }

  # 2. Shut the pool down and wait on it with a 12 timeout.
  @pool.shutdown
  @pool.wait_for_termination(12)

  # 3. Clean up the broker, then stop it.
  @broker.cleanup(true)
  @broker.stop
end