Class: Tootsie::SqsQueue

Inherits:
Object
  • Object
show all
Defined in:
lib/tootsie/queues/sqs_queue.rb

Overview

A queue which uses Amazon’s Simple Queue Service (SQS).

Instance Method Summary collapse

Constructor Details

#initialize(options) ⇒ SqsQueue

Returns a new instance of SqsQueue.



12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
# File 'lib/tootsie/queues/sqs_queue.rb', line 12

def initialize(options)
  options.assert_valid_keys(:access_key_id, :secret_access_key, :queue_name, :max_backoff)
  @backoff = Utility::Backoff.new(:max => options[:max_backoff])
  @sqs_service = ::Sqs::Service.new(
    :access_key_id => options[:access_key_id],
    :secret_access_key => options[:secret_access_key])
  @logger = Application.get.logger
  @queue_name = options[:queue_name] || 'tootsie'
  @queue = @sqs_service.queues.find_first(@queue_name)
  unless @queue
    @logger.warn "Queue #{@queue_name} does not exist, creating it"
    @sqs_service.queues.create(@queue_name)
    begin
      timeout(30) do
        while not @queue
          sleep(1)
          @queue = @sqs_service.queues.find_first(@queue_name)
        end
      end
    rescue Timeout::Error
      raise SqsQueueCouldNotFindQueueError
    end
  end
end

Instance Method Details

#countObject



37
38
39
# File 'lib/tootsie/queues/sqs_queue.rb', line 37

def count
  @queue.attributes['ApproximateNumberOfMessages'].to_i
end

#pop(options = {}) ⇒ Object



59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
# File 'lib/tootsie/queues/sqs_queue.rb', line 59

def pop(options = {})
  item = nil
  loop do
    @backoff.with do
      begin
        message = @queue.message(5)
      rescue Exception => exception
        check_exception(exception)
        @logger.error("Reading queue failed with exception #{exception.class}: #{exception.message}")
        break unless options[:wait]
        sleep(0.5)
        retry
      end
      if message
        begin
          item = JSON.parse(message.body)
        ensure
          # Always destroy, even if parsing fails
          message.destroy
        end
      end
      if item or not options[:wait]
        true
      else
        false
      end
    end
    break if item
  end
  item
end

#push(item) ⇒ Object



41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
# File 'lib/tootsie/queues/sqs_queue.rb', line 41

def push(item)
  retries_left = 5
  begin
    return @queue.create_message(item.to_json)
  rescue Exception => exception
    check_exception(exception)
    if retries_left > 0
      @logger.warn("Writing queue failed with exception (#{exception.message}), will retry")
      retries_left -= 1
      sleep(0.5)
      retry
    else
      @logger.error("Writing queue failed with exception #{exception.class}: #{exception.message}")
      raise exception
    end
  end
end