Class: MessageQueue::Rabbit
Instance Attribute Summary
Attributes inherited from Base
Instance Method Summary
- #client ⇒ Object
- #confirm(queue) ⇒ Object
- #delete(queue) ⇒ Object
- #dequeue(queue) ⇒ Object
- #enqueue(queue, data) ⇒ Object
- #initialize(opts = {}) ⇒ Rabbit (constructor): returns a new instance of Rabbit.
- #queue_size(queue) ⇒ Object
- #send_command(&block) ⇒ Object
- #stop ⇒ Object
Methods inherited from Base
Constructor Details
#initialize(opts = {}) ⇒ Rabbit
Returns a new instance of Rabbit.
5 6 7 |
# File 'lib/message_queue/rabbit.rb', line 5
#
# Builds a Rabbit queue adapter from a hash of connection options.
#
# The hash is stored untouched in @opts. #client reads the string keys
# 'host', 'port', 'user', 'pass', 'vhost' and 'insist' from it, so callers
# should supply string (not symbol) keys.
#
# @param opts [Hash, nil] connection settings; nil is treated as an empty hash
# @return [Rabbit] a new instance of Rabbit
def initialize(opts = {})
  # An explicit nil would otherwise only fail later, inside #client, with a
  # NoMethodError on nil; normalise it to an empty hash up front.
  @opts = opts || {}
end
Instance Method Details
#client ⇒ Object
57 58 59 60 61 62 63 64 65 66 |
# File 'lib/message_queue/rabbit.rb', line 57
#
# Returns the memoized Carrot connection, building it on first use from the
# string-keyed settings held in @opts. The port is coerced with #to_i; every
# other setting is passed through unchanged.
#
# @return [Carrot] the cached client for this adapter
def client
  return @client if @client

  @client = Carrot.new(
    :host   => @opts['host'],
    :port   => @opts['port'].to_i,
    :user   => @opts['user'],
    :pass   => @opts['pass'],
    :vhost  => @opts['vhost'],
    :insist => @opts['insist']
  )
end
#confirm(queue) ⇒ Object
35 36 37 38 39 |
# File 'lib/message_queue/rabbit.rb', line 35
#
# Calls +ack+ on the named queue, going through #send_command.
# NOTE(review): presumably this acknowledges the message most recently
# popped by #dequeue (which pops with :ack => true) -- confirm against Carrot.
#
# @param queue [String] name of the queue to acknowledge on
# @return [Object] whatever the queue's +ack+ call returns
def confirm(queue)
  send_command do
    target = client.queue(queue)
    target.ack
  end
end
#delete(queue) ⇒ Object
9 10 11 12 13 |
# File 'lib/message_queue/rabbit.rb', line 9
#
# Calls +delete+ on the named queue, going through #send_command.
#
# @param queue [String] name of the queue to delete
# @return [Object] whatever the queue's +delete+ call returns
def delete(queue)
  send_command do
    target = client.queue(queue)
    target.delete
  end
end
#dequeue(queue) ⇒ Object
27 28 29 30 31 32 33 |
# File 'lib/message_queue/rabbit.rb', line 27
#
# Pops one message from the named queue (requesting an ack-able delivery via
# :ack => true) and returns the unmarshalled payload.
#
# NOTE(review): Marshal.load can instantiate arbitrary objects; this is only
# safe if every producer writing to the queue is trusted -- confirm.
#
# @param queue [String] name of the queue to read from
# @return [Object, nil] the deserialized payload, or nil when pop returned nothing
def dequeue(queue)
  send_command do
    payload = client.queue(queue).pop(:ack => true)
    Marshal.load(payload) if payload
  end
end
#enqueue(queue, data) ⇒ Object
21 22 23 24 25 |
# File 'lib/message_queue/rabbit.rb', line 21
#
# Serializes +data+ with Marshal and publishes it to the named queue. The
# queue is requested with :durable => true and the message is published with
# :persistent => true.
#
# @param queue [String] name of the destination queue
# @param data [Object] any Marshal-dumpable value
# @return [Object] whatever the queue's +publish+ call returns
def enqueue(queue, data)
  send_command do
    # Look the queue up first, then serialize, matching the evaluation order
    # of a single chained call.
    target = client.queue(queue, :durable => true)
    target.publish(Marshal.dump(data), :persistent => true)
  end
end
#queue_size(queue) ⇒ Object
15 16 17 18 19 |
# File 'lib/message_queue/rabbit.rb', line 15
#
# Reports how many messages are waiting on the named queue.
#
# NOTE(review): the method name after +client.queue(queue).+ was missing from
# this listing; +message_count+ is restored on the assumption that it is the
# Carrot queue's message counter -- confirm against the upstream file.
#
# @param queue [String] name of the queue to inspect
# @return [Object] whatever the queue's +message_count+ call returns
#   (presumably an Integer)
def queue_size(queue)
  send_command do
    client.queue(queue).message_count
  end
end
#send_command(&block) ⇒ Object
41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 |
# File 'lib/message_queue/rabbit.rb', line 41
#
# Runs the given block, retrying it exactly once if it raises
# Carrot::AMQP::Server::ServerDown. Before the retry the memoized @client is
# cleared, so a block that goes through #client gets a freshly built
# connection. A second ServerDown is re-raised to the caller.
#
# @yield the broker operation to perform
# @return [Object] the block's return value
# @raise [Carrot::AMQP::Server::ServerDown] if the block fails on both attempts
def send_command(&block)
  retried = false
  begin
    block.call
  rescue Carrot::AMQP::Server::ServerDown => e
    # Already retried once: give up and surface the original error.
    raise if retried

    puts "Error #{e.message}. Retrying..."
    # Drop the cached client so the next #client call reconnects.
    @client = nil
    retried = true
    retry
  end
end
#stop ⇒ Object
68 69 70 |
# File 'lib/message_queue/rabbit.rb', line 68
#
# Calls +stop+ on the Carrot client returned by #client. Because #client
# memoizes lazily, this builds a client first if none exists yet.
#
# @return [Object] whatever the client's +stop+ call returns
def stop
  connection = client
  connection.stop
end