Class: PikaQue::Handlers::DelayHandler
- Inherits: Object
- Inheritance hierarchy: Object → PikaQue::Handlers::DelayHandler
- Defined in:
- lib/pika_que/handlers/delay_handler.rb
Constant Summary collapse
- DEFAULT_DELAY_OPTS =
Default delays are 1 min, 10 min, 1 hr, and 24 hr (values given in seconds).
{ :delay_periods => [60, 600, 3600, 86400], :delay_backoff_multiplier => 1000, }.freeze
Instance Method Summary collapse
- #bind_queue(queue, routing_key) ⇒ Object
- #close ⇒ Object
- #handle(response_code, channel, delivery_info, metadata, msg, error = nil) ⇒ Object
- #initialize(opts = {}) ⇒ DelayHandler (constructor): returns a new instance of DelayHandler.
Constructor Details
#initialize(opts = {}) ⇒ DelayHandler
Returns a new instance of DelayHandler.
21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 |
# File 'lib/pika_que/handlers/delay_handler.rb', line 21

# Builds the handler: merges the options, creates a channel on the connection,
# records the delay settings and exchange names, then calls setup_exchanges
# and setup_queues.
#
# @param opts [Hash] overrides merged on top of PikaQue.config and
#   DEFAULT_DELAY_OPTS; this method reads :connection, :delay_periods,
#   :delay_backoff_multiplier and :exchange
def initialize(opts = {})
  @opts = PikaQue.config.merge(DEFAULT_DELAY_OPTS).merge(opts)
  @connection = opts[:connection] || PikaQue.connection
  @channel = @connection.create_channel
  @delay_monitor = Monitor.new
  @root_monitor = Monitor.new

  # Make sure the periods are in descending order. Use the non-mutating sort:
  # the array may be the one held inside DEFAULT_DELAY_OPTS (whose freeze is
  # shallow) or the caller's own array, and neither should be reordered in
  # place (a frozen array would also make sort! raise).
  @delay_periods = @opts[:delay_periods].sort { |x, y| y <=> x }
  @backoff_multiplier = @opts[:delay_backoff_multiplier]

  # This is for example/dev/test
  @delay_name = "#{@opts[:exchange]}-delay"
  @requeue_name = "#{@opts[:exchange]}-delay-requeue"
  @root_name = @opts[:exchange]

  setup_exchanges
  setup_queues
end
Instance Method Details
#bind_queue(queue, routing_key) ⇒ Object
40 41 42 43 |
# File 'lib/pika_que/handlers/delay_handler.rb', line 40

# Binds the given worker queue to the requeue exchange.
#
# @param queue [#bind] the worker queue to bind
# @param routing_key [String] routing key used for the binding
# @return [Object] whatever queue.bind returns
def bind_queue(queue, routing_key)
  # bind the worker queue to requeue exchange
  queue.bind(@requeue_exchange, routing_key: routing_key)
end
#close ⇒ Object
63 64 65 |
# File 'lib/pika_que/handlers/delay_handler.rb', line 63

# Closes the handler's channel, unless the channel already reports itself
# as closed.
def close
  @channel.close unless @channel.closed?
end
#handle(response_code, channel, delivery_info, metadata, msg, error = nil) ⇒ Object
45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 |
# File 'lib/pika_que/handlers/delay_handler.rb', line 45

# Handles a message by either re-publishing it with a delay or publishing it
# back as work, then acknowledging the original delivery.
#
# @param response_code [Object] not used by this method
# @param channel [#acknowledge] channel the delivery is acknowledged on
# @param delivery_info [#delivery_tag] delivery information of the message
# @param metadata [#[]] message metadata; its :headers entry is read
# @param msg [Object] the message payload
# @param error [Object, nil] not used by this method
def handle(response_code, channel, delivery_info, metadata, msg, error = nil)
  headers = metadata[:headers]
  delay_period = next_delay_period(headers)

  if delay_period > 0
    # We will publish the message to the delay exchange
    PikaQue.logger.info "DelayHandler msg=delaying, delay=#{delay_period}, headers=#{headers}"
    publish_delay(delivery_info, msg, headers.merge({ 'delay' => delay_period }))
  else
    # Publish the original message with the routing_key to the root exchange
    work_queue = headers['work_queue']
    PikaQue.logger.info "DelayHandler msg=publishing, queue=#{work_queue}, headers=#{headers}"
    publish_work(work_queue, msg)
  end

  # Acknowledge only after the publish above has been issued, in both branches.
  channel.acknowledge(delivery_info.delivery_tag, false)
end