Class: AWS::SQS::Queue

Inherits:
Object
Defined in:
lib/aws/sqs/queue.rb

Overview

Represents an Amazon SQS Queue.

Examples:

Sending a message


msg = queue.send_message("HELLO")
puts "Sent message: #{msg.id}"

Polling for messages indefinitely


queue.poll do |msg|
  puts "Got message: #{msg.body}"
end

Defined Under Namespace

Classes: SentMessage

Constant Summary

DEFAULT_POLL_INTERVAL = 1

Deprecated. No longer used by #poll.

The default number of seconds to wait between polling requests for new messages.

DEFAULT_WAIT_TIME_SECONDS = 15

The default number of seconds to pass in as the SQS long polling value (:wait_time_seconds) in #receive_message.

Since:

  • 1.8.0

Instance Attribute Details

#url ⇒ String (readonly)

Returns the queue URL.

Returns:

  • (String)

    The queue URL.



# File 'lib/aws/sqs/queue.rb', line 46

def url
  @url
end

Instance Method Details

#==(other) ⇒ Boolean Also known as: eql?

Returns true if the other queue has the same url.

Returns:

  • (Boolean)

    Returns true if the other queue has the same url.



# File 'lib/aws/sqs/queue.rb', line 655

def ==(other)
  other.kind_of?(Queue) and other.url == url
end

#approximate_number_of_messages ⇒ Integer Also known as: visible_messages

Returns the approximate number of visible messages in a queue. For more information, see Resources Required to Process Messages in the Amazon SQS Developer Guide.

Returns:

  • (Integer)

    The approximate number of visible messages in a queue.


# File 'lib/aws/sqs/queue.rb', line 300

def approximate_number_of_messages
  get_attribute("ApproximateNumberOfMessages").to_i
end

#approximate_number_of_messages_delayed ⇒ Integer

Returns an approximate count of messages delayed.

Returns:

  • (Integer)

    Returns an approximate count of messages delayed.



# File 'lib/aws/sqs/queue.rb', line 409

def approximate_number_of_messages_delayed
  get_attribute("ApproximateNumberOfMessagesDelayed").to_i
end

#approximate_number_of_messages_not_visible ⇒ Integer Also known as: invisible_messages

Returns the approximate number of messages that are not timed-out and not deleted. For more information, see Resources Required to Process Messages in the Amazon SQS Developer Guide.

Returns:

  • (Integer)

    The approximate number of messages that are not timed-out and not deleted. For more information, see Resources Required to Process Messages in the Amazon SQS Developer Guide.



# File 'lib/aws/sqs/queue.rb', line 309

def approximate_number_of_messages_not_visible
  get_attribute("ApproximateNumberOfMessagesNotVisible").to_i
end

#arn ⇒ String

Returns the queue's Amazon resource name (ARN).

Returns:

  • (String)

    The queue's Amazon resource name (ARN).



# File 'lib/aws/sqs/queue.rb', line 414

def arn
  @arn ||= get_attribute("QueueArn")
end

#batch_change_visibility(visibility_timeout, *messages) ⇒ nil
#batch_change_visibility(*messages_with_timeouts) ⇒ nil

Overloads:

  • #batch_change_visibility(visibility_timeout, *messages) ⇒ nil

    Accepts a single :visibility_timeout value and a list of messages (ReceivedMessage objects or receipt handle strings). This form of the method is useful when you want to set the same timeout value for each message.

    queue.batch_change_visibility(10, messages)

    Parameters:

    • visibility_timeout (Integer)

      The new value for the message's visibility timeout (in seconds).

    • messages (ReceivedMessage, String)

      A list of up to 10 messages to change the visibility timeout for.

    Returns:

    • (nil)

    Raises:

    • (BatchChangeVisibilityError)

      Raises this error when one or more of the messages failed the visibility update.

  • #batch_change_visibility(*messages_with_timeouts) ⇒ nil

    Accepts a list of hashes. Each hash should provide the visibility timeout and the message (a ReceivedMessage object or the receipt handle string).

    Use this form when each message needs a different visibility timeout.

    messages = []
    messages << { :message => 'handle1', :visibility_timeout => 5 }
    messages << { :message => 'handle2', :visibility_timeout => 10 }

    queue.batch_change_visibility(*messages)

    Parameters:

    • messages_with_timeouts (Hash)

      A list of hashes, each with a :visibility_timeout and a :message.

    Returns:

    • (nil)

    Raises:

    • (BatchChangeVisibilityError)

      Raises this error when one or more of the messages failed the visibility update.

Raises:

  • (Errors::BatchChangeVisibilityError)



# File 'lib/aws/sqs/queue.rb', line 620

def batch_change_visibility *args

  args = args.flatten

  if args.first.is_a?(Integer)
    timeout = args.shift
    messages = args.collect{|m| [m, timeout] }
  else
    messages = args.collect{|m| [m[:message], m[:visibility_timeout]] }
  end

  entries = []
  messages.each do |msg,timeout|
    handle = msg.is_a?(ReceivedMessage) ? msg.handle : msg
    entries << {
      :id => entries.size.to_s,
      :receipt_handle => handle,
      :visibility_timeout => timeout,
    }
  end

  response = client.change_message_visibility_batch(
    :queue_url => url, :entries => entries)

  failures = batch_failures(entries, response)

  raise Errors::BatchChangeVisibilityError.new(failures) unless
    failures.empty?

  nil

end
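
When per-message timeouts are already held in a hash keyed by receipt handle, the list-of-hashes form described above can be built with a small helper. The following is only a sketch: `visibility_requests` is a hypothetical name, not part of the SDK, and the handles are placeholder strings.

```ruby
# Hypothetical helper (not part of the SDK): turn { receipt_handle => seconds }
# into the list of hashes that the second overload expects.
def visibility_requests(timeouts_by_handle)
  timeouts_by_handle.map do |handle, seconds|
    { :message => handle, :visibility_timeout => seconds }
  end
end

requests = visibility_requests('handle1' => 5, 'handle2' => 10)

# With a real queue object, the result would be passed on as:
#   queue.batch_change_visibility(*requests)
```

Since a batch is limited to 10 messages, a larger hash would need to be split before the call.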

#batch_delete(*messages) ⇒ nil

Parameters:

  • messages (ReceivedMessage, String)

    A list of up to 10 messages to delete. Each message should be a ReceivedMessage object or a receipt handle (string).

Returns:

  • (nil)

Raises:

  • (Errors::BatchDeleteError)

    Raised when one or more of the messages failed to delete. The raised error has a list of the failures.



# File 'lib/aws/sqs/queue.rb', line 559

def batch_delete *messages

  entries = []
  messages.flatten.each_with_index do |msg,n|
    handle = msg.is_a?(ReceivedMessage) ? msg.handle : msg
    entries << { :id => n.to_s, :receipt_handle => handle }
  end

  response = client.delete_message_batch(
    :queue_url => url, :entries => entries)

  failures = batch_failures(entries, response)

  raise Errors::BatchDeleteError.new(failures) unless failures.empty?

  nil

end
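
Because each call accepts at most 10 messages, a caller holding more receipt handles has to split them first. A minimal sketch of that, assuming a hypothetical `delete_in_batches` helper and a `FakeQueue` stand-in that only records what it is given (neither exists in the SDK):

```ruby
# Stand-in for a queue object; it only records the batches it receives.
class FakeQueue
  attr_reader :batches

  def initialize
    @batches = []
  end

  def batch_delete(*messages)
    @batches << messages.flatten
    nil
  end
end

# Hypothetical helper: delete any number of receipt handles, at most
# batch_size per request.
def delete_in_batches(queue, handles, batch_size = 10)
  handles.each_slice(batch_size) { |slice| queue.batch_delete(*slice) }
  nil
end

queue = FakeQueue.new
handles = (1..23).map { |n| "handle-#{n}" }
delete_in_batches(queue, handles)
puts queue.batches.map(&:size).inspect
```

With a real queue, each slice could still raise Errors::BatchDeleteError, so a caller may want to rescue per slice rather than around the whole loop.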

#batch_send(*messages) ⇒ Array<SentMessage>

Sends a batch of up to 10 messages in a single request.

queue.batch_send('message-1', 'message-2')

You can also set an optional delay for all of the messages:

# delay all messages 1 hour
queue.batch_send(msg1, msg2, :delay_seconds => 3600)

If you need to set a custom delay for each message you can pass hashes:

messages = []
messages << { :message_body => 'msg1', :delay_seconds => 60 }
messages << { :message_body => 'msg2', :delay_seconds => 30 }

queue.batch_send(messages)

Parameters:

  • messages (String, Hash)

    A list of messages. Each message should be a string, or a hash with a :message_body, and optionally :delay_seconds.

Returns:

  • (Array<SentMessage>)

    Returns an array of sent message objects. Each object responds to #message_id and #md5. The message ID is generated by Amazon SQS.

Raises:

  • (Errors::BatchSendError)

    Raises this error when one or more of the messages failed to send, but others did. On the raised object you can access a list of the messages that failed, and a list of messages that succeeded.



# File 'lib/aws/sqs/queue.rb', line 514

def batch_send *messages

  entries = messages.flatten

  unless entries.first.is_a?(Hash)
    options = entries.last.is_a?(Hash) ? entries.pop : {}
    entries = entries.collect{|msg| { :message_body => msg } }
    if delay = options[:delay_seconds]
      entries.each {|entry| entry[:delay_seconds] = delay }
    end
  end

  entries.each_with_index {|entry,n| entry[:id] = n.to_s }

  client_opts = {}
  client_opts[:queue_url] = url
  client_opts[:entries] = entries

  response = client.send_message_batch(client_opts)

  failed = batch_failures(entries, response)

  sent = response[:successful].collect do |sent|
    msg = SentMessage.new
    msg.message_id = sent[:message_id]
    msg.md5 = sent[:md5_of_message_body]
    msg
  end

  raise Errors::BatchSendError.new(sent, failed) unless failed.empty?

  sent

end
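
To make the three calling forms concrete, the sketch below reproduces only the argument handling from the source above and shows the entries each form yields. `send_entries` is a hypothetical name used for illustration; it performs no request.

```ruby
# Hypothetical helper mirroring the argument handling shown in the source.
def send_entries(*messages)
  entries = messages.flatten
  unless entries.first.is_a?(Hash)
    # Plain strings, optionally followed by one shared options hash.
    options = entries.last.is_a?(Hash) ? entries.pop : {}
    entries = entries.map { |body| { :message_body => body } }
    delay = options[:delay_seconds]
    entries.each { |entry| entry[:delay_seconds] = delay } if delay
  end
  # Each entry gets a request-local id based on its position.
  entries.each_with_index.map { |entry, n| entry.merge(:id => n.to_s) }
end

plain   = send_entries('msg1', 'msg2')
delayed = send_entries('msg1', 'msg2', :delay_seconds => 3600)
custom  = send_entries([
  { :message_body => 'msg1', :delay_seconds => 60 },
  { :message_body => 'msg2', :delay_seconds => 30 }
])
```

Note that the shared-delay form only applies when the first argument is a string; a list of hashes is passed through with its own per-message values.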

#created_timestamp ⇒ Time

Returns the time when the queue was created.

Returns:

  • (Time)

    The time when the queue was created.



# File 'lib/aws/sqs/queue.rb', line 337

def created_timestamp
  Time.at(get_attribute("CreatedTimestamp").to_i)
end

#delay_seconds ⇒ Integer

Returns the current default delay for messages sent to the queue.

Returns:

  • (Integer)

    The current default delay, in seconds, for messages sent to the queue.



# File 'lib/aws/sqs/queue.rb', line 383

def delay_seconds
  get_attribute("DelaySeconds").to_i
end

#delay_seconds=(seconds) ⇒ Object

Sets the default delay for messages sent to the queue.

Parameters:

  • seconds (Integer)

    How many seconds a message will be delayed.



# File 'lib/aws/sqs/queue.rb', line 389

def delay_seconds= seconds
  set_attribute("DelaySeconds", seconds.to_s)
end

#delete ⇒ nil

Deletes the queue, regardless of whether it is empty.

When you delete a queue, the deletion process takes up to 60 seconds. Requests you send involving that queue during the 60 seconds might succeed. For example, calling #send_message might succeed, but after the 60 seconds, the queue and that message you sent no longer exist.

Also, when you delete a queue, you must wait at least 60 seconds before creating a queue with the same name.

Returns:

  • (nil)


# File 'lib/aws/sqs/queue.rb', line 65

def delete
  client.delete_queue(:queue_url => url)
  nil
end

#exists? ⇒ Boolean

Note:

This may raise an exception if you don't have permission to access the queue attributes. Also, it may return true for up to 60 seconds after a queue has been deleted.

Returns true if the queue exists.

Returns:

  • (Boolean)

    True if the queue exists.



# File 'lib/aws/sqs/queue.rb', line 424

def exists?
  client.get_queue_attributes(:queue_url => url,
                              :attribute_names => ["QueueArn"])
rescue Errors::NonExistentQueue, Errors::InvalidAddress
  false
else
  true
end

#last_modified_timestamp ⇒ Time

Returns the time when the queue was last changed.

Returns:

  • (Time)

    The time when the queue was last changed.



# File 'lib/aws/sqs/queue.rb', line 342

def last_modified_timestamp
  Time.at(get_attribute("LastModifiedTimestamp").to_i)
end

#maximum_message_size ⇒ Integer

Returns the limit of how many bytes a message can contain before Amazon SQS rejects it.

Returns:

  • (Integer)

    The limit of how many bytes a message can contain before Amazon SQS rejects it.



# File 'lib/aws/sqs/queue.rb', line 348

def maximum_message_size
  get_attribute("MaximumMessageSize").to_i
end

#maximum_message_size=(size) ⇒ Object

Sets the maximum message size for the queue.

Parameters:

  • size (Integer)

    The limit of how many bytes a message can contain before Amazon SQS rejects it. This must be an integer from 1024 bytes (1KB) up to 65536 bytes (64KB). The default for this attribute is 8192 (8KB).

Returns:

  • Returns the passed size argument.



# File 'lib/aws/sqs/queue.rb', line 359

def maximum_message_size=(size)
  set_attribute("MaximumMessageSize", size.to_s)
end

#message_retention_period ⇒ Integer

Returns the number of seconds Amazon SQS retains a message.

Returns:

  • (Integer)

    The number of seconds Amazon SQS retains a message.



# File 'lib/aws/sqs/queue.rb', line 365

def message_retention_period
  get_attribute("MessageRetentionPeriod").to_i
end

#message_retention_period=(period) ⇒ Object

Sets the message retention period for the queue.

Parameters:

  • period (Integer)

    The number of seconds Amazon SQS retains a message. Must be an integer from 3600 (1 hour) to 1209600 (14 days). The default for this attribute is 345600 (4 days).

Returns:

  • Returns the passed period argument.



# File 'lib/aws/sqs/queue.rb', line 376

def message_retention_period=(period)
  set_attribute("MessageRetentionPeriod", period.to_s)
  period
end
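
Since the period is given in seconds, it can help to convert from days and check the documented bounds before assigning. The following is a sketch only: `retention_seconds` is a hypothetical helper, and the range is taken from the parameter description above rather than from the service.

```ruby
SECONDS_PER_DAY = 24 * 60 * 60

# 1 hour to 14 days, as stated in the parameter description above.
RETENTION_RANGE = (3600..1_209_600)

# Hypothetical helper: convert days to seconds and reject values outside
# the documented range.
def retention_seconds(days)
  seconds = days * SECONDS_PER_DAY
  unless RETENTION_RANGE.cover?(seconds)
    raise ArgumentError, "retention must be within #{RETENTION_RANGE} seconds"
  end
  seconds
end

four_days = retention_seconds(4)

# With a real queue object, the value would then be assigned as:
#   queue.message_retention_period = four_days
```

Four days works out to 4 × 86400 = 345600 seconds, which matches the documented default.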

#policy ⇒ Policy

Returns the current queue policy if there is one. Returns nil otherwise.

Returns:

  • (Policy)

    Returns the current queue policy if there is one. Returns nil otherwise.



# File 'lib/aws/sqs/queue.rb', line 451

def policy
  if policy_json = get_attribute('Policy')
    policy = SQS::Policy.from_json(policy_json)
    policy.extend(PolicyProxy)
    policy.queue = self
    policy
  else
    nil
  end
end

#policy=(policy) ⇒ nil

Set the policy on this queue.

If you pass nil or an empty string then it will have the same effect as deleting the policy.

Parameters:

  • policy

    The policy to set. This can be a Policy object, a JSON policy string, or any other object that responds with a policy string when it receives #to_json.

Returns:

  • (nil)


# File 'lib/aws/sqs/queue.rb', line 473

def policy= policy
  policy_string = case policy
  when nil, '' then ''
  when String  then policy
  else policy.to_json
  end
  set_attribute('Policy', policy_string)
  nil
end

#poll(opts = {}) {|message| ... } ⇒ nil

Polls continually for messages. For example, you can use this to poll indefinitely:

queue.poll { |msg| puts msg.body }

Or, to poll indefinitely for the first message and then continue polling until no message is received for a period of at least ten seconds:

queue.poll(:initial_timeout => false,
           :idle_timeout => 10) { |msg| puts msg.body }

As with the block form of #receive_message, this method automatically deletes each message when the block exits normally.

Parameters:

  • opts (Hash) (defaults to: {})

    Options for polling.

Options Hash (opts):

  • :wait_time_seconds (Integer)

    The number of seconds the service should wait for a response when requesting a new message. Defaults to DEFAULT_WAIT_TIME_SECONDS. Use nil to use the queue's global long polling wait time setting. See #wait_time_seconds to set the global long poll setting on the queue.

  • :idle_timeout (Integer)

    The maximum number of seconds to spend polling while no messages are being returned. By default this method polls indefinitely whether messages are received or not.

  • :initial_timeout (Integer)

    The maximum number of seconds to spend polling before the first message is received. This option defaults to the value of :idle_timeout. You can specify false to poll indefinitely for the first message when :idle_timeout is set.

  • :batch_size (Integer)

    The maximum number of messages to retrieve in a single request. By default messages are received one at a time. Valid values: integers from 1 to 10.

  • :visibility_timeout (Integer)

    The duration (in seconds) that the received messages are hidden from subsequent retrieve requests. Valid values: integers from 0 to 43200 (12 hours).

  • :attributes (Array<Symbol, String>)

    The attributes to populate in each received message. Valid values:

    • :all (to populate all attributes)
    • :sender_id
    • :sent_at
    • :receive_count
    • :first_received_at

    See ReceivedMessage for documentation on each attribute's meaning.

  • :poll_interval (Float, Integer)

    As of v1.7.2, this option is no longer used. See the :wait_time_seconds option for long polling instead.

Yield Parameters:

  • message (ReceivedMessage)

Returns:

  • (nil)


# File 'lib/aws/sqs/queue.rb', line 273

def poll(opts = {}, &block)
  opts[:limit] = opts.delete(:batch_size) if
    opts.key?(:batch_size)

  opts[:wait_time_seconds] = DEFAULT_WAIT_TIME_SECONDS unless
    opts.has_key?(:wait_time_seconds)

  last_message_at = Time.now
  got_first = false
  loop do
    got_msg = false
    receive_messages(opts) do |message|
      got_msg = got_first = true
      last_message_at = Time.now
      yield(message)
    end
    unless got_msg
      return if hit_timeout?(got_first, last_message_at, opts)
    end
  end
  nil
end
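
The `hit_timeout?` helper called in the source is not shown on this page. Going only by the documented meaning of :idle_timeout and :initial_timeout, the decision could look roughly like the sketch below. `poll_timed_out?` is a hypothetical name, the extra `now` argument exists only to make the example deterministic, and the real implementation may differ.

```ruby
# Hypothetical stand-in for the timeout check used by #poll.
def poll_timed_out?(got_first, last_message_at, opts, now = Time.now)
  idle = opts[:idle_timeout]
  # :initial_timeout falls back to :idle_timeout when it is not given.
  initial = opts.key?(:initial_timeout) ? opts[:initial_timeout] : idle
  limit = got_first ? idle : initial
  return false unless limit # nil or false means "keep polling"
  now - last_message_at > limit
end

started = Time.at(0)
later   = Time.at(11)

# No timeouts configured: poll indefinitely.
poll_timed_out?(false, started, {}, later)

# Wait forever for the first message, then stop after 10 idle seconds.
opts = { :initial_timeout => false, :idle_timeout => 10 }
poll_timed_out?(false, started, opts, later)
poll_timed_out?(true, started, opts, later)
```

The second call keeps polling because no message has arrived yet; the third stops because a message was seen and more than ten idle seconds have passed since.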

#receive_message(opts = {}) {|message| ... } ⇒ ReceivedMessage Also known as: receive_messages

Note:

Due to the distributed nature of the queue, a weighted random set of machines is sampled on a ReceiveMessage call. That means only the messages on the sampled machines are returned. If the number of messages in the queue is small (less than 1000), it is likely you will get fewer messages than you requested per call to #receive_message. If the number of messages in the queue is extremely small, you might not receive any messages. To poll continually for messages, use the #poll method, which automatically retries the request after a configurable delay.

Retrieves one or more messages. When a block is given, each message is yielded to the block and then deleted as long as the block exits normally. When no block is given, you must delete the message yourself using ReceivedMessage#delete.

Parameters:

  • opts (Hash) (defaults to: {})

    Options for receiving messages.

Options Hash (opts):

  • :limit (Integer)

    The maximum number of messages to receive. By default this is 1, and the return value is a single message object. If this option is specified and is not 1, the return value is an array of message objects; however, the array may contain fewer objects than you requested. Valid values: integers from 1 to 10.

    Not necessarily all the messages in the queue are returned (for more information, see the preceding note about machine sampling).

  • :wait_time_seconds (Integer)

    The number of seconds the service should wait for a response when requesting a new message. Defaults to the #wait_time_seconds attribute defined on the queue. See #wait_time_seconds to set the global long poll setting on the queue.

  • :visibility_timeout (Integer)

    The duration (in seconds) that the received messages are hidden from subsequent retrieve requests. Valid values: integers from 0 to 43200 (12 hours).

  • :attributes (Array<Symbol, String>)

    The attributes to populate in each received message. Valid values:

    • :all (to populate all attributes)
    • :sender_id
    • :sent_at
    • :receive_count
    • :first_received_at

    See ReceivedMessage for documentation on each attribute's meaning.

Yield Parameters:

  • message (ReceivedMessage)

Returns:

  • (ReceivedMessage)

    Returns the received message (or messages) only if a block is not given to this method.



# File 'lib/aws/sqs/queue.rb', line 187

def receive_message(opts = {}, &block)
  resp = client.receive_message(receive_opts(opts))

  messages = resp[:messages].map do |m|
    ReceivedMessage.new(self, m[:message_id], m[:receipt_handle],
      :body => m[:body],
      :md5 => m[:md5_of_body],
      :attributes => m[:attributes])
  end

  if block
    call_message_block(messages, block)
  elsif opts[:limit] && opts[:limit] != 1
    messages
  else
    messages.first
  end
end
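
The block form deletes a message only when the block exits normally. The `call_message_block` helper that does this is not shown on this page, so the sketch below illustrates the documented behavior with stand-ins: `FakeMessage` and `process_and_delete` are hypothetical names, not SDK classes or methods.

```ruby
# Minimal stand-in for a received message; it only tracks deletion.
FakeMessage = Struct.new(:body, :deleted) do
  def delete
    self.deleted = true
  end
end

# Hypothetical sketch: yield each message and delete it only when the block
# returns without raising.
def process_and_delete(messages)
  messages.each do |message|
    yield message
    message.delete
  end
  nil
end

ok  = FakeMessage.new('ok', false)
bad = FakeMessage.new('bad', false)

process_and_delete([ok]) { |m| m.body.upcase }

begin
  process_and_delete([bad]) { |_m| raise 'processing failed' }
rescue RuntimeError
  # The exception propagates and the message is not deleted.
end
```

A message left undeleted this way becomes visible again once its visibility timeout expires, so a failing block effectively schedules a retry.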

#send_message(body, options = {}) ⇒ SentMessage

Delivers a message to this queue.

Parameters:

  • body (String)

    The message to send. The maximum allowed message size is 64 KB. The message may only contain Unicode characters from the following list, according to the W3C XML specification (for more information, go to http://www.w3.org/TR/REC-xml/#charsets). If you send any characters not included in the list, your request will be rejected.

    • #x9
    • #xA
    • #xD
    • #x20 to #xD7FF
    • #xE000 to #xFFFD
    • #x10000 to #x10FFFF
  • options (Hash) (defaults to: {})

Options Hash (options):

  • :delay_seconds (Integer)

    The number of seconds to delay the message. The message will become available for processing after the delay time has passed. If you don't specify a value, the default value for the queue applies. Should be from 0 to 900 (15 mins).

Returns:

  • (SentMessage)

    An object containing information about the message that was sent.



# File 'lib/aws/sqs/queue.rb', line 114

def send_message body, options = {}

  client_opts = options.dup
  client_opts[:queue_url] = url
  client_opts[:message_body] = body

  response = client.send_message(client_opts)

  msg = SentMessage.new
  msg.message_id = response[:message_id]
  msg.md5 = response[:md5_of_message_body]
  msg

end
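
A body can be checked locally against the character list and size limit given in the parameter description before it is sent. This is a sketch under those stated limits: `sendable_body?` is a hypothetical helper, and measuring the size with `bytesize` is an assumption about how the 64 KB limit is counted.

```ruby
# Character ranges copied from the body parameter description above.
ALLOWED_BODY = /\A[\u0009\u000A\u000D\u0020-\uD7FF\uE000-\uFFFD\u{10000}-\u{10FFFF}]*\z/

# 64 KB, as stated in the parameter description.
MAX_BODY_BYTES = 64 * 1024

# Hypothetical pre-flight check; the service remains the final authority.
def sendable_body?(body)
  body.valid_encoding? &&
    body.bytesize <= MAX_BODY_BYTES &&
    body.match?(ALLOWED_BODY)
end

sendable_body?("HELLO")        # plain text passes
sendable_body?("tab\tok")      # #x9 is on the allowed list
sendable_body?("bad\u0000")    # NUL is not on the allowed list
```

Binary payloads would generally need an encoding step such as Base64 before they fit these rules.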

#visibility_timeout ⇒ Integer

Returns the visibility timeout for the queue. For more information about visibility timeout, see Visibility Timeout in the Amazon SQS Developer Guide.

Returns:

  • (Integer)

    Returns the visibility timeout for the queue. For more information about visibility timeout, see Visibility Timeout in the Amazon SQS Developer Guide.



# File 'lib/aws/sqs/queue.rb', line 318

def visibility_timeout
  get_attribute("VisibilityTimeout").to_i
end

#visibility_timeout=(timeout) ⇒ Object

Sets the visibility timeout for the queue.

Parameters:

  • timeout (Integer)

    The length of time (in seconds) that a message received from a queue will be invisible to other receiving components when they ask to receive messages. Valid values: integers from 0 to 43200 (12 hours).

Returns:

  • Returns the value passed as a timeout.



# File 'lib/aws/sqs/queue.rb', line 331

def visibility_timeout=(timeout)
  set_attribute("VisibilityTimeout", timeout.to_s)
  timeout
end

#wait_time_seconds ⇒ Integer

Returns the number of seconds the service will wait for a response when requesting a new message.

Returns:

  • (Integer)

    The number of seconds the service will wait for a response when requesting a new message.

Since:

  • 1.8.0



# File 'lib/aws/sqs/queue.rb', line 396

def wait_time_seconds
  get_attribute("ReceiveMessageWaitTimeSeconds").to_i
end

#wait_time_seconds=(seconds) ⇒ Object

Sets the number of seconds that the service should wait for a response when requesting a new message.

Parameters:

  • seconds (Integer)

    How many seconds to wait for a response.

Since:

  • 1.8.0



# File 'lib/aws/sqs/queue.rb', line 404

def wait_time_seconds= seconds
  set_attribute("ReceiveMessageWaitTimeSeconds", seconds.to_s)
end