Class: Tootsie::AmqpQueue

Inherits:
Object
  • Object
show all
Defined in:
lib/tootsie/queues/amqp_queue.rb

Overview

A queue which uses the AMQP protocol.

Instance Method Summary collapse

Constructor Details

#initialize(options = {}) ⇒ AmqpQueue

Returns a new instance of AmqpQueue.



6
7
8
9
10
11
12
# File 'lib/tootsie/queues/amqp_queue.rb', line 6

# Builds the queue object from an options hash.
#
# Accepted keys (the hash is checked with +assert_valid_keys+):
# * +:host_name+   - kept as the host name; 'localhost' when absent.
# * +:queue_name+  - kept as the queue name; 'tootsie' when absent.
# * +:max_backoff+ - handed to Utility::Backoff as its +:max+ option.
def initialize(options = {})
  options.assert_valid_keys(:host_name, :queue_name, :max_backoff)

  max_backoff = options[:max_backoff]
  @backoff = Utility::Backoff.new(:max => max_backoff)
  @logger = Application.get.logger

  host_name = options[:host_name]
  queue_name = options[:queue_name]
  @host_name = host_name || 'localhost'
  @queue_name = queue_name || 'tootsie'
end

Instance Method Details

#count ⇒ Object



14
15
16
# File 'lib/tootsie/queues/amqp_queue.rb', line 14

# Reports the number of queued items. This implementation does not compute
# a count and always answers nil.
def count
  nil
end

#pop(options = {}) ⇒ Object



27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
# File 'lib/tootsie/queues/amqp_queue.rb', line 27

# Fetches the next item from the queue and returns it as parsed JSON.
#
# options:
# * +:wait+ - when truthy, keep polling until an item arrives; when falsy,
#   make a single attempt and return nil if the queue is empty.
#
# Returns the decoded item, or nil when nothing was available and +:wait+
# was not requested. A payload that is not valid JSON makes JSON.parse raise
# before the message is acknowledged.
def pop(options = {})
  item = nil
  loop do
    @backoff.with do
      message = nil
      with_retry do
        with_connection do
          message = @queue.pop(:ack => true)
        end
      end
      if message
        data = message[:payload]
        # The client signals an empty queue with a sentinel payload.
        data = nil if data == :queue_empty
        if data
          @logger.info "Popped: #{data.inspect}"
          item = JSON.parse(data)
          # Acknowledge only after the payload has been decoded.
          with_connection do
            @queue.ack(:delivery_tag => message[:delivery_details][:delivery_tag])
          end
        end
      end
      # Block result for @backoff: true when this attempt needs no follow-up.
      # NOTE(review): presumably a false result makes Utility::Backoff delay
      # the next attempt -- confirm against that class.
      item ? true : !options[:wait]
    end
    # Stop once we have an item. Also stop after one attempt when the caller
    # did not ask to wait; otherwise an empty queue would loop forever.
    break if item || !options[:wait]
  end
  item
end

#push(item) ⇒ Object



18
19
20
21
22
23
24
25
# File 'lib/tootsie/queues/amqp_queue.rb', line 18

# Serializes +item+ to JSON and publishes it on the exchange as a persistent
# message, using the queue name as the key. The publish runs inside
# with_retry / with_connection.
def push(item)
  payload = item.to_json
  with_retry do
    with_connection { @exchange.publish(payload, :persistent => true, :key => @queue_name) }
  end
end