Class: AWS::SQS::Queue
- Inherits:
-
Object
- Object
- AWS::SQS::Queue
- Defined in:
- lib/aws/sqs/queue.rb
Overview
Represents an Amazon SQS Queue.
Defined Under Namespace
Classes: SentMessage
Constant Summary collapse
- DEFAULT_POLL_INTERVAL =
The default number of seconds to wait between polling requests for new messages.
1
Instance Attribute Summary collapse
-
#url ⇒ String
readonly
The queue URL.
Instance Method Summary collapse
-
#==(other) ⇒ Boolean
(also: #eql?)
Returns true if the other queue has the same url.
-
#approximate_number_of_messages ⇒ Integer
(also: #visible_messages)
The approximate number of visible messages in a queue.
-
#approximate_number_of_messages_delayed ⇒ Integer
Returns an approximate count of messages delayed.
-
#approximate_number_of_messages_not_visible ⇒ Integer
(also: #invisible_messages)
The approximate number of messages that are not timed-out and not deleted.
-
#arn ⇒ String
The queue’s Amazon resource name (ARN).
- #batch_change_visibility(*args) ⇒ Object
- #batch_delete(*messages) ⇒ nil
-
#batch_send(*messages) ⇒ Array<SentMessage>
Sends a batch of up to 10 messages in a single request.
-
#created_timestamp ⇒ Time
The time when the queue was created.
-
#delay_seconds ⇒ Integer
Gets the current default delay for messages sent to the queue.
-
#delay_seconds=(seconds) ⇒ Object
Sets the default delay for messages sent to the queue.
-
#delete ⇒ nil
Deletes the queue, regardless of whether it is empty.
-
#exists? ⇒ Boolean
True if the queue exists.
-
#last_modified_timestamp ⇒ Time
The time when the queue was last changed.
-
#maximum_message_size ⇒ Integer
The limit of how many bytes a message can contain before Amazon SQS rejects it.
-
#maximum_message_size=(size) ⇒ Object
Sets the maximum message size for the queue.
-
#message_retention_period ⇒ Integer
The number of seconds Amazon SQS retains a message.
-
#message_retention_period=(period) ⇒ Object
Sets the message retention period for the queue.
-
#policy ⇒ Policy
Returns the current queue policy if there is one.
-
#policy=(policy) ⇒ nil
Set the policy on this queue.
-
#poll(opts = {}) {|message| ... } ⇒ nil
Polls continually for messages.
-
#receive_message(opts = {}) {|message| ... } ⇒ ReceivedMessage
(also: #receive_messages)
Retrieves one or more messages.
-
#send_message(body, options = {}) ⇒ SentMessage
Delivers a message to this queue.
-
#visibility_timeout ⇒ Integer
Returns the visibility timeout for the queue.
-
#visibility_timeout=(timeout) ⇒ Object
Sets the visibility timeout for the queue.
Instance Attribute Details
#url ⇒ String (readonly)
Returns The queue URL.
37 38 39 |
# File 'lib/aws/sqs/queue.rb', line 37 def url @url end |
Instance Method Details
#==(other) ⇒ Boolean Also known as: eql?
Returns true if the other queue has the same url.
620 621 622 |
# File 'lib/aws/sqs/queue.rb', line 620 def ==(other) other.kind_of?(Queue) and other.url == url end |
#approximate_number_of_messages ⇒ Integer Also known as: visible_messages
Returns The approximate number of visible messages in a queue. For more information, see Resources Required to Process Messages in the Amazon SQS Developer Guide.
278 279 280 |
# File 'lib/aws/sqs/queue.rb', line 278 def get_attribute("ApproximateNumberOfMessages").to_i end |
#approximate_number_of_messages_delayed ⇒ Integer
Returns an approximate count of messages delayed.
374 375 376 |
# File 'lib/aws/sqs/queue.rb', line 374 def get_attribute("ApproximateNumberOfMessagesDelayed").to_i end |
#approximate_number_of_messages_not_visible ⇒ Integer Also known as: invisible_messages
Returns The approximate number of messages that are not timed-out and not deleted. For more information, see Resources Required to Process Messages in the Amazon SQS Developer Guide.
288 289 290 |
# File 'lib/aws/sqs/queue.rb', line 288 def get_attribute("ApproximateNumberOfMessagesNotVisible").to_i end |
#arn ⇒ String
Returns The queue’s Amazon resource name (ARN).
379 380 381 |
# File 'lib/aws/sqs/queue.rb', line 379 def arn @arn ||= get_attribute("QueueArn") end |
#batch_change_visibility(visibility_timeout, *messages) ⇒ nil #batch_change_visibility(*messages_with_timeouts) ⇒ nil
585 586 587 588 589 590 591 592 593 594 595 596 597 598 599 600 601 602 603 604 605 606 607 608 609 610 611 612 613 614 615 616 |
# File 'lib/aws/sqs/queue.rb', line 585 def batch_change_visibility *args args = args.flatten if args.first.is_a?(Integer) timeout = args.shift = args.collect{|m| [m, timeout] } else = args.collect{|m| [m[:message], m[:visibility_timeout]] } end entries = [] .each do |msg,timeout| handle = msg.is_a?(ReceivedMessage) ? msg.handle : msg entries << { :id => entries.size.to_s, :receipt_handle => handle, :visibility_timeout => timeout, } end response = client.( :queue_url => url, :entries => entries) failures = batch_failures(entries, response) raise Errors::BatchChangeVisibilityError.new(failures) unless failures.empty? nil end |
#batch_delete(*messages) ⇒ nil
524 525 526 527 528 529 530 531 532 533 534 535 536 537 538 539 540 541 |
# File 'lib/aws/sqs/queue.rb', line 524 def batch_delete * entries = [] .flatten.each_with_index do |msg,n| handle = msg.is_a?(ReceivedMessage) ? msg.handle : msg entries << { :id => n.to_s, :receipt_handle => handle } end response = client.( :queue_url => url, :entries => entries) failures = batch_failures(entries, response) raise Errors::BatchDeleteError.new(failures) unless failures.empty? nil end |
#batch_send(*messages) ⇒ Array<SentMessage>
Sends a batch of up to 10 messages in a single request.
queue.('message-1', 'message-2')
You can also set an optional delay for all of the messages:
# delay all messages 1 hour
queue.batch_send(msg1, msg2, :delay_seconds => 3600)
If you need to set a custom delay for each message you can pass hashes:
= []
<< { :message_body => 'msg1', :delay_seconds => 60 }
<< { :message_body => 'msg2', :delay_seconds => 30 }
queue.batch_send()
479 480 481 482 483 484 485 486 487 488 489 490 491 492 493 494 495 496 497 498 499 500 501 502 503 504 505 506 507 508 509 510 511 512 |
# File 'lib/aws/sqs/queue.rb', line 479 def batch_send * entries = .flatten unless entries.first.is_a?(Hash) = entries.last.is_a?(Hash) ? entries.pop : {} entries = entries.collect{|msg| { :message_body => msg } } if delay = [:delay_seconds] entries.each {|entry| entry[:delay_seconds] = delay } end end entries.each_with_index {|entry,n| entry[:id] = n.to_s } client_opts = {} client_opts[:queue_url] = url client_opts[:entries] = entries response = client.(client_opts) failed = batch_failures(entries, response) sent = response[:successful].collect do |sent| msg = SentMessage.new msg. = sent[:message_id] msg.md5 = sent[:md5_of_message_body] msg end raise Errors::BatchSendError.new(sent, failed) unless failed.empty? sent end |
#created_timestamp ⇒ Time
Returns The time when the queue was created.
317 318 319 |
# File 'lib/aws/sqs/queue.rb', line 317 def Time.at(get_attribute("CreatedTimestamp").to_i) end |
#delay_seconds ⇒ Integer
Returns Gets the current default delay for messages sent to the queue.
363 364 365 |
# File 'lib/aws/sqs/queue.rb', line 363 def delay_seconds get_attribute("DelaySeconds").to_i end |
#delay_seconds=(seconds) ⇒ Object
Sets the default delay for messages sent to the queue.
369 370 371 |
# File 'lib/aws/sqs/queue.rb', line 369 def delay_seconds= seconds set_attribute("DelaySeconds", seconds.to_s) end |
#delete ⇒ nil
Deletes the queue, regardless of whether it is empty.
When you delete a queue, the deletion process takes up to 60 seconds. Requests you send involving that queue during the 60 seconds might succeed. For example, calling #send_message might succeed, but after the 60 seconds, the queue and that message you sent no longer exist.
Also, when you delete a queue, you must wait at least 60 seconds before creating a queue with the same name.
56 57 58 59 |
# File 'lib/aws/sqs/queue.rb', line 56 def delete client.delete_queue(:queue_url => url) nil end |
#exists? ⇒ Boolean
This may raise an exception if you don’t have permission to access the queue attributes. Also, it may return true for up to 60 seconds after a queue has been deleted.
Returns True if the queue exists.
389 390 391 392 393 394 395 396 |
# File 'lib/aws/sqs/queue.rb', line 389 def exists? client.get_queue_attributes(:queue_url => url, :attribute_names => ["QueueArn"]) rescue Errors::NonExistentQueue, Errors::InvalidAddress false else true end |
#last_modified_timestamp ⇒ Time
Returns The time when the queue was last changed.
322 323 324 |
# File 'lib/aws/sqs/queue.rb', line 322 def Time.at(get_attribute("LastModifiedTimestamp").to_i) end |
#maximum_message_size ⇒ Integer
Returns The limit of how many bytes a message can contain before Amazon SQS rejects it.
328 329 330 |
# File 'lib/aws/sqs/queue.rb', line 328 def get_attribute("MaximumMessageSize").to_i end |
#maximum_message_size=(size) ⇒ Object
Sets the maximum message size for the queue.
339 340 341 |
# File 'lib/aws/sqs/queue.rb', line 339 def (size) set_attribute("MaximumMessageSize", size.to_s) end |
#message_retention_period ⇒ Integer
Returns The number of seconds Amazon SQS retains a message.
345 346 347 |
# File 'lib/aws/sqs/queue.rb', line 345 def get_attribute("MessageRetentionPeriod").to_i end |
#message_retention_period=(period) ⇒ Object
Sets the message retention period for the queue
356 357 358 359 |
# File 'lib/aws/sqs/queue.rb', line 356 def (period) set_attribute("MessageRetentionPeriod", period.to_s) period end |
#policy ⇒ Policy
Returns the current queue policy if there is one. Returns nil
otherwise.
416 417 418 419 420 421 422 423 424 425 |
# File 'lib/aws/sqs/queue.rb', line 416 def policy if policy_json = get_attribute('Policy') policy = SQS::Policy.from_json(policy_json) policy.extend(PolicyProxy) policy.queue = self policy else nil end end |
#policy=(policy) ⇒ nil
Set the policy on this queue.
If you pass nil or an empty string then it will have the same effect as deleting the policy.
438 439 440 441 442 443 444 445 446 |
# File 'lib/aws/sqs/queue.rb', line 438 def policy= policy policy_string = case policy when nil, '' then '' when String then policy else policy.to_json end set_attribute('Policy', policy_string) nil end |
#poll(opts = {}) {|message| ... } ⇒ nil
Polls continually for messages. For example, you can use this to poll indefinitely:
queue.poll { |msg| puts msg.body }
Or, to poll indefinitely for the first message and then continue polling until no message is received for a period of at least ten seconds:
queue.poll(:initial_timeout => false,
:idle_timeout => 10) { |msg| puts msg.body }
As with the block form of #receive_message, this method automatically deletes the message then the block exits normally.
251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 |
# File 'lib/aws/sqs/queue.rb', line 251 def poll(opts = {}, &block) poll_interval = opts[:poll_interval] || DEFAULT_POLL_INTERVAL opts[:limit] = opts.delete(:batch_size) if opts.key?(:batch_size) = Time.now got_first = false loop do got_msg = false (opts) do || got_msg = got_first = true = Time.now yield() end unless got_msg Kernel.sleep(poll_interval) unless poll_interval == 0 return if hit_timeout?(got_first, , opts) end end nil end |
#receive_message(opts = {}) {|message| ... } ⇒ ReceivedMessage Also known as: receive_messages
Due to the distributed nature of the queue, a weighted random set of machines is sampled on a ReceiveMessage call. That means only the messages on the sampled machines are returned. If the number of messages in the queue is small (less than 1000), it is likely you will get fewer messages than you requested per call to #receive_message. If the number of messages in the queue is extremely small, you might not receive any messages. To poll continually for messages, use the #poll method, which automatically retries the request after a configurable delay.
Retrieves one or more messages. When a block is given, each message is yielded to the block and then deleted as long as the block exits normally. When no block is given, you must delete the message yourself using ReceivedMessage#delete.
172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 |
# File 'lib/aws/sqs/queue.rb', line 172 def (opts = {}, &block) resp = client.(receive_opts(opts)) = resp[:messages].map do |m| ReceivedMessage.new(self, m[:message_id], m[:receipt_handle], :body => m[:body], :md5 => m[:md5_of_body], :attributes => m[:attributes]) end if block (, block) elsif opts[:limit] && opts[:limit] != 1 else .first end end |
#send_message(body, options = {}) ⇒ SentMessage
Delivers a message to this queue.
105 106 107 108 109 110 111 112 113 114 115 116 117 118 |
# File 'lib/aws/sqs/queue.rb', line 105 def body, = {} client_opts = .dup client_opts[:queue_url] = url client_opts[:message_body] = body response = client.(client_opts) msg = SentMessage.new msg. = response[:message_id] msg.md5 = response[:md5_of_message_body] msg end |
#visibility_timeout ⇒ Integer
Returns the visibility timeout for the queue. For more information about visibility timeout, see Visibility Timeout in the Amazon SQS Developer Guide.
298 299 300 |
# File 'lib/aws/sqs/queue.rb', line 298 def visibility_timeout get_attribute("VisibilityTimeout").to_i end |
#visibility_timeout=(timeout) ⇒ Object
Sets the visibility timeout for the queue.
311 312 313 314 |
# File 'lib/aws/sqs/queue.rb', line 311 def visibility_timeout=(timeout) set_attribute("VisibilityTimeout", timeout.to_s) timeout end |