Class: Laboristo::Queue

Inherits:
Object
  • Object
show all
Defined in:
lib/laboristo.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(url) ⇒ Queue

Returns a new instance of Queue.



9
10
11
12
# File 'lib/laboristo.rb', line 9

# Builds a queue wrapper bound to a single queue URL.
#
# A new Aws::SQS::Client is created with no explicit arguments, so its
# configuration comes from wherever the SDK looks by default.
#
# @param url [String] queue URL, later sent as +queue_url+ on every request
def initialize(url)
  @url = url
  @sqs = Aws::SQS::Client.new
end

Instance Attribute Details

#sqs ⇒ Object

Returns the value of attribute sqs.



7
8
9
# File 'lib/laboristo.rb', line 7

# Reader for the SQS client held in @sqs (the constructor assigns an
# Aws::SQS::Client there).
#
# @return [Object] the current value of @sqs
def sqs
  @sqs
end

#url ⇒ Object

Returns the value of attribute url.



6
7
8
# File 'lib/laboristo.rb', line 6

# Reader for the queue URL held in @url (the value handed to the
# constructor and sent as +queue_url+ on each SQS request).
#
# @return [Object] the current value of @url
def url
  @url
end

Instance Method Details

#each(&block) ⇒ Object Also known as: pop



19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
# File 'lib/laboristo.rb', line 19

# Polls the queue in an endless loop and yields every received message body,
# Base64-decoded, to the given block.
#
# Each poll requests up to 10 messages. A message is deleted from the queue
# only after the block returns without raising. If the block (or the delete
# call) raises a StandardError, the error is printed to $stdout, the message
# is not deleted, and processing continues with the next message.
#
# The loop never finishes on its own; it stops only if something raises out
# of the polling call (Kernel#loop treats StopIteration as a normal exit).
#
# @yieldparam body [String] the decoded message body
# @return [Enumerator] when called without a block
def each(&block)
  # Without a block every message would fail on `block.call`, be reported as
  # an error and never be deleted, forever. Hand back an enumerator instead.
  return to_enum(:each) unless block

  loop do
    resp = @sqs.receive_message(queue_url: @url,
                                attribute_names: ['All'],
                                max_number_of_messages: 10).data.to_hash

    # The :messages key may be absent when nothing was received.
    Array(resp[:messages]).each do |msg|
      begin
        block.call(Base64.decode64(msg[:body]))
        # Reached only when the block succeeded, so failed messages stay queued.
        @sqs.delete_message(queue_url: @url,
                            receipt_handle: msg[:receipt_handle])
      rescue StandardError => e
        $stdout.puts "ERROR: Can't process message #{msg[:message_id]}.\n#{e}"
      end
    end
  end
end

#purge ⇒ Object



40
41
42
# File 'lib/laboristo.rb', line 40

# Sends a purge_queue request for this queue's URL.
#
# @return [Object] whatever the client's +purge_queue+ call returns
def purge
  target = @url
  @sqs.purge_queue(queue_url: target)
end

#push(message) ⇒ Object Also known as: <<



14
15
16
17
# File 'lib/laboristo.rb', line 14

# Base64-encodes +message+ and sends it to the queue as the message body.
#
# @param message [String] raw payload to enqueue
# @return [Object] whatever the client's +send_message+ call returns
def push(message)
  @sqs.send_message(queue_url: @url,
                    message_body: Base64.encode64(message))
end