Class: MessageQueue::Rabbit

Inherits:
Base
  • Object
show all
Defined in:
lib/message_queue/rabbit.rb

Instance Attribute Summary

Attributes inherited from Base

#opts

Instance Method Summary collapse

Methods inherited from Base

#subscribe, #subscribe?

Constructor Details

#initialize(opts = {}) ⇒ Rabbit

Returns a new instance of Rabbit.



5
6
7
# File 'lib/message_queue/rabbit.rb', line 5

# Creates the adapter and keeps the given settings for later use.
#
# @param opts [Hash] connection settings, stored untouched in @opts
def initialize(opts = {})
  @opts = opts
end

Instance Method Details

#client ⇒ Object



57
58
59
60
61
62
63
64
65
66
# File 'lib/message_queue/rabbit.rb', line 57

# Returns the Carrot client, creating and caching it in @client on first use.
#
# Settings are read from @opts using string keys; 'port' is converted
# with #to_i before being handed to Carrot.
#
# @return [Carrot] the cached client
def client
  return @client if @client

  @client = Carrot.new(
    :host   => @opts['host'],
    :port   => @opts['port'].to_i,
    :user   => @opts['user'],
    :pass   => @opts['pass'],
    :vhost  => @opts['vhost'],
    :insist => @opts['insist']
  )
end

#confirm(queue) ⇒ Object



35
36
37
38
39
# File 'lib/message_queue/rabbit.rb', line 35

# Calls `ack` on the named queue, wrapped in #send_command so a dropped
# connection is retried.
#
# @param queue [Object] queue name passed to client.queue
# @return [Object] whatever the queue's `ack` returns
def confirm(queue)
  send_command { client.queue(queue).ack }
end

#delete(queue) ⇒ Object



9
10
11
12
13
# File 'lib/message_queue/rabbit.rb', line 9

# Calls `delete` on the named queue, wrapped in #send_command so a dropped
# connection is retried.
#
# @param queue [Object] queue name passed to client.queue
# @return [Object] whatever the queue's `delete` returns
def delete(queue)
  send_command { client.queue(queue).delete }
end

#dequeue(queue) ⇒ Object



27
28
29
30
31
32
33
# File 'lib/message_queue/rabbit.rb', line 27

# Pops one message from the named queue (requesting :ack => true) and
# returns it decoded with Marshal.load, or nil when nothing was popped.
#
# NOTE(review): Marshal.load can instantiate arbitrary objects — confirm
# that only trusted producers can publish to these queues.
#
# @param queue [Object] queue name passed to client.queue
# @return [Object, nil] the unmarshalled payload, or nil if the pop was empty
def dequeue(queue)
  send_command do
    payload = client.queue(queue).pop(:ack => true)
    payload ? Marshal.load(payload) : nil
  end
end

#enqueue(queue, data) ⇒ Object



21
22
23
24
25
# File 'lib/message_queue/rabbit.rb', line 21

# Publishes data to the named queue as a Marshal dump, opening the queue
# with :durable => true and publishing with :persistent => true.
#
# NOTE(review): the other methods here open the queue without :durable —
# confirm the broker accepts both declarations for the same queue name.
#
# @param queue [Object] queue name passed to client.queue
# @param data [Object] any value Marshal.dump can serialize
# @return [Object] whatever the queue's `publish` returns
def enqueue(queue, data)
  send_command do
    durable_queue = client.queue(queue, :durable => true)
    durable_queue.publish(Marshal.dump(data), :persistent => true)
  end
end

#queue_size(queue) ⇒ Object



15
16
17
18
19
# File 'lib/message_queue/rabbit.rb', line 15

# Returns the named queue's `message_count`, wrapped in #send_command so a
# dropped connection is retried.
#
# @param queue [Object] queue name passed to client.queue
# @return [Object] the value reported by the queue's `message_count`
def queue_size(queue)
  send_command { client.queue(queue).message_count }
end

#send_command(&block) ⇒ Object



41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
# File 'lib/message_queue/rabbit.rb', line 41

# Runs the block and returns its value. If the block raises
# Carrot::AMQP::Server::ServerDown, the cached @client is dropped and the
# block is run one more time; a second ServerDown is re-raised to the caller.
#
# @yield the broker operation to attempt
# @return [Object] the block's return value
# @raise [Carrot::AMQP::Server::ServerDown] when the retry fails as well
def send_command(&block)
  already_retried = false
  begin
    block.call
  rescue Carrot::AMQP::Server::ServerDown => error
    # Only one retry is allowed; the second failure propagates.
    raise error if already_retried

    puts "Error #{error.message}. Retrying..."
    @client = nil # force the next use to build a fresh client
    already_retried = true
    retry
  end
end

#stop ⇒ Object



68
69
70
# File 'lib/message_queue/rabbit.rb', line 68

# Stops the cached client, if one has been created.
#
# Reads @client directly instead of going through #client, so stopping an
# adapter that never talked to the broker does not build a new Carrot
# client just to stop it.
#
# @return [Object, nil] the result of the client's `stop`, or nil when no
#   client had been created
def stop
  return unless @client

  @client.stop
end