Class: PikaQue::DelayWorker
- Inherits:
-
Object
- Object
- PikaQue::DelayWorker
- Defined in:
- lib/pika_que/delay_worker.rb
Instance Attribute Summary collapse
-
#broker ⇒ Object
Returns the value of attribute broker.
-
#handler ⇒ Object
Returns the value of attribute handler.
-
#pool ⇒ Object
Returns the value of attribute pool.
-
#queue ⇒ Object
Returns the value of attribute queue.
Instance Method Summary collapse
-
#initialize(opts = {}) ⇒ DelayWorker
constructor
A new instance of DelayWorker.
- #logger ⇒ Object
- #prepare ⇒ Object
- #run ⇒ Object
- #start ⇒ Object
- #stop ⇒ Object
- #work(delivery_info, metadata, msg) ⇒ Object
Constructor Details
#initialize(opts = {}) ⇒ DelayWorker
Returns a new instance of DelayWorker.
6 7 8 9 10 11 |
# File 'lib/pika_que/delay_worker.rb', line 6 def initialize(opts = {}) @opts = PikaQue.config.merge(opts) @broker = @opts[:broker] || PikaQue::Broker.new(nil, @opts).tap{ |b| b.start } @pool = @opts[:worker_pool] || Concurrent::FixedThreadPool.new(@opts[:concurrency] || 1) @delay_name = "#{@opts[:exchange]}-delay" end |
Instance Attribute Details
#broker ⇒ Object
Returns the value of attribute broker.
4 5 6 |
# File 'lib/pika_que/delay_worker.rb', line 4 def broker @broker end |
#handler ⇒ Object
Returns the value of attribute handler.
4 5 6 |
# File 'lib/pika_que/delay_worker.rb', line 4 def handler @handler end |
#pool ⇒ Object
Returns the value of attribute pool.
4 5 6 |
# File 'lib/pika_que/delay_worker.rb', line 4 def pool @pool end |
#queue ⇒ Object
Returns the value of attribute queue.
4 5 6 |
# File 'lib/pika_que/delay_worker.rb', line 4 def queue @queue end |
Instance Method Details
#logger ⇒ Object
50 51 52 |
# File 'lib/pika_que/delay_worker.rb', line 50 def logger PikaQue.logger end |
#prepare ⇒ Object
13 14 15 16 17 18 19 |
# File 'lib/pika_que/delay_worker.rb', line 13 def prepare @queue = broker.queue(@delay_name, @opts[:queue_options]) @handler = broker.handler(@opts[:handler_class], @opts[:handler_options]) # TODO use routing keys? @handler.bind_queue(@queue, @queue.name) end |
#run ⇒ Object
21 22 23 24 25 26 27 |
# File 'lib/pika_que/delay_worker.rb', line 21 def run @consumer = queue.subscribe(:block => false, :manual_ack => @opts[:ack]) do | delivery_info, , msg | pool.post do work(delivery_info, , msg) end end end |
#start ⇒ Object
29 30 31 32 |
# File 'lib/pika_que/delay_worker.rb', line 29 def start prepare run end |
#stop ⇒ Object
34 35 36 37 38 39 40 41 42 43 44 |
# File 'lib/pika_que/delay_worker.rb', line 34 def stop @consumer.cancel if @consumer @consumer = nil unless @opts[:worker_pool] @pool.shutdown @pool.wait_for_termination 12 end broker.cleanup broker.stop end |
#work(delivery_info, metadata, msg) ⇒ Object
46 47 48 |
# File 'lib/pika_que/delay_worker.rb', line 46 def work(delivery_info, , msg) handler.handle(:ack, broker.channel, delivery_info, , msg) end |