Class: RosettaQueue::Gateway::Amqp

Inherits:
BaseAdapter show all
Defined in:
lib/rosetta_queue/adapters/amqp.rb

Direct Known Subclasses

AmqpEventedAdapter, AmqpSynchAdapter

Instance Method Summary collapse

Constructor Details

#initialize(adapter_settings = {}) ⇒ Amqp

Returns a new instance of Amqp.

Raises:

(AdapterException)

14
15
16
17
# File 'lib/rosetta_queue/adapters/amqp.rb', line 14

# Creates the adapter and remembers the settings it was given.
#
# @param adapter_settings [Hash] adapter configuration; stored unchanged in
#   @adapter_settings
# @raise [AdapterException] if adapter_settings is nil or empty
def initialize(adapter_settings = {})
  # An explicit nil is treated like the empty default so the caller gets the
  # intended AdapterException rather than a NoMethodError from nil.empty?.
  if adapter_settings.nil? || adapter_settings.empty?
    raise AdapterException, "Missing adapter settings"
  end

  @adapter_settings = adapter_settings
end

Instance Method Details

#delete(destination, opts = {}) ⇒ Object



19
20
21
# File 'lib/rosetta_queue/adapters/amqp.rb', line 19

# Deletes +destination+ through the exchange strategy that
# exchange_strategy_for selects for it.
#
# The strategy looked up here is used only for this call; it is not stored
# in @exchange_strategy.
#
# @param destination the destination to delete
# @param opts [Hash] options forwarded to exchange_strategy_for
# @return whatever the strategy's #delete returns
def delete(destination, opts={})
  strategy = exchange_strategy_for(destination, opts)
  strategy.delete(destination)
end

#disconnect ⇒ Object



23
24
25
# File 'lib/rosetta_queue/adapters/amqp.rb', line 23

# Unsubscribes the exchange strategy currently held in @exchange_strategy.
#
# When no strategy has been stored, nothing is called and nil is returned.
#
# @return the result of the strategy's #unsubscribe, or nil
def disconnect
  return unless @exchange_strategy

  @exchange_strategy.unsubscribe
end

#receive_once(destination, opts = {}) ⇒ Object



27
28
29
30
31
32
# File 'lib/rosetta_queue/adapters/amqp.rb', line 27

# Fetches a single message from +destination+.
#
# The strategy chosen by exchange_strategy_for is stored in
# @exchange_strategy, then asked to receive once. The first message it
# yields is returned immediately from this method.
#
# @param destination the destination to read from
# @param opts [Hash] options forwarded to exchange_strategy_for
# @return the first yielded message; if the strategy never yields, the
#   return value of the strategy's #receive_once
def receive_once(destination, opts={})
  strategy = @exchange_strategy = exchange_strategy_for(destination, opts)
  # `return` inside the block exits receive_once itself with the message.
  strategy.receive_once(destination) { |message| return message }
end

#receive_with(message_handler) ⇒ Object



34
35
36
37
38
39
# File 'lib/rosetta_queue/adapters/amqp.rb', line 34

# Starts receiving messages for +message_handler+.
#
# The handler's options and destination are looked up with options_for and
# destination_for (in that order), the matching exchange strategy is stored
# in @exchange_strategy, and the strategy's #receive is invoked with the
# destination and the handler.
#
# @param message_handler the handler passed on to the strategy
# @return whatever the strategy's #receive returns
def receive_with(message_handler)
  handler_options = options_for(message_handler)
  target = destination_for(message_handler)

  strategy = exchange_strategy_for(target, handler_options)
  @exchange_strategy = strategy
  strategy.receive(target, message_handler)
end

#send_message(destination, message, options = nil) ⇒ Object



41
42
43
44
# File 'lib/rosetta_queue/adapters/amqp.rb', line 41

# Publishes +message+ to +destination+.
#
# The strategy returned by exchange_strategy_for is stored in
# @exchange_strategy and then asked to publish the message.
#
# @param destination the destination to publish to
# @param message the message handed to the strategy's #publish
# @param options options forwarded to exchange_strategy_for (nil by default)
# @return whatever the strategy's #publish returns
def send_message(destination, message, options=nil)
  strategy = exchange_strategy_for(destination, options)
  @exchange_strategy = strategy
  strategy.publish(destination, message)
end

#unsubscribe ⇒ Object



46
# File 'lib/rosetta_queue/adapters/amqp.rb', line 46

# Empty implementation: performs no work and returns nil.
def unsubscribe
end