Class: RosettaQueue::Gateway::EventedExchange::DirectExchange
- Inherits:
-
BaseExchange
- Object
- BaseExchange
- RosettaQueue::Gateway::EventedExchange::DirectExchange
show all
- Defined in:
- lib/rosetta_queue/adapters/amqp_evented.rb
Instance Method Summary
collapse
Methods inherited from BaseExchange: #delete, #initialize, #unsubscribe
Instance Method Details
#publish(destination, message, options = {}) ⇒ Object
42
43
44
45
46
47
48
|
# File 'lib/rosetta_queue/adapters/amqp_evented.rb', line 42
# Declares the queue named +destination+ on the adapter's channel, publishes
# +message+ to it, and then logs the publication at info level.
#
# The same +options+ hash is handed to both the queue declaration and the
# publish call.
#
# @param destination queue name passed to channel.queue
# @param message payload passed to the queue's #publish
# @param options [Hash] forwarded unchanged to channel.queue and #publish
# @return whatever RosettaQueue.logger.info returns
# @raise [AdapterException] when no EventMachine reactor is running
def publish(destination, message, options = {})
  unless EM.reactor_running?
    raise AdapterException, "Messages need to be published in an EventMachine run block (e.g., EM.run { RosettaQueue::Producer.publish(:foo, msg) } "
  end

  @queue = channel.queue(destination, options)
  @queue.publish(message, options)

  RosettaQueue.logger.info("Publishing to #{destination} :: #{message}")
end
|
#receive(destination, message_handler) ⇒ Object
50
51
52
53
54
55
56
57
58
59
60
|
# File 'lib/rosetta_queue/adapters/amqp_evented.rb', line 50
# Subscribes to the queue named +destination+ and hands every delivered
# message to +message_handler+.
#
# The queue is declared on the adapter's channel with @options, and the same
# hash is passed to the subscription. @options is not assigned in this method
# (presumably it is set by the inherited initializer -- confirm).
#
# @param destination queue name passed to channel.queue
# @param message_handler object responding to #handle_message(msg)
# @raise [AdapterException] when no EventMachine reactor is running
def receive(destination, message_handler)
  raise AdapterException, "Consumers need to run in an EventMachine 'run' block. Try wrapping them inside the evented consumer manager." unless EM.reactor_running?

  @queue = channel.queue(destination, @options)
  ack = @options[:ack]
  # The subscription block takes the delivery header first and the payload
  # second; the header is the object that must receive #ack.
  @queue.subscribe(@options) do |header, msg|
    RosettaQueue.logger.info("Receiving from #{destination} :: #{msg}")
    message_handler.handle_message(msg)
    # Acknowledge only after the handler has returned, and only when the
    # :ack option is truthy.
    header.ack if ack
  end
end
|
#receive_once(destination, options = {}) ⇒ Object
62
63
64
65
66
67
68
69
70
71
72
|
# File 'lib/rosetta_queue/adapters/amqp_evented.rb', line 62
# Pops a message from the queue named +destination+ and yields it, after
# filtering, to the caller's block.
#
# The queue is declared and popped using @options; the +options+ argument is
# accepted but not read by this method (NOTE(review): possibly intended to
# be merged with @options -- confirm against callers). A block is required,
# since the filtered message is delivered with +yield+.
#
# @param destination queue name passed to channel.queue
# @param options [Hash] currently unused
# @yield the result of Filters.process_receiving(msg)
# @raise [AdapterException] when no EventMachine reactor is running
def receive_once(destination, options={})
  raise AdapterException, "Consumers need to run in an EventMachine 'run' block. (e.g., EM.run { RosettaQueue::Consumer.receive }" unless EM.reactor_running?

  @queue = channel.queue(destination, @options)
  ack = @options[:ack]
  # The pop block takes the delivery header first and the payload second;
  # the header is the object that must receive #ack.
  @queue.pop(@options) do |header, msg|
    RosettaQueue.logger.info("Receiving from #{destination} :: #{msg}")
    # Here the acknowledgement happens before the message is handed over.
    header.ack if ack
    yield Filters.process_receiving(msg)
  end
end
|