Class: RosettaQueue::Gateway::SynchExchange::FanoutExchange

Inherits:
BaseExchange
  • Object
show all
Includes:
Fanout
Defined in:
lib/rosetta_queue/adapters/amqp_synch.rb

Instance Method Summary collapse

Methods included from Fanout

#fanout_name_for

Methods inherited from BaseExchange

#delete, #initialize, #unsubscribe

Constructor Details

This class inherits a constructor from RosettaQueue::Gateway::SynchExchange::BaseExchange

Instance Method Details

#publish(destination, message, options = {}) ⇒ Object



68
69
70
71
72
# File 'lib/rosetta_queue/adapters/amqp_synch.rb', line 68

def publish(destination, message, options={})
  @queue = conn.exchange(fanout_name_for(destination), options.merge({:type => :fanout}))
  @queue.publish(message, options)
  RosettaQueue.logger.info("Publishing to fanout #{destination} :: #{message}")
end

#receive(destination, message_handler) ⇒ Object



74
75
76
77
78
79
80
81
82
# File 'lib/rosetta_queue/adapters/amqp_synch.rb', line 74

def receive(destination, message_handler)
  @queue = conn.queue("queue_#{self.object_id}", @options)
  exchange = conn.exchange(fanout_name_for(destination), @options.merge({:type => :fanout}))
  @queue.bind(exchange)
  @queue.subscribe(@options) do |msg|
    RosettaQueue.logger.info("Receiving from #{destination} :: #{msg[:payload]}")
    message_handler.handle_message(msg[:payload])
  end
end

#receive_once(destination, options = {}) {|Filters.process_receiving(msg)| ... } ⇒ Object

Yields:



84
85
86
87
88
89
90
91
92
93
# File 'lib/rosetta_queue/adapters/amqp_synch.rb', line 84

def receive_once(destination, options={})
  ack = options[:ack]
  @queue = conn.queue("queue_#{self.object_id}", options)
  exchange = conn.exchange(fanout_name_for(destination), options.merge({:type => :fanout}))
  @queue.bind(exchange)
  msg = @queue.pop[:payload]
  RosettaQueue.logger.info("Receiving from #{destination} :: #{msg}")
  @queue.ack if ack
  yield Filters.process_receiving(msg)
end