Class: RosettaQueue::Gateway::BeanstalkAdapter
- Inherits: BaseAdapter
  - Object
  - BaseAdapter
  - RosettaQueue::Gateway::BeanstalkAdapter
- Defined in: lib/rosetta_queue/adapters/beanstalk.rb
Instance Method Summary
- #ack(msg) ⇒ Object
- #disconnect ⇒ Object
- #initialize(adapter_settings = {}) ⇒ BeanstalkAdapter (constructor)
  A new instance of BeanstalkAdapter.
- #receive(options = nil) ⇒ Object
  TODO: support options?
- #receive_once(destination = nil, opts = {}) ⇒ Object
- #receive_with(message_handler) ⇒ Object
- #send_message(destination, message, options) ⇒ Object
Constructor Details
#initialize(adapter_settings = {}) ⇒ BeanstalkAdapter
Returns a new instance of BeanstalkAdapter.
# File 'lib/rosetta_queue/adapters/beanstalk.rb', line 12
def initialize(adapter_settings = {})
  @host, @port = adapter_settings[:host], adapter_settings[:port]
  @conn = Beanstalk::Pool.new(["#{@host}:#{@port}"])
end
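The constructor interpolates `:host` and `:port` directly into a `"host:port"` address string. The sketch below isolates that step in a hypothetical helper, `address_for` (not part of RosettaQueue), to show the string that would reach `Beanstalk::Pool.new`, including the case where a key is missing.

```ruby
# Hypothetical helper mirroring the string interpolation in #initialize.
# It is not part of RosettaQueue; it only illustrates the address format.
def address_for(adapter_settings = {})
  host = adapter_settings[:host]
  port = adapter_settings[:port]
  "#{host}:#{port}" # a nil value interpolates as an empty string
end

puts address_for(host: "localhost", port: 11300) # prints "localhost:11300"
puts address_for(host: "localhost")              # prints "localhost:"
```

Because a missing key yields a malformed address rather than an error at this step, callers will likely want to supply both settings explicitly.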
Instance Method Details
#ack(msg) ⇒ Object
# File 'lib/rosetta_queue/adapters/beanstalk.rb', line 8
def ack(msg)
  @conn.ack(msg.headers["message-id"])
end
#disconnect ⇒ Object
# File 'lib/rosetta_queue/adapters/beanstalk.rb', line 17
def disconnect; end
#receive(options = nil) ⇒ Object
TODO: support options?
# File 'lib/rosetta_queue/adapters/beanstalk.rb', line 20
def receive(options=nil)
  msg = @conn.reserve
  msg.delete
  msg
end
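Note the order of operations in #receive: the job is deleted from the queue before it is returned to the caller. The following sketch reproduces that order against in-memory stand-ins. `FakeJob`, `FakeConn`, and `receive_from` are hypothetical names used only for illustration; the real adapter talks to `Beanstalk::Pool`, whose `reserve` blocks until a job is available.

```ruby
# In-memory stand-ins for a beanstalk job and connection (hypothetical,
# for illustration only).
class FakeJob
  attr_reader :body

  def initialize(body, queue)
    @body = body
    @queue = queue
  end

  # Remove this job from the queue for good.
  def delete
    @queue.delete(self)
  end
end

class FakeConn
  attr_reader :jobs

  def initialize
    @jobs = []
  end

  def put(body)
    @jobs << FakeJob.new(body, @jobs)
  end

  # Hand out the oldest job without removing it.
  def reserve
    @jobs.first
  end
end

# Same order of operations as #receive: reserve, delete, then return.
def receive_from(conn)
  msg = conn.reserve
  msg.delete
  msg
end

conn = FakeConn.new
conn.put("hello")
msg = receive_from(conn)
puts msg.body         # prints "hello"
puts conn.jobs.empty? # prints "true": the job is gone before any processing
```

One consequence of this ordering is that a message is not redelivered if the consumer fails after #receive returns.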
#receive_once(destination = nil, opts = {}) ⇒ Object
# File 'lib/rosetta_queue/adapters/beanstalk.rb', line 26
def receive_once(destination=nil, opts={})
  receive.body
end
#receive_with(message_handler) ⇒ Object
# File 'lib/rosetta_queue/adapters/beanstalk.rb', line 30
def receive_with(message_handler)
  # Note that, while we call destination_for (to comply with
  # Rosetta's generic specs), beanstalk doesn't actually support
  # destinations. This is just for compatibility.
  destination = destination_for(message_handler)
  running do
    msg = receive.body
    RosettaQueue.logger.info("Receiving from #{destination} :: #{msg}")
    message_handler.on_message(msg)
  end
end
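As a rough illustration of the handler contract used by #receive_with, the sketch below assumes the handler exposes an `on_message` method that receives the message body. `CollectingHandler` and `deliver_all` are hypothetical names; a fixed list of bodies stands in for the blocking `running do ... end` loop and the beanstalk connection.

```ruby
# Hypothetical handler; assumes the adapter calls #on_message with the body.
class CollectingHandler
  attr_reader :seen

  def initialize
    @seen = []
  end

  def on_message(body)
    @seen << body
  end
end

# Simplified stand-in for the receive loop: each body is handed to the
# handler in arrival order.
def deliver_all(bodies, handler)
  bodies.each { |body| handler.on_message(body) }
end

handler = CollectingHandler.new
deliver_all(["first", "second"], handler)
puts handler.seen.inspect # prints ["first", "second"]
```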
#send_message(destination, message, options) ⇒ Object
# File 'lib/rosetta_queue/adapters/beanstalk.rb', line 43
def send_message(destination, message, options)
  RosettaQueue.logger.info("Publishing to #{destination} :: #{message}")
  @conn.put(message)
end
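In #send_message the destination appears only in the log line; the message body alone is handed to the connection. A minimal sketch of that behavior follows, using a hypothetical in-memory `RecordingConn` in place of `Beanstalk::Pool` and a hypothetical `publish` helper that omits the logging.

```ruby
# Hypothetical in-memory connection standing in for Beanstalk::Pool.
class RecordingConn
  attr_reader :bodies

  def initialize
    @bodies = []
  end

  def put(body)
    @bodies << body
  end
end

# Mirrors #send_message minus the logging: destination and options are
# accepted but only the message reaches the connection.
def publish(conn, destination, message, options)
  conn.put(message)
end

conn = RecordingConn.new
publish(conn, "/queue/a", "one", {})
publish(conn, "/queue/b", "two", {})
puts conn.bodies.inspect # prints ["one", "two"]
```

Messages published to different destinations therefore end up in the same place, which matches the note in #receive_with that beanstalk destinations exist only for compatibility.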