Class: Sqewer::Connection
- Inherits:
- Object
- Defined in:
- lib/sqewer/connection.rb
Overview
Adapter that handles communication with a specific queue. In the future this could be switched to a Google PubSub queue, or to AMQP, or to any other queue with guaranteed re-delivery without ACK. The required queue semantics are very simple:
- no message should be deleted if the receiving client has not deleted it explicitly
- any execution that ends with an exception should cause the message to be re-enqueued
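The two rules above can be sketched with a toy in-memory queue. This is only an illustration: `ToyQueue`, `release` and `process_one` are hypothetical names that do not exist in Sqewer. With SQS, an undeleted message becomes visible again once its visibility timeout expires, rather than through an explicit release call.

```ruby
# Hypothetical in-memory stand-in for a queue; not part of Sqewer.
class ToyQueue
  def initialize
    @visible = []    # messages ready for delivery
    @in_flight = {}  # handle => body, delivered but not yet deleted
    @next_handle = 0
  end

  def send_message(body)
    @visible << body
  end

  # Hands out one message; it stays in the queue until deleted explicitly.
  def receive
    return nil if @visible.empty?
    handle = (@next_handle += 1)
    @in_flight[handle] = @visible.shift
    [handle, @in_flight[handle]]
  end

  # Rule 1: only an explicit delete removes a message.
  def delete_message(handle)
    @in_flight.delete(handle)
  end

  # Rule 2: a delivered but undeleted message becomes visible again.
  def release(handle)
    body = @in_flight.delete(handle)
    @visible << body if body
  end

  def size
    @visible.size + @in_flight.size
  end
end

# Processes one message: deletes it on success, re-enqueues it on exception.
def process_one(queue)
  handle, body = queue.receive
  return false unless handle
  begin
    yield body
    queue.delete_message(handle)
    true
  rescue StandardError
    queue.release(handle)
    false
  end
end

queue = ToyQueue.new
queue.send_message('job-1')
failed = process_one(queue) { |_body| raise 'boom' }
size_after_failure = queue.size
succeeded = process_one(queue) { |body| body.upcase }
size_after_success = queue.size
```

After the failing run the message is still in the queue; only the successful run removes it.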
Direct Known Subclasses
Defined Under Namespace
Classes: DeleteBuffer, Message, MessageBuffer, SendBuffer
Constant Summary
- DEFAULT_TIMEOUT_SECONDS =
5
- BATCH_RECEIVE_SIZE =
10
- MAX_RANDOM_FAILURES_PER_CALL =
10
- MAX_RANDOM_RECEIVE_FAILURES =
Enough tries to be sure to hit the max_elapsed_time of 900 seconds.
100
- NotOurFaultAwsError =
Class.new(Sqewer::Error)
Class Method Summary
-
.client ⇒ Object
Returns a singleton of Aws::SQS::Client.
-
.default ⇒ Object
Returns the default adapter, connected to the queue set via the
SQS_QUEUE_URL
environment variable.
Instance Method Summary
-
#delete_message(message_identifier) ⇒ void
Deletes a message after it has been successfully decoded and processed.
-
#delete_multiple_messages {|#delete_message| ... } ⇒ void
Deletes multiple messages after they have all been successfully decoded and processed.
-
#initialize(queue_url, client: self.class.client) ⇒ Connection
constructor
Initializes a new adapter, with access to the SQS queue at the given URL.
-
#receive_messages ⇒ Array<Message>
Receive at most 10 messages from the queue, and return the array of Message objects.
-
#send_message(message_body, **kwargs_for_send) ⇒ void
Send a message to the backing queue.
-
#send_multiple_messages {|#send_message| ... } ⇒ void
Send multiple messages.
Constructor Details
#initialize(queue_url, client: self.class.client) ⇒ Connection
Initializes a new adapter, with access to the SQS queue at the given URL.
# File 'lib/sqewer/connection.rb', line 58
def initialize(queue_url, client: self.class.client)
  @queue_url = queue_url
  @client = client
end
Class Method Details
.client ⇒ Object
Returns a singleton of Aws::SQS::Client.
# File 'lib/sqewer/connection.rb', line 43
def self.client
  # It's better using a singleton client to prevent making a lot of HTTP
  # requests to the AWS metadata endpoint when getting credentials.
  @client ||= begin
    require 'aws-sdk-sqs'
    ::Aws::SQS::Client.new(
      instance_profile_credentials_timeout: 1,
      instance_profile_credentials_retries: 5,
    )
  end
end
.default ⇒ Object
Returns the default adapter, connected to the queue set via the SQS_QUEUE_URL
environment variable. Switches to SQLite-backed local queue if the SQS_QUEUE_URL
is prefixed with 'sqlite3://'.
# File 'lib/sqewer/connection.rb', line 30
def self.default
  url_str = ENV.fetch('SQS_QUEUE_URL')
  uri = URI(url_str)
  if uri.scheme == 'sqlite3'
    Sqewer::LocalConnection.new(uri.to_s)
  else
    new(uri.to_s)
  end
rescue KeyError => e
  raise "SQS_QUEUE_URL not set in the environment. This is the queue URL Sqewer uses by default."
end
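The scheme check can be tried in isolation with the standard library alone. The sketch below is an illustration, not Sqewer code: `adapter_kind_for` is a hypothetical helper that returns a symbol instead of building a connection, and both URLs are made-up examples.

```ruby
require 'uri'

# Hypothetical helper mirroring the scheme check in .default.
# Returns :local for sqlite3:// URLs and :sqs for everything else.
def adapter_kind_for(url_str)
  URI(url_str).scheme == 'sqlite3' ? :local : :sqs
end

# Both URLs below are invented for the example.
local_kind = adapter_kind_for('sqlite3:///tmp/sqewer-queue.sqlite3')
sqs_kind = adapter_kind_for('https://sqs.eu-west-1.amazonaws.com/123456789012/example-queue')

# ENV.fetch without a fallback raises KeyError for a missing variable,
# which is the error .default rescues and re-raises with a clearer message.
missing_raises = begin
  ENV.fetch('SQEWER_EXAMPLE_UNSET_VARIABLE')
  false
rescue KeyError
  true
end
```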
Instance Method Details
#delete_message(message_identifier) ⇒ void
This method returns an undefined value.
Deletes a message after it has been successfully decoded and processed.
# File 'lib/sqewer/connection.rb', line 198
def delete_message(message_identifier)
  delete_multiple_messages {|via| via.delete_message(message_identifier) }
end
#delete_multiple_messages {|#delete_message| ... } ⇒ void
This method returns an undefined value.
Deletes multiple messages after they have all been successfully decoded and processed.
# File 'lib/sqewer/connection.rb', line 206
def delete_multiple_messages
  buffer = DeleteBuffer.new
  yield(buffer)
  buffer.each_batch {|batch|
    handle_batch_with_retries(:delete_message_batch, batch)
  }
end
#receive_messages ⇒ Array<Message>
Receive at most 10 messages from the queue, and return the array of Message objects. Retries for at most 900 seconds (15 minutes) and then gives up, thereby crashing the read loop. If SQS is not available even after 15 minutes it is either down or the server is misconfigured. Either way it makes no sense to continue.
# File 'lib/sqewer/connection.rb', line 69
def receive_messages
  Retriable.retriable on: network_and_aws_sdk_errors, tries: MAX_RANDOM_RECEIVE_FAILURES do
    response = @client.receive_message(
      queue_url: @queue_url,
      attribute_names: ['All'],
      wait_time_seconds: DEFAULT_TIMEOUT_SECONDS,
      max_number_of_messages: BATCH_RECEIVE_SIZE
    )
    response.messages.map {|message| Message.new(message.receipt_handle, message.body, message.attributes) }
  end
rescue *sqs_errors_to_release_client
  # We noticed cases where errors related to AWS credentials started to happen suddenly.
  # We don't know the root cause yet, but what we can do is release the
  # singleton @client instance because it contains a cache of credentials that in most
  # cases is no longer valid.
  self.class.release_client
  raise
end
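The retry-then-give-up behaviour can be approximated in plain Ruby. This is a simplified sketch and not how the method is implemented: the real code delegates to the Retriable gem, while `with_retries` below is a hypothetical helper with no backoff and no elapsed-time limit.

```ruby
# Hypothetical helper: retries the block on the listed error classes,
# then re-raises the last error once the tries are used up.
def with_retries(tries:, on:)
  attempts = 0
  begin
    attempts += 1
    yield attempts
  rescue *on
    retry if attempts < tries
    raise
  end
end

# A call that fails twice with a transient error and then succeeds.
calls = 0
result = with_retries(tries: 5, on: [IOError]) do |attempt|
  calls += 1
  raise IOError, 'transient failure' if attempt < 3
  :received
end

# A call that never succeeds: the error escapes after the last try,
# which is what would crash a read loop built on top of it.
exhausted_calls = 0
gave_up = begin
  with_retries(tries: 2, on: [IOError]) do |_attempt|
    exhausted_calls += 1
    raise IOError, 'still failing'
  end
  false
rescue IOError
  true
end
```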
#send_message(message_body, **kwargs_for_send) ⇒ void
This method returns an undefined value.
Send a message to the backing queue. Passes the arguments to the AWS SDK.
# File 'lib/sqewer/connection.rb', line 95
def send_message(message_body, **kwargs_for_send)
  send_multiple_messages {|via| via.send_message(message_body, **kwargs_for_send) }
end
#send_multiple_messages {|#send_message| ... } ⇒ void
This method returns an undefined value.
Send multiple messages. If any messages fail to send, an exception will be raised.
# File 'lib/sqewer/connection.rb', line 187
def send_multiple_messages
  buffer = SendBuffer.new
  yield(buffer)
  buffer.each_batch {|batch|
    handle_batch_with_retries(:send_message_batch, batch)
  }
end
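The buffer-then-batch pattern shared by #send_multiple_messages and #delete_multiple_messages can be sketched without AWS. Everything here is a stand-in: `ToyBuffer` and `send_multiple` are hypothetical, and the batch size of 10 is an assumption based on the SQS limit of 10 entries per batch request, not something read from SendBuffer.

```ruby
# Hypothetical stand-in for a send buffer; the real SendBuffer lives in Sqewer.
class ToyBuffer
  BATCH_SIZE = 10 # assumption: matches the SQS limit of 10 entries per batch

  def initialize
    @bodies = []
  end

  # Collects a message instead of sending it right away.
  def send_message(body)
    @bodies << body
  end

  # Yields the collected messages in slices of at most BATCH_SIZE.
  def each_batch(&block)
    @bodies.each_slice(BATCH_SIZE, &block)
  end
end

# Mirrors the shape of the method above: yield a buffer to the caller,
# then flush it batch by batch. The sink array stands in for the SQS call.
def send_multiple(sink)
  buffer = ToyBuffer.new
  yield(buffer)
  buffer.each_batch { |batch| sink << batch }
end

sent_batches = []
send_multiple(sent_batches) do |via|
  25.times { |i| via.send_message("message-#{i}") }
end
batch_sizes = sent_batches.map(&:size)
```

Nothing is sent while the block runs; the 25 buffered messages are flushed afterwards in three batches.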