Class: RosettaQueue::Gateway::EventedExchange::FanoutExchange
Instance Method Summary
collapse
Methods included from Fanout
#fanout_name_for
#delete, #initialize, #unsubscribe
Instance Method Details
#publish(dest, msg, opts) ⇒ Object
79
80
81
82
83
84
85
|
# File 'lib/rosetta_queue/adapters/amqp_evented.rb', line 79
# Publishes +msg+ to the fanout exchange derived from +dest+.
#
# The exchange handle is kept in <tt>@queue</tt> after the call.
#
# @param dest the logical destination; mapped to an exchange name via +fanout_name_for+
# @param msg the payload handed to the exchange's +publish+
# @param opts options passed both to the exchange declaration and to +publish+
# @return whatever <tt>RosettaQueue.logger.info</tt> returns
# @raise [AdapterException] when the EventMachine reactor is not running
def publish(dest, msg, opts)
  unless EM.reactor_running?
    raise AdapterException, "Messages need to be published in an EventMachine run block (e.g., EM.run { RosettaQueue::Producer.publish(:foo, msg) } "
  end

  amqp_channel = channel
  @queue = amqp_channel.fanout(fanout_name_for(dest), opts)
  @queue.publish(msg, opts)

  # Logged only after the publish call has returned.
  RosettaQueue.logger.info("Publishing to fanout #{dest} :: #{msg}")
end
|
#receive(destination, message_handler) ⇒ Object
87
88
89
90
91
92
93
94
95
96
97
98
99
|
# File 'lib/rosetta_queue/adapters/amqp_evented.rb', line 87
# Subscribes +message_handler+ to the fanout exchange derived from +destination+.
#
# A queue named after this object's +object_id+ is declared, bound to the
# fanout exchange, and subscribed to using <tt>@options</tt>. Each delivered
# message is logged and then passed to <tt>message_handler.handle_message</tt>.
#
# @param destination the logical destination; mapped to an exchange name via +fanout_name_for+
# @param message_handler an object responding to +handle_message(msg)+
# @return whatever the queue's +subscribe+ call returns
# @raise [AdapterException] when the EventMachine reactor is not running
def receive(destination, message_handler)
  raise AdapterException, "Consumers need to run in an EventMachine 'run' block. Try wrapping them inside the evented consumer manager." unless EM.reactor_running?

  @queue = channel.queue("queue_#{object_id}")
  exchange = channel.fanout(fanout_name_for(destination), @options)
  ack = @options[:ack]

  # The subscribe block receives the delivery header first and the payload second.
  @queue.bind(exchange).subscribe(@options) do |header, msg|
    RosettaQueue.logger.info("Receiving from #{destination} :: #{msg}")
    message_handler.handle_message(msg)
    # Acknowledge only after the handler has returned, and only when :ack was requested.
    header.ack if ack
  end
end
|
#receive_once(destination, opts = {}) ⇒ Object
101
102
103
104
105
106
107
108
109
110
111
112
113
|
# File 'lib/rosetta_queue/adapters/amqp_evented.rb', line 101
# Pops a message from a queue bound to the fanout exchange derived from
# +destination+ and yields the filtered payload to the caller's block.
#
# A queue named after this object's +object_id+ is declared, bound to the
# fanout exchange (declared with +opts+), and +pop+ is called with +opts+.
# The received message is logged, acknowledged when requested, and then run
# through <tt>Filters.process_receiving</tt> before being yielded.
#
# NOTE(review): the ack flag is read from <tt>@options</tt>, not from +opts+ —
# confirm this is intended.
#
# @param destination the logical destination; mapped to an exchange name via +fanout_name_for+
# @param opts [Hash] options passed to the exchange declaration and to +pop+
# @yield the result of <tt>Filters.process_receiving(msg)</tt>
# @return whatever the queue's +pop+ call returns
# @raise [AdapterException] when the EventMachine reactor is not running
def receive_once(destination, opts={})
  raise AdapterException, "Consumers need to run in an EventMachine 'run' block. (e.g., EM.run { RosettaQueue::Consumer.receive }" unless EM.reactor_running?

  @queue = channel.queue("queue_#{object_id}")
  exchange = channel.fanout(fanout_name_for(destination), opts)
  ack = @options[:ack]

  # The pop block receives the delivery header first and the payload second.
  @queue.bind(exchange).pop(opts) do |header, msg|
    RosettaQueue.logger.info("Receiving from #{destination} :: #{msg}")
    # Here the message is acknowledged before it is handed to the caller's block.
    header.ack if ack
    yield Filters.process_receiving(msg)
  end
end
|