Class: AWS::SQS::Queue
- Inherits:
-
Object
- Object
- AWS::SQS::Queue
- Defined in:
- lib/aws/sqs/queue.rb
Overview
Represents an Amazon SQS Queue.
Defined Under Namespace
Classes: SentMessage
Constant Summary collapse
- DEFAULT_POLL_INTERVAL =
Deprecated.
No longer used by #poll
The default number of seconds to wait between polling requests for new messages.
1
- DEFAULT_WAIT_TIME_SECONDS =
The default number of seconds to pass in as the SQS long polling value (
:wait_time_seconds
) in #receive_message. 15
Instance Attribute Summary collapse
-
#url ⇒ String
readonly
The queue URL.
Instance Method Summary collapse
-
#==(other) ⇒ Boolean
(also: #eql?)
Returns true if the other queue has the same url.
-
#approximate_number_of_messages ⇒ Integer
(also: #visible_messages)
The approximate number of visible messages in a queue.
-
#approximate_number_of_messages_delayed ⇒ Integer
Returns an approximate count of messages delayed.
-
#approximate_number_of_messages_not_visible ⇒ Integer
(also: #invisible_messages)
The approximate number of messages that are not timed-out and not deleted.
-
#arn ⇒ String
The queue's Amazon resource name (ARN).
- #batch_change_visibility(*args) ⇒ Object
- #batch_delete(*messages) ⇒ nil
-
#batch_send(*messages) ⇒ Array<SentMessage>
Sends a batch of up to 10 messages in a single request.
-
#created_timestamp ⇒ Time
The time when the queue was created.
-
#delay_seconds ⇒ Integer
Gets the current default delay for messages sent to the queue.
-
#delay_seconds=(seconds) ⇒ Object
Sets the default delay for messages sent to the queue.
-
#delete ⇒ nil
Deletes the queue, regardless of whether it is empty.
-
#exists? ⇒ Boolean
True if the queue exists.
-
#last_modified_timestamp ⇒ Time
The time when the queue was last changed.
-
#maximum_message_size ⇒ Integer
The limit of how many bytes a message can contain before Amazon SQS rejects it.
-
#maximum_message_size=(size) ⇒ Object
Sets the maximum message size for the queue.
-
#message_retention_period ⇒ Integer
The number of seconds Amazon SQS retains a message.
-
#message_retention_period=(period) ⇒ Object
Sets the message retention period for the queue.
-
#policy ⇒ Policy
Returns the current queue policy if there is one.
-
#policy=(policy) ⇒ nil
Set the policy on this queue.
-
#poll(opts = {}) {|message| ... } ⇒ nil
Polls continually for messages.
-
#receive_message(opts = {}) {|message| ... } ⇒ ReceivedMessage
(also: #receive_messages)
Retrieves one or more messages.
-
#send_message(body, options = {}) ⇒ SentMessage
Delivers a message to this queue.
-
#visibility_timeout ⇒ Integer
Returns the visibility timeout for the queue.
-
#visibility_timeout=(timeout) ⇒ Object
Sets the visibility timeout for the queue.
-
#wait_time_seconds ⇒ Integer
Gets the number of seconds the service will wait for a response when requesting a new message.
-
#wait_time_seconds=(seconds) ⇒ Object
Sets the number of seconds that the service should wait for a response when requesting a new message.
Instance Attribute Details
#url ⇒ String (readonly)
Returns The queue URL.
46 47 48 |
# File 'lib/aws/sqs/queue.rb', line 46 def url @url end |
Instance Method Details
#==(other) ⇒ Boolean Also known as: eql?
Returns true if the other queue has the same url.
655 656 657 |
# File 'lib/aws/sqs/queue.rb', line 655 def ==(other) other.kind_of?(Queue) and other.url == url end |
#approximate_number_of_messages ⇒ Integer Also known as: visible_messages
Returns The approximate number of visible messages in a queue. For more information, see Resources Required to Process Messages in the Amazon SQS Developer Guide.
300 301 302 |
# File 'lib/aws/sqs/queue.rb', line 300 def get_attribute("ApproximateNumberOfMessages").to_i end |
#approximate_number_of_messages_delayed ⇒ Integer
Returns an approximate count of messages delayed.
409 410 411 |
# File 'lib/aws/sqs/queue.rb', line 409 def get_attribute("ApproximateNumberOfMessagesDelayed").to_i end |
#approximate_number_of_messages_not_visible ⇒ Integer Also known as: invisible_messages
Returns The approximate number of messages that are not timed-out and not deleted. For more information, see Resources Required to Process Messages in the Amazon SQS Developer Guide.
309 310 311 |
# File 'lib/aws/sqs/queue.rb', line 309 def get_attribute("ApproximateNumberOfMessagesNotVisible").to_i end |
#arn ⇒ String
Returns The queue's Amazon resource name (ARN).
414 415 416 |
# File 'lib/aws/sqs/queue.rb', line 414 def arn @arn ||= get_attribute("QueueArn") end |
#batch_change_visibility(visibility_timeout, *messages) ⇒ nil #batch_change_visibility(*messages_with_timeouts) ⇒ nil
620 621 622 623 624 625 626 627 628 629 630 631 632 633 634 635 636 637 638 639 640 641 642 643 644 645 646 647 648 649 650 651 |
# File 'lib/aws/sqs/queue.rb', line 620 def batch_change_visibility *args args = args.flatten if args.first.is_a?(Integer) timeout = args.shift = args.collect{|m| [m, timeout] } else = args.collect{|m| [m[:message], m[:visibility_timeout]] } end entries = [] .each do |msg,timeout| handle = msg.is_a?(ReceivedMessage) ? msg.handle : msg entries << { :id => entries.size.to_s, :receipt_handle => handle, :visibility_timeout => timeout, } end response = client.( :queue_url => url, :entries => entries) failures = batch_failures(entries, response) raise Errors::BatchChangeVisibilityError.new(failures) unless failures.empty? nil end |
#batch_delete(*messages) ⇒ nil
559 560 561 562 563 564 565 566 567 568 569 570 571 572 573 574 575 576 |
# File 'lib/aws/sqs/queue.rb', line 559 def batch_delete * entries = [] .flatten.each_with_index do |msg,n| handle = msg.is_a?(ReceivedMessage) ? msg.handle : msg entries << { :id => n.to_s, :receipt_handle => handle } end response = client.( :queue_url => url, :entries => entries) failures = batch_failures(entries, response) raise Errors::BatchDeleteError.new(failures) unless failures.empty? nil end |
#batch_send(*messages) ⇒ Array<SentMessage>
Sends a batch of up to 10 messages in a single request.
queue.('message-1', 'message-2')
You can also set an optional delay for all of the messages:
# delay all messages 1 hour
queue.batch_send(msg1, msg2, :delay_seconds => 3600)
If you need to set a custom delay for each message you can pass hashes:
= []
<< { :message_body => 'msg1', :delay_seconds => 60 }
<< { :message_body => 'msg2', :delay_seconds => 30 }
queue.batch_send()
514 515 516 517 518 519 520 521 522 523 524 525 526 527 528 529 530 531 532 533 534 535 536 537 538 539 540 541 542 543 544 545 546 547 |
# File 'lib/aws/sqs/queue.rb', line 514 def batch_send * entries = .flatten unless entries.first.is_a?(Hash) = entries.last.is_a?(Hash) ? entries.pop : {} entries = entries.collect{|msg| { :message_body => msg } } if delay = [:delay_seconds] entries.each {|entry| entry[:delay_seconds] = delay } end end entries.each_with_index {|entry,n| entry[:id] = n.to_s } client_opts = {} client_opts[:queue_url] = url client_opts[:entries] = entries response = client.(client_opts) failed = batch_failures(entries, response) sent = response[:successful].collect do |sent| msg = SentMessage.new msg. = sent[:message_id] msg.md5 = sent[:md5_of_message_body] msg end raise Errors::BatchSendError.new(sent, failed) unless failed.empty? sent end |
#created_timestamp ⇒ Time
Returns The time when the queue was created.
337 338 339 |
# File 'lib/aws/sqs/queue.rb', line 337 def Time.at(get_attribute("CreatedTimestamp").to_i) end |
#delay_seconds ⇒ Integer
Returns Gets the current default delay for messages sent to the queue.
383 384 385 |
# File 'lib/aws/sqs/queue.rb', line 383 def delay_seconds get_attribute("DelaySeconds").to_i end |
#delay_seconds=(seconds) ⇒ Object
Sets the default delay for messages sent to the queue.
389 390 391 |
# File 'lib/aws/sqs/queue.rb', line 389 def delay_seconds= seconds set_attribute("DelaySeconds", seconds.to_s) end |
#delete ⇒ nil
Deletes the queue, regardless of whether it is empty.
When you delete a queue, the deletion process takes up to 60 seconds. Requests you send involving that queue during the 60 seconds might succeed. For example, calling #send_message might succeed, but after the 60 seconds, the queue and that message you sent no longer exist.
Also, when you delete a queue, you must wait at least 60 seconds before creating a queue with the same name.
65 66 67 68 |
# File 'lib/aws/sqs/queue.rb', line 65 def delete client.delete_queue(:queue_url => url) nil end |
#exists? ⇒ Boolean
This may raise an exception if you don't have permission to access the queue attributes. Also, it may return true for up to 60 seconds after a queue has been deleted.
Returns True if the queue exists.
424 425 426 427 428 429 430 431 |
# File 'lib/aws/sqs/queue.rb', line 424 def exists? client.get_queue_attributes(:queue_url => url, :attribute_names => ["QueueArn"]) rescue Errors::NonExistentQueue, Errors::InvalidAddress false else true end |
#last_modified_timestamp ⇒ Time
Returns The time when the queue was last changed.
342 343 344 |
# File 'lib/aws/sqs/queue.rb', line 342 def Time.at(get_attribute("LastModifiedTimestamp").to_i) end |
#maximum_message_size ⇒ Integer
Returns The limit of how many bytes a message can contain before Amazon SQS rejects it.
348 349 350 |
# File 'lib/aws/sqs/queue.rb', line 348 def get_attribute("MaximumMessageSize").to_i end |
#maximum_message_size=(size) ⇒ Object
Sets the maximum message size for the queue.
359 360 361 |
# File 'lib/aws/sqs/queue.rb', line 359 def (size) set_attribute("MaximumMessageSize", size.to_s) end |
#message_retention_period ⇒ Integer
Returns The number of seconds Amazon SQS retains a message.
365 366 367 |
# File 'lib/aws/sqs/queue.rb', line 365 def get_attribute("MessageRetentionPeriod").to_i end |
#message_retention_period=(period) ⇒ Object
Sets the message retention period for the queue
376 377 378 379 |
# File 'lib/aws/sqs/queue.rb', line 376 def (period) set_attribute("MessageRetentionPeriod", period.to_s) period end |
#policy ⇒ Policy
Returns the current queue policy if there is one.
Returns nil
otherwise.
451 452 453 454 455 456 457 458 459 460 |
# File 'lib/aws/sqs/queue.rb', line 451 def policy if policy_json = get_attribute('Policy') policy = SQS::Policy.from_json(policy_json) policy.extend(PolicyProxy) policy.queue = self policy else nil end end |
#policy=(policy) ⇒ nil
Set the policy on this queue.
If you pass nil or an empty string then it will have the same effect as deleting the policy.
473 474 475 476 477 478 479 480 481 |
# File 'lib/aws/sqs/queue.rb', line 473 def policy= policy policy_string = case policy when nil, '' then '' when String then policy else policy.to_json end set_attribute('Policy', policy_string) nil end |
#poll(opts = {}) {|message| ... } ⇒ nil
Polls continually for messages. For example, you can use this to poll indefinitely:
queue.poll { |msg| puts msg.body }
Or, to poll indefinitely for the first message and then continue polling until no message is received for a period of at least ten seconds:
queue.poll(:initial_timeout => false,
:idle_timeout => 10) { |msg| puts msg.body }
As with the block form of #receive_message, this method automatically deletes the message then the block exits normally.
273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 |
# File 'lib/aws/sqs/queue.rb', line 273 def poll(opts = {}, &block) opts[:limit] = opts.delete(:batch_size) if opts.key?(:batch_size) opts[:wait_time_seconds] = DEFAULT_WAIT_TIME_SECONDS unless opts.has_key?(:wait_time_seconds) = Time.now got_first = false loop do got_msg = false (opts) do || got_msg = got_first = true = Time.now yield() end unless got_msg return if hit_timeout?(got_first, , opts) end end nil end |
#receive_message(opts = {}) {|message| ... } ⇒ ReceivedMessage Also known as: receive_messages
Due to the distributed nature of the queue, a weighted random set of machines is sampled on a ReceiveMessage call. That means only the messages on the sampled machines are returned. If the number of messages in the queue is small (less than 1000), it is likely you will get fewer messages than you requested per call to #receive_message. If the number of messages in the queue is extremely small, you might not receive any messages. To poll continually for messages, use the #poll method, which automatically retries the request after a configurable delay.
Retrieves one or more messages. When a block is given, each message is yielded to the block and then deleted as long as the block exits normally. When no block is given, you must delete the message yourself using ReceivedMessage#delete.
187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 |
# File 'lib/aws/sqs/queue.rb', line 187 def (opts = {}, &block) resp = client.(receive_opts(opts)) = resp[:messages].map do |m| ReceivedMessage.new(self, m[:message_id], m[:receipt_handle], :body => m[:body], :md5 => m[:md5_of_body], :attributes => m[:attributes]) end if block (, block) elsif opts[:limit] && opts[:limit] != 1 else .first end end |
#send_message(body, options = {}) ⇒ SentMessage
Delivers a message to this queue.
114 115 116 117 118 119 120 121 122 123 124 125 126 127 |
# File 'lib/aws/sqs/queue.rb', line 114 def body, = {} client_opts = .dup client_opts[:queue_url] = url client_opts[:message_body] = body response = client.(client_opts) msg = SentMessage.new msg. = response[:message_id] msg.md5 = response[:md5_of_message_body] msg end |
#visibility_timeout ⇒ Integer
Returns the visibility timeout for the queue. For more information about visibility timeout, see Visibility Timeout in the Amazon SQS Developer Guide.
318 319 320 |
# File 'lib/aws/sqs/queue.rb', line 318 def visibility_timeout get_attribute("VisibilityTimeout").to_i end |
#visibility_timeout=(timeout) ⇒ Object
Sets the visibility timeout for the queue.
331 332 333 334 |
# File 'lib/aws/sqs/queue.rb', line 331 def visibility_timeout=(timeout) set_attribute("VisibilityTimeout", timeout.to_s) timeout end |
#wait_time_seconds ⇒ Integer
Returns Gets the number of seconds the service will wait for a response when requesting a new message.
396 397 398 |
# File 'lib/aws/sqs/queue.rb', line 396 def wait_time_seconds get_attribute("ReceiveMessageWaitTimeSeconds").to_i end |
#wait_time_seconds=(seconds) ⇒ Object
Sets the number of seconds that the service should wait for a response when requesting a new message
404 405 406 |
# File 'lib/aws/sqs/queue.rb', line 404 def wait_time_seconds= seconds set_attribute("ReceiveMessageWaitTimeSeconds", seconds.to_s) end |