Class: RosettaQueue::Gateway::SynchExchange::DirectExchange
Instance Method Summary
collapse
#delete, #initialize, #unsubscribe
Instance Method Details
#publish(destination, message, options = {}) ⇒ Object
39
40
41
42
43
44
|
# File 'lib/rosetta_queue/adapters/amqp_synch.rb', line 39
# Logs the outgoing message, obtains the queue named +destination+ from the
# connection (kept in @queue), and publishes +message+ through the exchange
# whose name is the empty string.
#
# @param destination [Object] queue name; also used as the default +:key+
#   of the publish call (presumably a String -- confirm against callers)
# @param message [Object] payload handed to the exchange's +publish+
# @param options [Hash] passed to +conn.queue+ and merged over
#   <tt>{:key => destination}</tt>, so a caller-supplied +:key+ wins
# @return [Object] whatever the exchange's +publish+ returns
def publish(destination, message, options={})
  RosettaQueue.logger.info("Publishing to #{destination} :: #{message}")

  @queue = conn.queue(destination, options)

  nameless_exchange = conn.exchange("")
  # Caller options are merged last so they can override the routing key.
  publish_options = {:key => destination}.merge(options)
  nameless_exchange.publish(message, publish_options)
end
|
#receive(destination, message_handler) ⇒ Object
46
47
48
49
50
51
52
|
# File 'lib/rosetta_queue/adapters/amqp_synch.rb', line 46
# Obtains the queue named +destination+ (kept in @queue) and subscribes to
# it; each delivered message's +:payload+ is logged and then passed to
# +message_handler+.
#
# Both the queue lookup and the subscription use the instance's @options.
#
# @param destination [Object] queue name handed to +conn.queue+
# @param message_handler [#handle_message] receives each message payload
# @return [Object] whatever the queue's +subscribe+ returns
def receive(destination, message_handler)
  @queue = conn.queue(destination, @options)

  @queue.subscribe(@options) do |delivery|
    payload = delivery[:payload]
    RosettaQueue.logger.info("Receiving from #{destination} :: #{payload}")
    message_handler.handle_message(payload)
  end
end
|
#receive_once(destination, options = {}) {|Filters.process_receiving(msg)| ... } ⇒ Object
54
55
56
57
58
59
60
61
|
# File 'lib/rosetta_queue/adapters/amqp_synch.rb', line 54
# Pops a single message from the queue named +destination+, logs its
# payload, optionally acknowledges it, runs the payload through
# +Filters.process_receiving+, and hands the result to the caller.
#
# The message is popped (and acked, when requested) before the caller sees
# it. A bare +yield+ would therefore raise LocalJumpError only after the
# message had been consumed, so a call without a block gets the filtered
# message as the return value instead.
#
# @param destination [Object] queue name handed to +conn.queue+
# @param options [Hash] passed to +conn.queue+; a truthy +:ack+ entry makes
#   the method call +ack+ on the queue after popping
# @yield [message] the filtered payload, when a block is given
# @return [Object] the block's result, or the filtered payload when no
#   block is given
def receive_once(destination, options = {})
  ack = options[:ack]
  @queue = conn.queue(destination, options)

  msg = @queue.pop[:payload]
  RosettaQueue.logger.info("Receiving from #{destination} :: #{msg}")
  @queue.ack if ack

  processed = Filters.process_receiving(msg)
  # Without a block, return the message rather than fail after consuming it.
  return processed unless block_given?

  yield processed
end
|