Class: RabbitMqQueueManager
- Inherits:
- Object
- Inheritance chain: Object → RabbitMqQueueManager
- Defined in:
- lib/eventq_rabbitmq/rabbitmq_queue_manager.rb
Instance Method Summary collapse
- #get_exchange(channel, exchange) ⇒ Object
- #get_queue(channel, queue) ⇒ Object
- #get_retry_exchange(channel, queue) ⇒ Object
- #get_retry_queue(channel, queue) ⇒ Object
- #get_subscriber_exchange(channel, queue) ⇒ Object
- #initialize ⇒ RabbitMqQueueManager (constructor)
  A new instance of RabbitMqQueueManager.
Constructor Details
#initialize ⇒ RabbitMqQueueManager
Returns a new instance of RabbitMqQueueManager.
3 4 5 |
# File 'lib/eventq_rabbitmq/rabbitmq_queue_manager.rb', line 3
# Creates the manager and stores a freshly built EventRaisedExchange
# in the @event_raised_exchange instance variable.
def initialize
  @event_raised_exchange = EventRaisedExchange.new
end
Instance Method Details
#get_exchange(channel, exchange) ⇒ Object
38 39 40 |
# File 'lib/eventq_rabbitmq/rabbitmq_queue_manager.rb', line 38
# Asks the channel for a direct exchange named after +exchange.name+,
# passing the +durable+ option as true.
#
# @param channel [#direct] channel the exchange is requested from
# @param exchange [#name] object that supplies the exchange name
# @return [Object] whatever +channel.direct+ returns
def get_exchange(channel, exchange)
  exchange_name = exchange.name
  channel.direct(exchange_name, durable: true)
end
#get_queue(channel, queue) ⇒ Object
7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 |
# File 'lib/eventq_rabbitmq/rabbitmq_queue_manager.rb', line 7
# Gets/creates the durable queue named +queue.name+ on the channel.
# When +queue.allow_retry+ is truthy, the retry exchange, subscriber
# exchange and retry queue are also requested; the retry queue is bound
# to the retry exchange and the main queue to the subscriber exchange.
#
# @param channel [#queue, #fanout] channel used for every declaration
# @param queue [#name, #allow_retry, #retry_delay] queue definition
# @return [Object] the main queue returned by +channel.queue+
def get_queue(channel, queue)
  main_queue = channel.queue(queue.name, durable: true)
  return main_queue unless queue.allow_retry

  # Declaration order matches the original: both exchanges first, then the
  # retry queue, then the two bindings.
  retry_ex = get_retry_exchange(channel, queue)
  subscriber_ex = get_subscriber_exchange(channel, queue)
  get_retry_queue(channel, queue).bind(retry_ex)
  main_queue.bind(subscriber_ex)

  main_queue
end
#get_retry_exchange(channel, queue) ⇒ Object
25 26 27 |
# File 'lib/eventq_rabbitmq/rabbitmq_queue_manager.rb', line 25
# Asks the channel for the fanout exchange named "<queue.name>.r.ex".
#
# @param channel [#fanout] channel the exchange is requested from
# @param queue [#name] queue definition whose name prefixes the exchange
# @return [Object] whatever +channel.fanout+ returns
def get_retry_exchange(channel, queue)
  exchange_name = "#{queue.name}.r.ex"
  channel.fanout(exchange_name)
end
#get_retry_queue(channel, queue) ⇒ Object
33 34 35 36 |
# File 'lib/eventq_rabbitmq/rabbitmq_queue_manager.rb', line 33
# Asks the channel for the durable queue named "<queue.name>.r", passing
# the subscriber exchange's name as "x-dead-letter-exchange" and
# +queue.retry_delay+ as "x-message-ttl".
#
# NOTE(review): RabbitMQ interprets x-message-ttl in milliseconds, so
# retry_delay is presumably expressed in milliseconds - confirm with callers.
#
# @param channel [#queue, #fanout] channel used for the declarations
# @param queue [#name, #retry_delay] queue definition
# @return [Object] whatever +channel.queue+ returns
def get_retry_queue(channel, queue)
  dead_letter_target = get_subscriber_exchange(channel, queue)
  retry_queue_name = "#{queue.name}.r"
  retry_arguments = {
    "x-dead-letter-exchange" => dead_letter_target.name,
    "x-message-ttl" => queue.retry_delay
  }

  channel.queue(retry_queue_name, durable: true, arguments: retry_arguments)
end
#get_subscriber_exchange(channel, queue) ⇒ Object
29 30 31 |
# File 'lib/eventq_rabbitmq/rabbitmq_queue_manager.rb', line 29
# Asks the channel for the fanout exchange named "<queue.name>.ex".
#
# @param channel [#fanout] channel the exchange is requested from
# @param queue [#name] queue definition whose name prefixes the exchange
# @return [Object] whatever +channel.fanout+ returns
def get_subscriber_exchange(channel, queue)
  exchange_name = "#{queue.name}.ex"
  channel.fanout(exchange_name)
end