Class: AWS::SQS::Queue
- Inherits:
-
Object
- Object
- AWS::SQS::Queue
- Defined in:
- lib/aws/sqs/queue.rb
Overview
Represents an Amazon SQS Queue.
Defined Under Namespace
Classes: SentMessage
Constant Summary collapse
- DEFAULT_POLL_INTERVAL =
Deprecated.
No longer used by #poll
The default number of seconds to wait between polling requests for new messages.
1
- DEFAULT_WAIT_TIME_SECONDS =
The default number of seconds to pass in as the SQS long polling value (
:wait_time_seconds
) in #receive_message. 15
Instance Attribute Summary collapse
-
#url ⇒ String
readonly
The queue URL.
Instance Method Summary collapse
-
#==(other) ⇒ Boolean
(also: #eql?)
Returns true if the other queue has the same url.
-
#approximate_number_of_messages ⇒ Integer
(also: #visible_messages)
The approximate number of visible messages in a queue.
-
#approximate_number_of_messages_delayed ⇒ Integer
Returns an approximate count of messages delayed.
-
#approximate_number_of_messages_not_visible ⇒ Integer
(also: #invisible_messages)
The approximate number of messages that are not timed-out and not deleted.
-
#arn ⇒ String
The queue’s Amazon resource name (ARN).
- #batch_change_visibility(*args) ⇒ Object
- #batch_delete(*messages) ⇒ nil
-
#batch_send(*messages) ⇒ Array<SentMessage>
Sends a batch of up to 10 messages in a single request.
-
#created_timestamp ⇒ Time
The time when the queue was created.
-
#delay_seconds ⇒ Integer
Gets the current default delay for messages sent to the queue.
-
#delay_seconds=(seconds) ⇒ Object
Sets the default delay for messages sent to the queue.
-
#delete ⇒ nil
Deletes the queue, regardless of whether it is empty.
-
#exists? ⇒ Boolean
True if the queue exists.
-
#last_modified_timestamp ⇒ Time
The time when the queue was last changed.
-
#maximum_message_size ⇒ Integer
The limit of how many bytes a message can contain before Amazon SQS rejects it.
-
#maximum_message_size=(size) ⇒ Object
Sets the maximum message size for the queue.
-
#message_retention_period ⇒ Integer
The number of seconds Amazon SQS retains a message.
-
#message_retention_period=(period) ⇒ Object
Sets the message retention period for the queue.
-
#policy ⇒ Policy
Returns the current queue policy if there is one.
-
#policy=(policy) ⇒ nil
Set the policy on this queue.
-
#poll(opts = {}) {|message| ... } ⇒ nil
Polls continually for messages.
-
#receive_message(opts = {}) {|message| ... } ⇒ ReceivedMessage
(also: #receive_messages)
Retrieves one or more messages.
-
#send_message(body, options = {}) ⇒ SentMessage
Delivers a message to this queue.
-
#visibility_timeout ⇒ Integer
Returns the visibility timeout for the queue.
-
#visibility_timeout=(timeout) ⇒ Object
Sets the visibility timeout for the queue.
-
#wait_time_seconds ⇒ Integer
Gets the number of seconds the service will wait for a response when requesting a new message.
-
#wait_time_seconds=(seconds) ⇒ Object
Sets the number of seconds that the service should wait for a response when requesting a new message.
Instance Attribute Details
#url ⇒ String (readonly)
Returns The queue URL.
44 45 46 |
# File 'lib/aws/sqs/queue.rb', line 44 def url @url end |
Instance Method Details
#==(other) ⇒ Boolean Also known as: eql?
Returns true if the other queue has the same url.
656 657 658 |
# File 'lib/aws/sqs/queue.rb', line 656 def ==(other) other.kind_of?(Queue) and other.url == url end |
#approximate_number_of_messages ⇒ Integer Also known as: visible_messages
Returns The approximate number of visible messages in a queue. For more information, see Resources Required to Process Messages in the Amazon SQS Developer Guide.
299 300 301 |
# File 'lib/aws/sqs/queue.rb', line 299 def get_attribute("ApproximateNumberOfMessages").to_i end |
#approximate_number_of_messages_delayed ⇒ Integer
Returns an approximate count of messages delayed.
410 411 412 |
# File 'lib/aws/sqs/queue.rb', line 410 def get_attribute("ApproximateNumberOfMessagesDelayed").to_i end |
#approximate_number_of_messages_not_visible ⇒ Integer Also known as: invisible_messages
Returns The approximate number of messages that are not timed-out and not deleted. For more information, see Resources Required to Process Messages in the Amazon SQS Developer Guide.
309 310 311 |
# File 'lib/aws/sqs/queue.rb', line 309 def get_attribute("ApproximateNumberOfMessagesNotVisible").to_i end |
#arn ⇒ String
Returns The queue’s Amazon resource name (ARN).
415 416 417 |
# File 'lib/aws/sqs/queue.rb', line 415 def arn @arn ||= get_attribute("QueueArn") end |
#batch_change_visibility(visibility_timeout, *messages) ⇒ nil #batch_change_visibility(*messages_with_timeouts) ⇒ nil
621 622 623 624 625 626 627 628 629 630 631 632 633 634 635 636 637 638 639 640 641 642 643 644 645 646 647 648 649 650 651 652 |
# File 'lib/aws/sqs/queue.rb', line 621 def batch_change_visibility *args args = args.flatten if args.first.is_a?(Integer) timeout = args.shift = args.collect{|m| [m, timeout] } else = args.collect{|m| [m[:message], m[:visibility_timeout]] } end entries = [] .each do |msg,timeout| handle = msg.is_a?(ReceivedMessage) ? msg.handle : msg entries << { :id => entries.size.to_s, :receipt_handle => handle, :visibility_timeout => timeout, } end response = client.( :queue_url => url, :entries => entries) failures = batch_failures(entries, response) raise Errors::BatchChangeVisibilityError.new(failures) unless failures.empty? nil end |
#batch_delete(*messages) ⇒ nil
560 561 562 563 564 565 566 567 568 569 570 571 572 573 574 575 576 577 |
# File 'lib/aws/sqs/queue.rb', line 560 def batch_delete * entries = [] .flatten.each_with_index do |msg,n| handle = msg.is_a?(ReceivedMessage) ? msg.handle : msg entries << { :id => n.to_s, :receipt_handle => handle } end response = client.( :queue_url => url, :entries => entries) failures = batch_failures(entries, response) raise Errors::BatchDeleteError.new(failures) unless failures.empty? nil end |
#batch_send(*messages) ⇒ Array<SentMessage>
Sends a batch of up to 10 messages in a single request.
queue.('message-1', 'message-2')
You can also set an optional delay for all of the messages:
# delay all messages 1 hour
queue.batch_send(msg1, msg2, :delay_seconds => 3600)
If you need to set a custom delay for each message you can pass hashes:
= []
<< { :message_body => 'msg1', :delay_seconds => 60 }
<< { :message_body => 'msg2', :delay_seconds => 30 }
queue.batch_send()
515 516 517 518 519 520 521 522 523 524 525 526 527 528 529 530 531 532 533 534 535 536 537 538 539 540 541 542 543 544 545 546 547 548 |
# File 'lib/aws/sqs/queue.rb', line 515 def batch_send * entries = .flatten unless entries.first.is_a?(Hash) = entries.last.is_a?(Hash) ? entries.pop : {} entries = entries.collect{|msg| { :message_body => msg } } if delay = [:delay_seconds] entries.each {|entry| entry[:delay_seconds] = delay } end end entries.each_with_index {|entry,n| entry[:id] = n.to_s } client_opts = {} client_opts[:queue_url] = url client_opts[:entries] = entries response = client.(client_opts) failed = batch_failures(entries, response) sent = response[:successful].collect do |sent| msg = SentMessage.new msg. = sent[:message_id] msg.md5 = sent[:md5_of_message_body] msg end raise Errors::BatchSendError.new(sent, failed) unless failed.empty? sent end |
#created_timestamp ⇒ Time
Returns The time when the queue was created.
338 339 340 |
# File 'lib/aws/sqs/queue.rb', line 338 def Time.at(get_attribute("CreatedTimestamp").to_i) end |
#delay_seconds ⇒ Integer
Returns Gets the current default delay for messages sent to the queue.
384 385 386 |
# File 'lib/aws/sqs/queue.rb', line 384 def delay_seconds get_attribute("DelaySeconds").to_i end |
#delay_seconds=(seconds) ⇒ Object
Sets the default delay for messages sent to the queue.
390 391 392 |
# File 'lib/aws/sqs/queue.rb', line 390 def delay_seconds= seconds set_attribute("DelaySeconds", seconds.to_s) end |
#delete ⇒ nil
Deletes the queue, regardless of whether it is empty.
When you delete a queue, the deletion process takes up to 60 seconds. Requests you send involving that queue during the 60 seconds might succeed. For example, calling #send_message might succeed, but after the 60 seconds, the queue and that message you sent no longer exist.
Also, when you delete a queue, you must wait at least 60 seconds before creating a queue with the same name.
63 64 65 66 |
# File 'lib/aws/sqs/queue.rb', line 63 def delete client.delete_queue(:queue_url => url) nil end |
#exists? ⇒ Boolean
This may raise an exception if you don’t have permission to access the queue attributes. Also, it may return true for up to 60 seconds after a queue has been deleted.
Returns True if the queue exists.
425 426 427 428 429 430 431 432 |
# File 'lib/aws/sqs/queue.rb', line 425 def exists? client.get_queue_attributes(:queue_url => url, :attribute_names => ["QueueArn"]) rescue Errors::NonExistentQueue, Errors::InvalidAddress false else true end |
#last_modified_timestamp ⇒ Time
Returns The time when the queue was last changed.
343 344 345 |
# File 'lib/aws/sqs/queue.rb', line 343 def Time.at(get_attribute("LastModifiedTimestamp").to_i) end |
#maximum_message_size ⇒ Integer
Returns The limit of how many bytes a message can contain before Amazon SQS rejects it.
349 350 351 |
# File 'lib/aws/sqs/queue.rb', line 349 def get_attribute("MaximumMessageSize").to_i end |
#maximum_message_size=(size) ⇒ Object
Sets the maximum message size for the queue.
360 361 362 |
# File 'lib/aws/sqs/queue.rb', line 360 def (size) set_attribute("MaximumMessageSize", size.to_s) end |
#message_retention_period ⇒ Integer
Returns The number of seconds Amazon SQS retains a message.
366 367 368 |
# File 'lib/aws/sqs/queue.rb', line 366 def get_attribute("MessageRetentionPeriod").to_i end |
#message_retention_period=(period) ⇒ Object
Sets the message retention period for the queue
377 378 379 380 |
# File 'lib/aws/sqs/queue.rb', line 377 def (period) set_attribute("MessageRetentionPeriod", period.to_s) period end |
#policy ⇒ Policy
Returns the current queue policy if there is one. Returns nil
otherwise.
452 453 454 455 456 457 458 459 460 461 |
# File 'lib/aws/sqs/queue.rb', line 452 def policy if policy_json = get_attribute('Policy') policy = SQS::Policy.from_json(policy_json) policy.extend(PolicyProxy) policy.queue = self policy else nil end end |
#policy=(policy) ⇒ nil
Set the policy on this queue.
If you pass nil or an empty string then it will have the same effect as deleting the policy.
474 475 476 477 478 479 480 481 482 |
# File 'lib/aws/sqs/queue.rb', line 474 def policy= policy policy_string = case policy when nil, '' then '' when String then policy else policy.to_json end set_attribute('Policy', policy_string) nil end |
#poll(opts = {}) {|message| ... } ⇒ nil
Polls continually for messages. For example, you can use this to poll indefinitely:
queue.poll { |msg| puts msg.body }
Or, to poll indefinitely for the first message and then continue polling until no message is received for a period of at least ten seconds:
queue.poll(:initial_timeout => false,
:idle_timeout => 10) { |msg| puts msg.body }
As with the block form of #receive_message, this method automatically deletes the message then the block exits normally.
271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 |
# File 'lib/aws/sqs/queue.rb', line 271 def poll(opts = {}, &block) opts[:limit] = opts.delete(:batch_size) if opts.key?(:batch_size) opts[:wait_time_seconds] = DEFAULT_WAIT_TIME_SECONDS unless opts.has_key?(:wait_time_seconds) = Time.now got_first = false loop do got_msg = false (opts) do || got_msg = got_first = true = Time.now yield() end unless got_msg return if hit_timeout?(got_first, , opts) end end nil end |
#receive_message(opts = {}) {|message| ... } ⇒ ReceivedMessage Also known as: receive_messages
Due to the distributed nature of the queue, a weighted random set of machines is sampled on a ReceiveMessage call. That means only the messages on the sampled machines are returned. If the number of messages in the queue is small (less than 1000), it is likely you will get fewer messages than you requested per call to #receive_message. If the number of messages in the queue is extremely small, you might not receive any messages. To poll continually for messages, use the #poll method, which automatically retries the request after a configurable delay.
Retrieves one or more messages. When a block is given, each message is yielded to the block and then deleted as long as the block exits normally. When no block is given, you must delete the message yourself using ReceivedMessage#delete.
185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 |
# File 'lib/aws/sqs/queue.rb', line 185 def (opts = {}, &block) resp = client.(receive_opts(opts)) = resp[:messages].map do |m| ReceivedMessage.new(self, m[:message_id], m[:receipt_handle], :body => m[:body], :md5 => m[:md5_of_body], :attributes => m[:attributes]) end if block (, block) elsif opts[:limit] && opts[:limit] != 1 else .first end end |
#send_message(body, options = {}) ⇒ SentMessage
Delivers a message to this queue.
112 113 114 115 116 117 118 119 120 121 122 123 124 125 |
# File 'lib/aws/sqs/queue.rb', line 112 def body, = {} client_opts = .dup client_opts[:queue_url] = url client_opts[:message_body] = body response = client.(client_opts) msg = SentMessage.new msg. = response[:message_id] msg.md5 = response[:md5_of_message_body] msg end |
#visibility_timeout ⇒ Integer
Returns the visibility timeout for the queue. For more information about visibility timeout, see Visibility Timeout in the Amazon SQS Developer Guide.
319 320 321 |
# File 'lib/aws/sqs/queue.rb', line 319 def visibility_timeout get_attribute("VisibilityTimeout").to_i end |
#visibility_timeout=(timeout) ⇒ Object
Sets the visibility timeout for the queue.
332 333 334 335 |
# File 'lib/aws/sqs/queue.rb', line 332 def visibility_timeout=(timeout) set_attribute("VisibilityTimeout", timeout.to_s) timeout end |
#wait_time_seconds ⇒ Integer
Returns Gets the number of seconds the service will wait for a response when requesting a new message.
397 398 399 |
# File 'lib/aws/sqs/queue.rb', line 397 def wait_time_seconds get_attribute("ReceiveMessageWaitTimeSeconds").to_i end |
#wait_time_seconds=(seconds) ⇒ Object
Sets the number of seconds that the service should wait for a response when requesting a new message
405 406 407 |
# File 'lib/aws/sqs/queue.rb', line 405 def wait_time_seconds= seconds set_attribute("ReceiveMessageWaitTimeSeconds", seconds.to_s) end |