Class: PikaQue::DelayWorker

Inherits:
Object
  • Object
show all
Defined in:
lib/pika_que/delay_worker.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(opts = {}) ⇒ DelayWorker

Returns a new instance of DelayWorker.



6
7
8
9
10
11
# File 'lib/pika_que/delay_worker.rb', line 6

# Builds a delay worker from the global PikaQue configuration overlaid with
# the caller's options.
#
# A broker given as opts[:broker] is used as-is; otherwise a new
# PikaQue::Broker is created and started. A pool given as opts[:worker_pool]
# is used as-is; otherwise a Concurrent::FixedThreadPool is created with
# opts[:concurrency] threads (1 when that option is unset).
#
# @param opts [Hash] overrides merged on top of PikaQue.config
def initialize(opts = {})
  @opts = PikaQue.config.merge(opts)

  supplied_broker = @opts[:broker]
  @broker = supplied_broker || PikaQue::Broker.new(nil, @opts).tap(&:start)

  @pool = @opts[:worker_pool]
  # Only build a private pool when the caller did not hand one in.
  @pool ||= Concurrent::FixedThreadPool.new(@opts[:concurrency] || 1)

  # The delay queue name is derived from the configured exchange name.
  exchange_name = @opts[:exchange]
  @delay_name = "#{exchange_name}-delay"
end

Instance Attribute Details

#broker ⇒ Object

Returns the value of attribute broker.



4
5
6
# File 'lib/pika_que/delay_worker.rb', line 4

# Reader for the broker this worker talks to (assigned in the constructor).
#
# @return [Object] the current value of @broker
def broker; @broker; end

#handler ⇒ Object

Returns the value of attribute handler.



4
5
6
# File 'lib/pika_que/delay_worker.rb', line 4

# Reader for the message handler; nil until #prepare has assigned it.
#
# @return [Object, nil] the current value of @handler
def handler; @handler; end

#pool ⇒ Object

Returns the value of attribute pool.



4
5
6
# File 'lib/pika_que/delay_worker.rb', line 4

# Reader for the thread pool that executes posted jobs (assigned in the
# constructor).
#
# @return [Object] the current value of @pool
def pool; @pool; end

#queue ⇒ Object

Returns the value of attribute queue.



4
5
6
# File 'lib/pika_que/delay_worker.rb', line 4

# Reader for the delay queue; nil until #prepare has assigned it.
#
# @return [Object, nil] the current value of @queue
def queue; @queue; end

Instance Method Details

#logger ⇒ Object



50
51
52
# File 'lib/pika_que/delay_worker.rb', line 50

# Logger for this worker; simply delegates to the library-wide PikaQue.logger.
#
# @return [Object] whatever PikaQue.logger returns
def logger; PikaQue.logger; end

#prepare ⇒ Object



13
14
15
16
17
18
19
# File 'lib/pika_que/delay_worker.rb', line 13

# Obtains the delay queue and a handler from the broker, then binds them.
#
# Stores the queue in @queue and the handler in @handler, and asks the
# handler to bind the queue using the queue's own name.
#
# @return [Object] the result of the handler's bind_queue call
def prepare
  settings = @opts

  @queue = broker.queue(@delay_name, settings[:queue_options])
  @handler = broker.handler(settings[:handler_class], settings[:handler_options])

  # TODO use routing keys?
  delay_queue = @queue
  @handler.bind_queue(delay_queue, delay_queue.name)
end

#run ⇒ Object



21
22
23
24
25
26
27
# File 'lib/pika_que/delay_worker.rb', line 21

# Subscribes to the delay queue without blocking the caller and forwards
# every delivery to the worker pool, where #work processes it.
#
# Manual acknowledgement is switched by opts[:ack].
#
# @return [Object] the value returned by queue.subscribe, which is also kept
#   in @consumer so that #stop can cancel it
def run
  @consumer = queue.subscribe(block: false, manual_ack: @opts[:ack]) do |delivery_info, metadata, msg|
    # Hand the delivery off so the subscription callback returns immediately.
    pool.post do
      work(delivery_info, metadata, msg)
    end
  end
end

#start ⇒ Object



29
30
31
32
# File 'lib/pika_que/delay_worker.rb', line 29

# Convenience entry point: sets up the queue and handler via #prepare, then
# begins consuming via #run.
#
# @return [Object] whatever #run returns
def start
  prepare
  run
end

#stop ⇒ Object



34
35
36
37
38
39
40
41
42
43
44
# File 'lib/pika_que/delay_worker.rb', line 34

# Shuts the worker down.
#
# Cancels the active consumer (if any), shuts down the thread pool when this
# worker created it, then cleans up and stops the broker.
#
# @return [Object] the result of broker.stop
def stop
  if @consumer
    @consumer.cancel
  end
  @consumer = nil

  # A pool supplied through opts[:worker_pool] is left running for its owner.
  pool_is_ours = !@opts[:worker_pool]
  if pool_is_ours
    @pool.shutdown
    # Wait for posted jobs to finish, with a timeout of 12.
    @pool.wait_for_termination(12)
  end

  broker.cleanup
  broker.stop
end

#work(delivery_info, metadata, msg) ⇒ Object



46
47
48
# File 'lib/pika_que/delay_worker.rb', line 46

# Processes a single delivery by passing it, together with the broker's
# channel, to the handler.
#
# @param delivery_info [Object] delivery information yielded by the subscription
# @param metadata [Object] message metadata yielded by the subscription
# @param msg [Object] the message payload
# @return [Object] the result of handler.handle
def work(delivery_info, metadata, msg)
  handler.handle(:ack, broker.channel, delivery_info, metadata, msg)
end