Class: PikaQue::Handlers::DelayHandler

Inherits:
Object
  • Object
show all
Defined in:
lib/pika_que/handlers/delay_handler.rb

Constant Summary collapse

DEFAULT_DELAY_OPTS =

default delays are 1min, 10min, 1hr, 24hr

{
  # Delay steps in seconds: 1min, 10min, 1hr, 24hr.
  # NOTE(review): only the outer hash is frozen; this inner array is still
  # mutable, so any in-place operation on it affects every reader.
  :delay_periods            => [60, 600, 3600, 86400],
  # NOTE(review): presumably scales the periods above into milliseconds for
  # the broker; lowering it shortens delays for dev/test — confirm in usage.
  :delay_backoff_multiplier => 1000,
}.freeze

Instance Method Summary collapse

Constructor Details

#initialize(opts = {}) ⇒ DelayHandler

Returns a new instance of DelayHandler.



21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
# File 'lib/pika_que/handlers/delay_handler.rb', line 21

# Builds the handler: merges configuration, opens a channel on the
# connection, derives the exchange/queue names and then calls
# setup_exchanges and setup_queues.
#
# @param opts [Hash] overrides merged on top of PikaQue.config and
#   DEFAULT_DELAY_OPTS. A :connection entry, when present, is used instead
#   of PikaQue.connection.
# @return [DelayHandler]
def initialize(opts = {})
  @opts = PikaQue.config.merge(DEFAULT_DELAY_OPTS).merge(opts)
  @connection = opts[:connection] || PikaQue.connection
  @channel = @connection.create_channel
  @delay_monitor = Monitor.new
  @root_monitor = Monitor.new

  # Keep the periods in descending order. Use a non-mutating sort: the array
  # may be the one held by DEFAULT_DELAY_OPTS (the constant is only shallowly
  # frozen) or one owned by the caller, and an in-place sort! would silently
  # reorder it — or raise FrozenError if it is frozen.
  @delay_periods = @opts[:delay_periods].sort.reverse
  @backoff_multiplier = @opts[:delay_backoff_multiplier] # This is for example/dev/test

  @delay_name = "#{@opts[:exchange]}-delay"
  @requeue_name = "#{@opts[:exchange]}-delay-requeue"
  @root_name = @opts[:exchange]

  setup_exchanges
  setup_queues
end

Instance Method Details

#bind_queue(queue, routing_key) ⇒ Object



40
41
42
43
# File 'lib/pika_que/handlers/delay_handler.rb', line 40

# Attaches a worker queue to the requeue exchange.
#
# @param queue [#bind] the worker queue to bind
# @param routing_key [String] routing key used for the binding
# @return [Object] whatever queue.bind returns
def bind_queue(queue, routing_key)
  queue.bind(@requeue_exchange, routing_key: routing_key)
end

#close ⇒ Object



63
64
65
# File 'lib/pika_que/handlers/delay_handler.rb', line 63

# Closes the handler's channel; does nothing when it is already closed.
#
# @return [Object, nil] the result of @channel.close, or nil if the channel
#   was already closed
def close
  return if @channel.closed?

  @channel.close
end

#handle(response_code, channel, delivery_info, metadata, msg, error = nil) ⇒ Object



45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
# File 'lib/pika_que/handlers/delay_handler.rb', line 45

# Routes a message either to the delay exchange (when another delay period
# applies) or back to its work queue, then acknowledges the original delivery.
#
# @param response_code [Object] not used by this method
# @param channel [#acknowledge] channel the message was delivered on
# @param delivery_info [#delivery_tag] delivery information for the message
# @param metadata [Hash] message properties; metadata[:headers] is read and
#   is expected to carry 'work_queue' when no further delay applies
# @param msg [Object] the message payload, passed through unchanged
# @param error [Object, nil] not used by this method
# @return [Object] whatever channel.acknowledge returns
def handle(response_code, channel, delivery_info, metadata, msg, error = nil)
  headers = metadata[:headers]
  delay_period = next_delay_period(headers)

  if delay_period > 0
    # We will publish the message to the delay exchange
    PikaQue.logger.info "DelayHandler msg=delaying, delay=#{delay_period}, headers=#{headers}"

    publish_delay(delivery_info, msg, headers.merge({ 'delay' => delay_period }))
  else
    # Publish the original message with the routing_key to the root exchange
    work_queue = headers['work_queue']
    PikaQue.logger.info "DelayHandler msg=publishing, queue=#{work_queue}, headers=#{headers}"

    publish_work(work_queue, msg)
  end

  # Both paths re-publish the message, so the original delivery is always acked.
  channel.acknowledge(delivery_info.delivery_tag, false)
end