Class: Tackle::Consumer::DelayQueue

Inherits:
Queue
  • Object
show all
Defined in:
lib/tackle/consumer/delay_queue.rb

Instance Attribute Summary

Attributes inherited from Queue

#name

Instance Method Summary collapse

Methods inherited from Queue

#create_amqp_queue

Constructor Details

#initialize(retry_delay, exchange, connection, logger) ⇒ DelayQueue

Returns a new instance of DelayQueue.



5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
# File 'lib/tackle/consumer/delay_queue.rb', line 5

def initialize(retry_delay, exchange, connection, logger)
  @retry_delay = retry_delay

  name = "#{exchange.name}.delay.#{retry_delay}"

  options = {
    :durable => true,
    :arguments => {
      "x-dead-letter-exchange" => exchange.name,
      "x-dead-letter-routing-key" => exchange.routing_key,
      "x-message-ttl" => retry_delay * 1000 # miliseconds
    }
  }

  super(name, options, connection, logger)
end

Instance Method Details

#publish(message) ⇒ Object



22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
# File 'lib/tackle/consumer/delay_queue.rb', line 22

def publish(message)
  message.log_error "Pushing message to delay queue delay='#{@retry_delay}'"

  headers = {
    :headers => {
      :retry_count => message.retry_count + 1
    }
  }

  @amqp_queue.publish(message.payload, headers)

  message.log_error "Message pushed to delay queue"
rescue StandardError => ex
  message.log_error "Error while pushing message to delay queue exception='#{ex}'"

  raise ex
end