Class: RosettaQueue::Gateway::EventedExchange::DirectExchange

Inherits:
BaseExchange
  • Object
show all
Defined in:
lib/rosetta_queue/adapters/amqp_evented.rb

Instance Method Summary collapse

Methods inherited from BaseExchange

#delete, #initialize, #unsubscribe

Constructor Details

This class inherits a constructor from RosettaQueue::Gateway::EventedExchange::BaseExchange

Instance Method Details

#publish(destination, message, options = {}) ⇒ Object

Raises:



42
43
44
45
46
47
48
# File 'lib/rosetta_queue/adapters/amqp_evented.rb', line 42

def publish(destination, message, options={})
  raise AdapterException, "Messages need to be published in an EventMachine run block (e.g., EM.run { RosettaQueue::Producer.publish(:foo, msg) } " unless EM.reactor_running?

  @queue = channel.queue(destination, options)
  @queue.publish(message, options)
  RosettaQueue.logger.info("Publishing to #{destination} :: #{message}")
end

#receive(destination, message_handler) ⇒ Object

Raises:



50
51
52
53
54
55
56
57
58
59
60
# File 'lib/rosetta_queue/adapters/amqp_evented.rb', line 50

def receive(destination, message_handler)
  raise AdapterException, "Consumers need to run in an EventMachine 'run' block.  Try wrapping them inside the evented consumer manager." unless EM.reactor_running?

  @queue = channel.queue(destination, @options)
  ack = @options[:ack]
  @queue.subscribe(@options) do |header, msg|
    RosettaQueue.logger.info("Receiving from #{destination} :: #{msg}")
    message_handler.handle_message(msg)
    header.ack if ack
  end
end

#receive_once(destination, options = {}) ⇒ Object

Raises:



62
63
64
65
66
67
68
69
70
71
72
# File 'lib/rosetta_queue/adapters/amqp_evented.rb', line 62

def receive_once(destination, options={})
  raise AdapterException, "Consumers need to run in an EventMachine 'run' block. (e.g., EM.run { RosettaQueue::Consumer.receive }" unless EM.reactor_running?

  @queue = channel.queue(destination, @options)
  ack = @options[:ack]
  @queue.pop(@options) do |header, msg|
    RosettaQueue.logger.info("Receiving from #{destination} :: #{msg}")
    header.ack if ack
    yield Filters.process_receiving(msg)
  end
end