Class: RosettaQueue::Gateway::Amqp
Instance Method Summary
collapse
Constructor Details
#initialize(adapter_settings = {}) ⇒ Amqp
Returns a new instance of Amqp.
14
15
16
17
|
# File 'lib/rosetta_queue/adapters/amqp.rb', line 14
# Builds the gateway around the supplied adapter settings.
#
# @param adapter_settings [Hash] settings kept on the instance as
#   @adapter_settings; must not be nil or empty
# @raise [AdapterException] when the settings are nil or empty
def initialize(adapter_settings = {})
  # An explicit nil is reported the same way as an empty hash instead of
  # surfacing as a NoMethodError from nil.empty?.
  if adapter_settings.nil? || adapter_settings.empty?
    raise AdapterException, "Missing adapter settings"
  end

  @adapter_settings = adapter_settings
end
|
Instance Method Details
#delete(destination, opts = {}) ⇒ Object
19
20
21
|
# File 'lib/rosetta_queue/adapters/amqp.rb', line 19
# Looks up the exchange strategy for +destination+ (via
# exchange_strategy_for) and asks it to delete that destination.
#
# @param destination [Object] passed to both the strategy lookup and the
#   strategy's own #delete
# @param opts [Hash] forwarded unchanged to exchange_strategy_for
# @return [Object] whatever the strategy's #delete returns
def delete(destination, opts = {})
  strategy = exchange_strategy_for(destination, opts)
  strategy.delete(destination)
end
|
#disconnect ⇒ Object
23
24
25
|
# File 'lib/rosetta_queue/adapters/amqp.rb', line 23
# Unsubscribes the exchange strategy remembered in @exchange_strategy,
# if one has been set; otherwise does nothing.
#
# @return [Object, nil] the strategy's #unsubscribe result, or nil when no
#   strategy is set
def disconnect
  return unless @exchange_strategy

  @exchange_strategy.unsubscribe
end
|
#receive_once(destination, opts = {}) ⇒ Object
27
28
29
30
31
32
|
# File 'lib/rosetta_queue/adapters/amqp.rb', line 27
# Resolves the exchange strategy for +destination+, remembers it in
# @exchange_strategy, and returns the first message the strategy yields.
#
# @param destination [Object] passed to the strategy lookup and to the
#   strategy's #receive_once
# @param opts [Hash] forwarded unchanged to exchange_strategy_for
# @return [Object] the first yielded message; if the strategy never
#   yields, whatever its #receive_once returns
def receive_once(destination, opts = {})
  @exchange_strategy = exchange_strategy_for(destination, opts)
  # `return` inside the block exits this method as soon as a message arrives.
  @exchange_strategy.receive_once(destination) { |message| return message }
end
|
#receive_with(message_handler) ⇒ Object
34
35
36
37
38
39
|
# File 'lib/rosetta_queue/adapters/amqp.rb', line 34
# Starts receiving for +message_handler+: derives its options and
# destination (options_for / destination_for), resolves the matching
# exchange strategy, remembers it in @exchange_strategy, and hands the
# handler to the strategy's #receive.
#
# @param message_handler [Object] handler passed to options_for,
#   destination_for and the strategy's #receive
# @return [Object] whatever the strategy's #receive returns
def receive_with(message_handler)
  # Options are looked up before the destination, as in the original order.
  handler_options = options_for(message_handler)
  target = destination_for(message_handler)

  @exchange_strategy = exchange_strategy_for(target, handler_options)
  @exchange_strategy.receive(target, message_handler)
end
|
#send_message(destination, message, options = nil) ⇒ Object
41
42
43
44
|
# File 'lib/rosetta_queue/adapters/amqp.rb', line 41
# Publishes +message+ to +destination+ through the exchange strategy
# resolved for it; the strategy is remembered in @exchange_strategy.
#
# @param destination [Object] passed to the strategy lookup and to #publish
# @param message [Object] payload handed to the strategy's #publish
# @param options [Object, nil] forwarded unchanged (nil by default) to
#   exchange_strategy_for
# @return [Object] whatever the strategy's #publish returns
def send_message(destination, message, options = nil)
  publisher = exchange_strategy_for(destination, options)
  @exchange_strategy = publisher
  publisher.publish(destination, message)
end
|
#unsubscribe ⇒ Object
46
|
# File 'lib/rosetta_queue/adapters/amqp.rb', line 46
# Intentionally a no-op: the body is empty and the method returns nil.
def unsubscribe
end
|