Class: MessageQueue::Rabbit

Inherits:
Base
  • Object
show all
Defined in:
lib/message_queue/rabbit.rb

Instance Attribute Summary

Attributes inherited from Base

#opts

Instance Method Summary collapse

Constructor Details

#initialize(opts = {}) ⇒ Rabbit

Returns a new instance of Rabbit.



5
6
7
# File 'lib/message_queue/rabbit.rb', line 5

def initialize(opts={})
  @opts = opts
end

Instance Method Details

#clientObject



51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
# File 'lib/message_queue/rabbit.rb', line 51

def client
  return @client if @client

  if @opts['cluster']
    @opts['cluster'].each_with_index do |server, i|
      host, port = server.split(':')
      begin
        @client = Carrot.new(
          :host   => host,
          :port   => port.to_i,
          :user   => @opts['user'],
          :pass   => @opts['pass'],
          :vhost  => @opts['vhost'],
          :insist => @opts['insist']
        )
        return @client
      rescue Carrot::AMQP::Server::ServerDown => e
        if i == (@opts['cluster'].size-1)
          raise e
        else
          Sweatshop.log "\n*** Sweatshop failing over to #{@opts['cluster'][i+1]} ***"
          Sweatshop.log "Error: #{e.message}\n#{e.backtrace.join("\n")}"
          next
        end
      end
    end
  else
    if @opts['host'] =~ /:/
      host, port = @opts['host'].split(':')
    else
      host = @opts['host']
      port = @opts['port']
    end
    @client = Carrot.new(
      :host   => host,
      :port   => port.to_i,
      :user   => @opts['user'],
      :pass   => @opts['pass'],
      :vhost  => @opts['vhost'],
      :insist => @opts['insist']
    )
  end
  @client
end

#client=(client) ⇒ Object



96
97
98
# File 'lib/message_queue/rabbit.rb', line 96

def client=(client)
  @client = client
end

#cmd(queue, command, *args) ⇒ Object



35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
# File 'lib/message_queue/rabbit.rb', line 35

def cmd(queue, command, *args)
  retried = false
  begin
    client.queue(queue, :durable => true).send(command, *args)
  rescue Carrot::AMQP::Server::ServerDown => e
    if not retried
      Sweatshop.log "Error #{e.message}. Retrying..."
      @client = nil
      retried = true
      retry
    else
      raise e
    end
  end
end

#confirm(queue) ⇒ Object



27
28
29
# File 'lib/message_queue/rabbit.rb', line 27

def confirm(queue)
  cmd(queue, :ack)
end

#delete(queue) ⇒ Object



9
10
11
# File 'lib/message_queue/rabbit.rb', line 9

def delete(queue)
  cmd(queue, :delete)
end

#dequeue(queue) ⇒ Object



21
22
23
24
25
# File 'lib/message_queue/rabbit.rb', line 21

def dequeue(queue)
  task = cmd(queue, :pop, :ack => true)
  return unless task
  Marshal.load(task)
end

#enqueue(queue, data) ⇒ Object



17
18
19
# File 'lib/message_queue/rabbit.rb', line 17

def enqueue(queue, data)
  cmd(queue, :publish, Marshal.dump(data), :persistent => true)
end

#flush_all(queue) ⇒ Object



31
32
33
# File 'lib/message_queue/rabbit.rb', line 31

def flush_all(queue)
  cmd(queue, :purge)
end

#queue_size(queue) ⇒ Object



13
14
15
# File 'lib/message_queue/rabbit.rb', line 13

def queue_size(queue)
  cmd(queue, :message_count)
end

#stopObject



100
101
102
# File 'lib/message_queue/rabbit.rb', line 100

def stop
  client.stop
end