Class: RabbitMqQueueManager

Inherits:
Object
  • Object
show all
Defined in:
lib/eventq_rabbitmq/rabbitmq_queue_manager.rb

Instance Method Summary

Constructor Details

#initialize ⇒ RabbitMqQueueManager

Returns a new instance of RabbitMqQueueManager.



3
4
5
# File 'lib/eventq_rabbitmq/rabbitmq_queue_manager.rb', line 3

# Builds the manager and stores a fresh EventRaisedExchange instance in
# @event_raised_exchange.
# NOTE(review): EventRaisedExchange is defined outside this view; what it
# represents and where the instance variable is read cannot be confirmed here.
def initialize
  @event_raised_exchange = EventRaisedExchange.new
end

Instance Method Details

#get_exchange(channel, exchange) ⇒ Object



38
39
40
# File 'lib/eventq_rabbitmq/rabbitmq_queue_manager.rb', line 38

# Asks the channel for a direct exchange named after +exchange+, with the
# durable option switched on.
#
# @param channel [#direct] object that receives +direct(name, options)+
# @param exchange [#name] object whose +name+ is used as the exchange name
# @return [Object] whatever +channel.direct+ returns
def get_exchange(channel, exchange)
  channel.direct(exchange.name, durable: true)
end

#get_queue(channel, queue) ⇒ Object



7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
# File 'lib/eventq_rabbitmq/rabbitmq_queue_manager.rb', line 7

# Fetches the durable queue named after +queue+ and, when the queue allows
# retries, wires up the retry plumbing around it.
#
# With +queue.allow_retry+ set, the retry queue is bound to the retry exchange
# and the main queue is bound to the subscriber exchange. Without it, the
# queue is returned with no bindings added here.
#
# @param channel [#queue] object used to obtain queues and exchanges
# @param queue [#name, #allow_retry] queue settings
# @return [Object] the value returned by +channel.queue+ for the main queue
def get_queue(channel, queue)
  # Get/create the main queue first; everything else hangs off it.
  main_queue = channel.queue(queue.name, durable: true)
  return main_queue unless queue.allow_retry

  retry_ex = get_retry_exchange(channel, queue)
  subscriber_ex = get_subscriber_exchange(channel, queue)

  get_retry_queue(channel, queue).bind(retry_ex)
  main_queue.bind(subscriber_ex)

  main_queue
end

#get_retry_exchange(channel, queue) ⇒ Object



25
26
27
# File 'lib/eventq_rabbitmq/rabbitmq_queue_manager.rb', line 25

# Asks the channel for the fanout exchange used for retries, named
# "<queue name>.r.ex".
#
# NOTE(review): no durable option is passed here, unlike the queue and
# direct-exchange declarations in this class -- confirm that is intended.
#
# @param channel [#fanout] object that receives +fanout(name)+
# @param queue [#name] queue settings supplying the base name
# @return [Object] whatever +channel.fanout+ returns
def get_retry_exchange(channel, queue)
  retry_exchange_name = "#{queue.name}.r.ex"
  channel.fanout(retry_exchange_name)
end

#get_retry_queue(channel, queue) ⇒ Object



33
34
35
36
# File 'lib/eventq_rabbitmq/rabbitmq_queue_manager.rb', line 33

# Asks the channel for the durable retry queue, named "<queue name>.r".
#
# The queue is requested with two arguments: "x-dead-letter-exchange" set to
# the subscriber exchange's name and "x-message-ttl" set to
# +queue.retry_delay+. Under RabbitMQ's semantics for these arguments,
# messages that expire in this queue are re-routed to the subscriber exchange.
# NOTE(review): retry_delay is presumably milliseconds, as RabbitMQ expects
# for x-message-ttl -- verify against callers.
#
# @param channel [#queue, #fanout] object used to obtain the queue
# @param queue [#name, #retry_delay] queue settings
# @return [Object] whatever +channel.queue+ returns
def get_retry_queue(channel, queue)
  dead_letter_target = get_subscriber_exchange(channel, queue)
  retry_queue_name = "#{queue.name}.r"
  retry_arguments = {
    "x-dead-letter-exchange" => dead_letter_target.name,
    "x-message-ttl" => queue.retry_delay
  }

  channel.queue(retry_queue_name, durable: true, arguments: retry_arguments)
end

#get_subscriber_exchange(channel, queue) ⇒ Object



29
30
31
# File 'lib/eventq_rabbitmq/rabbitmq_queue_manager.rb', line 29

# Asks the channel for the fanout exchange that feeds the subscriber queue,
# named "<queue name>.ex".
#
# NOTE(review): no durable option is passed here, unlike the queue and
# direct-exchange declarations in this class -- confirm that is intended.
#
# @param channel [#fanout] object that receives +fanout(name)+
# @param queue [#name] queue settings supplying the base name
# @return [Object] whatever +channel.fanout+ returns
def get_subscriber_exchange(channel, queue)
  subscriber_exchange_name = "#{queue.name}.ex"
  channel.fanout(subscriber_exchange_name)
end