Class: Aws::SQS::QueuePoller
- Inherits: Object
- Defined in: lib/aws-sdk-sqs/queue_poller.rb
Overview
A utility class for long polling messages in a loop. **Messages are automatically deleted from the queue at the end of the given block.**
poller = Aws::SQS::QueuePoller.new(queue_url)
poller.poll do |msg|
  puts msg.body
end
## Long Polling
By default, messages are received using long polling. The poller forces a default `:wait_time_seconds` of 20 seconds. To use the queue's default wait time instead, pass `nil` for `:wait_time_seconds`.
# disables the 20 second default and uses the queue's ReceiveMessageWaitTimeSeconds
poller.poll(wait_time_seconds: nil) do |msg|
  # ...
end
When you disable `:wait_time_seconds` by passing `nil`, make sure the queue's `ReceiveMessageWaitTimeSeconds` attribute is set to a non-zero value. Otherwise you will be short polling, which triggers significantly more API calls.
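One way to picture the `nil` override is as a merge of caller options over the poller's defaults, with `nil` values dropped so that the request carries no wait time and the queue attribute applies. The sketch below is plain Ruby with no SDK calls. `POLL_DEFAULTS` and `effective_request_params` are hypothetical names, and dropping `nil` values is an assumption about the mechanism, not a description of the SDK internals.

```ruby
# Hypothetical defaults; only the 20 second wait time comes from the text above.
POLL_DEFAULTS = { wait_time_seconds: 20 }.freeze

# Merge caller options over the defaults, then drop nil values so the
# parameter is omitted from the request entirely (assumed behavior).
def effective_request_params(options = {})
  POLL_DEFAULTS.merge(options).reject { |_name, value| value.nil? }
end

p effective_request_params({})                      # default long polling
p effective_request_params(wait_time_seconds: 5)    # explicit override
p effective_request_params(wait_time_seconds: nil)  # defer to the queue attribute
```

In this model, passing `nil` leaves the request without a wait time, which is why the queue's own `ReceiveMessageWaitTimeSeconds` value decides whether polling is long or short.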
## Batch Receiving Messages
You can specify the maximum number of messages to receive with each polling attempt via `:max_number_of_messages`. When this is set to a value greater than 1, the block receives an array of messages instead of a single message.
# receives and yields 1 message at a time
poller.poll do |msg|
  # ...
end
# receives and yields up to 10 messages at a time
poller.poll(max_number_of_messages: 10) do |messages|
  messages.each do |msg|
    # ...
  end
end
The maximum value for `:max_number_of_messages` is enforced by Amazon SQS.
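The difference between the two block shapes can be sketched in plain Ruby. `yield_batch` is a hypothetical helper, not SDK code, and the messages here are plain strings. The sketch only assumes what the text above states: a limit of 1 yields messages one at a time, and a larger limit yields the whole batch as one array.

```ruby
# Hypothetical helper showing how the block could be called
# depending on the configured batch size. Not SDK code.
def yield_batch(messages, max_number_of_messages)
  if max_number_of_messages == 1
    # One call per message: the block sees a single message.
    messages.each { |msg| yield msg }
  else
    # One call per batch: the block sees an array.
    yield messages
  end
end

seen = []
yield_batch(["a"], 1) { |msg| seen << msg }
yield_batch(["b", "c"], 10) { |messages| seen << messages }
p seen
```

A block written for single messages will therefore receive an array as soon as the limit is raised, so the block and the option need to change together.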
## Visibility Timeouts
When you receive a message, you have a fixed amount of time to process and delete it before it becomes visible in the queue again. This is the visibility timeout. By default, the queue’s `VisibilityTimeout` attribute is used. You can provide an alternative visibility timeout, in seconds, when polling.
# queue default VisibilityTimeout
poller.poll do |msg|
end
# custom visibility timeout
poller.poll(visibility_timeout: 10) do |msg|
end
You can reset the visibility timeout of a single message by calling #change_message_visibility_timeout. This is useful when you need more time to finish processing the message.
poller.poll do |msg|
  # do work ...
  # need more time for processing
  poller.change_message_visibility_timeout(msg, 60)
  # finish work ...
end
If you change the visibility timeout of a message to zero, it will return to the queue immediately.
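For work that happens in several slow steps, one option is to reset the timeout before each step. The sketch below runs without the SDK: `RecordingPoller` is a hypothetical stand-in that only records calls, and `process_in_steps` is a hypothetical helper. The only name taken from this page is `change_message_visibility_timeout(message, seconds)`.

```ruby
# Hypothetical stand-in for a QueuePoller; it only records the calls it gets.
class RecordingPoller
  attr_reader :calls

  def initialize
    @calls = []
  end

  # Same name and arguments as the method described above.
  def change_message_visibility_timeout(message, seconds)
    @calls << [message, seconds]
  end
end

# Hypothetical helper: reset the visibility timeout before each step of work,
# so that a slow step starts with a full window.
def process_in_steps(poller, msg, steps, window_seconds)
  steps.map do |step|
    poller.change_message_visibility_timeout(msg, window_seconds)
    step.call(msg)
  end
end

poller = RecordingPoller.new
steps = [->(m) { m.upcase }, ->(m) { m.length }]
results = process_in_steps(poller, "job-1", steps, 60)
p results
p poller.calls
```

With a real poller, the same helper could be called from inside the `#poll` block, passing the poller and the yielded message.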
## Deleting Messages
Messages are deleted from the queue when the block returns normally.
poller.poll do |msg|
  # do work
end # messages deleted here
You can skip message deletion by passing `skip_delete: true`. This allows you to delete the messages manually using #delete_message or #delete_messages.
# single message
poller.poll(skip_delete: true) do |msg|
  poller.delete_message(msg) # if successful
end
# batch delete messages
poller.poll(skip_delete: true, max_number_of_messages: 10) do |messages|
  poller.delete_messages(messages)
end
Another way to manage message deletion is to throw `:skip_delete` from the poll block. This lets you decide, message by message or batch by batch, whether to delete. It is especially useful when you rescue a transient error and want the message's visibility timeout to expire so that the message is delivered again.
poller.poll do |msg|
  begin
    # do work
  rescue
    # unexpected error occurred while processing messages,
    # log it, and skip delete so it can be re-processed later
    throw :skip_delete
  end
end
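The effect of `throw :skip_delete` follows from Ruby's `catch`/`throw`: if the deletion step sits inside a `catch(:skip_delete)` after the block call, a throw from the block jumps past it. The following is a minimal plain-Ruby sketch of that idea. `handle` and the `deleted` array are hypothetical stand-ins, not the SDK's implementation.

```ruby
# Hypothetical sketch: run the handler, then "delete" the message
# unless the handler threw :skip_delete.
def handle(msg, deleted)
  catch(:skip_delete) do
    yield msg
    deleted << msg # reached only when the block returns normally
  end
end

deleted = []

# Normal return: the message is recorded as deleted.
handle("ok", deleted) { |msg| msg.upcase }

# Error path: the rescue throws, so the deletion step is skipped.
handle("bad", deleted) do |_msg|
  begin
    raise "temporary failure"
  rescue StandardError
    throw :skip_delete
  end
end

p deleted
```

Only the first message ends up in `deleted`. In the real queue, the second message would become visible again once its visibility timeout expires.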
## Terminating the Polling Loop
By default, polling continues indefinitely. You can stop the poller by providing an idle timeout or by throwing `:stop_polling` from the #before_request callback.
### `:idle_timeout` Option
This is a configurable maximum number of seconds to wait for a new message before the polling loop exits. By default, there is no idle timeout.
# stops polling after a minute of no received messages
poller.poll(idle_timeout: 60) do |msg|
  # ...
end
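The idle check itself amounts to a comparison of elapsed time against the configured limit. The sketch below shows one way to express it in plain Ruby. `idle_timeout_reached?` is a hypothetical helper, and measuring from the last activity time is an assumption about how idleness is counted.

```ruby
# Hypothetical check, run after an empty receive: has the poller been
# idle for longer than idle_timeout seconds?
def idle_timeout_reached?(idle_timeout, last_activity_at, now)
  return false if idle_timeout.nil? # no idle timeout configured

  (now - last_activity_at) > idle_timeout
end

start = Time.at(0)
p idle_timeout_reached?(60, start, start + 30)    # still within the limit
p idle_timeout_reached?(60, start, start + 61)    # limit exceeded
p idle_timeout_reached?(nil, start, start + 3600) # never times out
```

Because the check can only run between receive calls, the loop may exit somewhat later than `idle_timeout` seconds after the last message, depending on how long each receive call waits.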
### Throw `:stop_polling`
If you want more fine-grained control, you can configure a #before_request callback, which is triggered before each long poll. Throwing `:stop_polling` from this callback causes the poller to exit normally without making the next request.
# stop after processing 100 messages
poller.before_request do |stats|
  throw :stop_polling if stats.received_message_count >= 100
end
poller.poll do |msg|
  # do work ...
end
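The control flow can be sketched in plain Ruby as a loop wrapped in `catch(:stop_polling)`, with the callback invoked before each simulated request. `run_loop` is a hypothetical function, and the `batches` array stands in for successive receive calls.

```ruby
# Hypothetical loop: `batches` stands in for successive receive calls.
def run_loop(batches, before_request)
  processed = 0
  catch(:stop_polling) do
    batches.each do |batch|
      before_request.call(processed) # may throw :stop_polling
      processed += batch.size
    end
  end
  processed # reached on normal completion and after a throw
end

stop_at_100 = ->(count) { throw :stop_polling if count >= 100 }

# 20 simulated requests of 10 messages each; the loop stops early.
total = run_loop(Array.new(20) { Array.new(10, :msg) }, stop_at_100)
p total
```

Because the check runs before each request and not after each message, a batch size that does not divide the threshold evenly lets the count pass it. In this sketch, batches of 7 stop at 105.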
## Tracking Progress
The poller automatically tracks a few statistics client-side in a PollerStats object. You can access the poller stats in four ways:
- The first block argument of #before_request.
- The first block argument of #after_empty_receive.
- The second block argument of #poll.
- The return value from #poll.
Here are examples of accessing the statistics.
- Configure a #before_request callback.
  ```
  poller.before_request do |stats|
    logger.info("requests: #{stats.request_count}")
    logger.info("messages: #{stats.received_message_count}")
    logger.info("last-timestamp: #{stats.last_message_received_at}")
  end
  ```
- Configure an #after_empty_receive callback.
  ```
  poller.after_empty_receive do |stats|
    logger.info("requests: #{stats.request_count}")
    logger.info("messages: #{stats.received_message_count}")
    logger.info("last-timestamp: #{stats.last_message_received_at}")
  end
  ```
- Accept a second argument in the poll block, for example:
  ```
  poller.poll do |msg, stats|
    logger.info("requests: #{stats.request_count}")
    logger.info("messages: #{stats.received_message_count}")
    logger.info("last-timestamp: #{stats.last_message_received_at}")
  end
  ```
- Use the return value:
  ```
  stats = poller.poll(idle_timeout: 10) do |msg|
    # do work ...
  end
  logger.info("requests: #{stats.request_count}")
  logger.info("messages: #{stats.received_message_count}")
  logger.info("last-timestamp: #{stats.last_message_received_at}")
  ```
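Since the same three log lines recur in every example, they can be factored into one helper. The sketch below uses a hypothetical `Stats` struct in place of PollerStats so that it runs without the SDK. It carries only the three readers used in the examples, and `summarize` is a hypothetical helper name.

```ruby
# Hypothetical stand-in carrying the three readers used in the examples.
Stats = Struct.new(:request_count, :received_message_count, :last_message_received_at)

# Build the log lines once so every callback can share them.
def summarize(stats)
  last = stats.last_message_received_at
  stamp = last ? last.getutc.strftime("%Y-%m-%dT%H:%M:%SZ") : "none"
  [
    "requests: #{stats.request_count}",
    "messages: #{stats.received_message_count}",
    "last-timestamp: #{stamp}"
  ]
end

lines = summarize(Stats.new(3, 25, Time.at(0)))
puts lines
```

The helper handles a missing timestamp explicitly, on the assumption that no message may have been received yet when a callback first runs.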
Defined Under Namespace
Classes: PollerConfig, PollerStats
Instance Attribute Summary
- #client ⇒ Client readonly
- #default_config ⇒ PollerConfig readonly
- #queue_url ⇒ String readonly
Instance Method Summary
- #after_empty_receive {|stats| ... } ⇒ void
  Registers a callback that is invoked when the poll request returns with no messages.
- #before_request {|stats| ... } ⇒ void
  Registers a callback that is invoked once before every polling attempt.
- #change_message_visibility_timeout(message, seconds) ⇒ Object
- #delete_message(message) ⇒ Object
- #delete_messages(messages) ⇒ Object
- #initialize(queue_url, options = {}) ⇒ QueuePoller (constructor)
  A new instance of QueuePoller.
- #poll(options = {}, &block) ⇒ PollerStats
  Polls the queue, yielding a message or an array of messages.
Constructor Details
#initialize(queue_url, options = {}) ⇒ QueuePoller
Returns a new instance of QueuePoller.
# File 'lib/aws-sdk-sqs/queue_poller.rb', line 218
def initialize(queue_url, options = {})
  @queue_url = queue_url
  @client = options.delete(:client) || Client.new
  @default_config = PollerConfig.new(options)
end
Instance Attribute Details
#client ⇒ Client (readonly)
# File 'lib/aws-sdk-sqs/queue_poller.rb', line 228
def client
  @client
end
#default_config ⇒ PollerConfig (readonly)
# File 'lib/aws-sdk-sqs/queue_poller.rb', line 231
def default_config
  @default_config
end
#queue_url ⇒ String (readonly)
# File 'lib/aws-sdk-sqs/queue_poller.rb', line 225
def queue_url
  @queue_url
end
Instance Method Details
#after_empty_receive {|stats| ... } ⇒ void
This method returns an undefined value.
Registers a callback that is invoked when the poll request returns with no messages. This callback is invoked after the idle timeout is checked.
poller.after_empty_receive do |stats|
  # Handle empty receive
end
# File 'lib/aws-sdk-sqs/queue_poller.rb', line 280
def after_empty_receive(&block)
  @default_config = @default_config.with(after_empty_receive: block) if block_given?
end
#before_request {|stats| ... } ⇒ void
This method returns an undefined value.
Registers a callback that is invoked once before every polling attempt.
poller.before_request do |stats|
  logger.info("requests: #{stats.request_count}")
  logger.info("messages: #{stats.received_message_count}")
  logger.info("last-timestamp: #{stats.last_message_received_at}")
end
poller.poll do |msg|
  # do work ...
end
## `:stop_polling`
If you throw `:stop_polling` from the #before_request callback, the poller exits normally before making the next long poll request.
poller.before_request do |stats|
  throw :stop_polling if stats.received_message_count >= 100
end
# at most 100 messages will be yielded
poller.poll do |msg|
  # do work ...
end
# File 'lib/aws-sdk-sqs/queue_poller.rb', line 265
def before_request(&block)
  @default_config = @default_config.with(before_request: block) if block_given?
end
#change_message_visibility_timeout(message, seconds) ⇒ Object
This method should be called from inside a #poll block.
# File 'lib/aws-sdk-sqs/queue_poller.rb', line 374
def change_message_visibility_timeout(message, seconds)
  @client.change_message_visibility(
    queue_url: @queue_url,
    receipt_handle: message.receipt_handle,
    visibility_timeout: seconds
  )
end
#delete_message(message) ⇒ Object
This method should be called from inside a #poll block.
# File 'lib/aws-sdk-sqs/queue_poller.rb', line 385
def delete_message(message)
  @client.delete_message(
    queue_url: @queue_url,
    receipt_handle: message.receipt_handle
  )
end
#delete_messages(messages) ⇒ Object
This method should be called from inside a #poll block.
# File 'lib/aws-sdk-sqs/queue_poller.rb', line 396
def delete_messages(messages)
  @client.delete_message_batch(
    queue_url: @queue_url,
    entries: messages.map do |msg|
      { id: msg.message_id, receipt_handle: msg.receipt_handle }
    end
  )
end
#poll(options = {}, &block) ⇒ PollerStats
Polls the queue, yielding a message or an array of messages. Messages are automatically deleted from the queue at the end of the given block. See the class documentation on Aws::SQS::QueuePoller for more examples.
# File 'lib/aws-sdk-sqs/queue_poller.rb', line 352
def poll(options = {}, &block)
  config = @default_config.with(options)
  stats = PollerStats.new
  catch(:stop_polling) do
    loop do
      messages = get_messages(config, stats)
      if messages.empty?
        check_idle_timeout(config, stats)
        config.after_empty_receive&.call(stats)
      else
        process_messages(config, stats, messages, &block)
      end
    end
  end
  stats.polling_stopped_at = Time.now
  stats
end