Class: RosettaQueue::Gateway::BeanstalkAdapter

Inherits:
BaseAdapter
  • Object
show all
Defined in:
lib/rosetta_queue/adapters/beanstalk.rb

Instance Method Summary collapse

Constructor Details

#initialize(adapter_settings = {}) ⇒ BeanstalkAdapter

Returns a new instance of BeanstalkAdapter.



12
13
14
15
# File 'lib/rosetta_queue/adapters/beanstalk.rb', line 12

# Builds the adapter and opens a Beanstalk connection pool.
#
# @param adapter_settings [Hash] connection settings; only +:host+ and
#   +:port+ are read. A missing (or nil) host falls back to "localhost"
#   and a missing (or nil) port to 11300.
# @return [BeanstalkAdapter]
def initialize(adapter_settings = {})
  # Without these fallbacks the default +{}+ argument interpolated to the
  # unusable pool address ":".
  @host = adapter_settings[:host] || "localhost"
  @port = adapter_settings[:port] || 11300
  @conn = Beanstalk::Pool.new(["#{@host}:#{@port}"])
end

Instance Method Details

#ack(msg) ⇒ Object



8
9
10
# File 'lib/rosetta_queue/adapters/beanstalk.rb', line 8

# Acknowledges +msg+ by forwarding its "message-id" header to the
# connection's +ack+ method and returning whatever that call returns.
#
# NOTE(review): assumes +msg+ responds to +headers+ and that the
# connection responds to +ack+ -- confirm against the Beanstalk client.
def ack(msg)
  message_id = msg.headers["message-id"]
  @conn.ack(message_id)
end

#disconnect ⇒ Object



17
# File 'lib/rosetta_queue/adapters/beanstalk.rb', line 17

# Intentionally a no-op: this adapter performs no teardown here.
#
# @return [nil]
def disconnect
  nil
end

#receive(options = nil) ⇒ Object

TODO: support options?



20
21
22
23
24
# File 'lib/rosetta_queue/adapters/beanstalk.rb', line 20

# Reserves the next job from the connection, deletes it right away, and
# returns the (already deleted) job object.
#
# @param options currently ignored
# @return the job handed back by the connection's +reserve+
def receive(options=nil)
  @conn.reserve.tap { |job| job.delete }
end

#receive_once(destination = nil, opts = {}) ⇒ Object



26
27
28
# File 'lib/rosetta_queue/adapters/beanstalk.rb', line 26

# Fetches a single message via +receive+ and returns its body.
#
# @param destination ignored by this adapter
# @param opts ignored by this adapter
# @return the +body+ of the received message
def receive_once(destination=nil, opts={})
  message = receive
  message.body
end

#receive_with(message_handler) ⇒ Object



30
31
32
33
34
35
36
37
38
39
40
41
# File 'lib/rosetta_queue/adapters/beanstalk.rb', line 30

# Runs the receive loop: inside the block given to +running+, takes the
# body of the next message, logs it, and passes it to the handler.
#
# @param message_handler object responding to +handle_message+
# @return whatever +running+ returns
def receive_with(message_handler)
  # Beanstalk has no notion of destinations; destination_for is still
  # called so the adapter complies with Rosetta's generic specs, and the
  # result is used only in the log line below.
  destination = destination_for(message_handler)

  running do
    body = receive.body
    log = RosettaQueue.logger
    log.info("Receiving from #{destination} :: #{body}")
    message_handler.handle_message(body)
  end
end

#send_message(destination, message, options) ⇒ Object



43
44
45
46
# File 'lib/rosetta_queue/adapters/beanstalk.rb', line 43

# Logs the outgoing message, then puts it on the Beanstalk connection.
#
# @param destination used only in the log line
# @param message payload handed to the connection's +put+
# @param options ignored by this adapter
# @return whatever the connection's +put+ returns
def send_message(destination, message, options)
  log = RosettaQueue.logger
  log.info("Publishing to #{destination} :: #{message}")
  @conn.put(message)
end