Class: Sqewer::Connection
- Inherits:
-
Object
- Object
- Sqewer::Connection
- Defined in:
- lib/sqewer/connection.rb
Overview
Adapter that handles communication with a specific queue. In the future this could be switched to a Google PubSub queue, or to AMQP, or to any other queue with guaranteed re-delivery without ACK. The required queue semantics are very simple:
- no message should be deleted if the receiving client has not deleted it explicitly
- any execution that ends with an exception should cause the message to be re-enqueued
Defined Under Namespace
Classes: DeleteBuffer, Message, MessageBuffer, RetryWrapper, SendBuffer
Constant Summary collapse
- DEFAULT_TIMEOUT_SECONDS =
5
- BATCH_RECEIVE_SIZE =
10
- MAX_RANDOM_FAILURES_PER_CALL =
10
- NotOurFaultAwsError =
Class.new(StandardError)
Class Method Summary collapse
-
.default ⇒ Object
Returns the default adapter, connected to the queue set via the
SQS_QUEUE_URL
environment variable.
Instance Method Summary collapse
-
#delete_message(message_identifier) ⇒ void
Deletes a message after it has been succesfully decoded and processed.
-
#delete_multiple_messages {|#delete_message| ... } ⇒ void
Deletes multiple messages after they all have been succesfully decoded and processed.
-
#initialize(queue_url) ⇒ Connection
constructor
Initializes a new adapter, with access to the SQS queue at the given URL.
-
#receive_messages ⇒ Array<Message>
Receive at most 10 messages from the queue, and return the array of Message objects.
-
#send_message(message_body, **kwargs_for_send) ⇒ void
Send a message to the backing queue.
-
#send_multiple_messages {|#send_message| ... } ⇒ void
Send multiple messages.
Constructor Details
#initialize(queue_url) ⇒ Connection
Initializes a new adapter, with access to the SQS queue at the given URL.
37 38 39 40 |
# File 'lib/sqewer/connection.rb', line 37 def initialize(queue_url) require 'aws-sdk' @queue_url = queue_url end |
Class Method Details
.default ⇒ Object
Returns the default adapter, connected to the queue set via the SQS_QUEUE_URL
environment variable.
28 29 30 31 32 |
# File 'lib/sqewer/connection.rb', line 28 def self.default new(ENV.fetch('SQS_QUEUE_URL')) rescue KeyError => e raise "SQS_QUEUE_URL not set in the environment. This is the queue URL that the default that Sqewer uses" end |
Instance Method Details
#delete_message(message_identifier) ⇒ void
This method returns an undefined value.
Deletes a message after it has been succesfully decoded and processed
110 111 112 |
# File 'lib/sqewer/connection.rb', line 110 def () {|via| via.() } end |
#delete_multiple_messages {|#delete_message| ... } ⇒ void
This method returns an undefined value.
Deletes multiple messages after they all have been succesfully decoded and processed.
118 119 120 121 122 123 |
# File 'lib/sqewer/connection.rb', line 118 def buffer = DeleteBuffer.new yield(buffer) buffer.each_batch {|batch| handle_batch_with_retries(:delete_message_batch, batch) } end |
#receive_messages ⇒ Array<Message>
Receive at most 10 messages from the queue, and return the array of Message objects.
45 46 47 48 49 50 51 |
# File 'lib/sqewer/connection.rb', line 45 def response = client.(queue_url: @queue_url, wait_time_seconds: DEFAULT_TIMEOUT_SECONDS, max_number_of_messages: 10) response..map do | | Message.new(.receipt_handle, .body) end end |
#send_message(message_body, **kwargs_for_send) ⇒ void
This method returns an undefined value.
Send a message to the backing queue
Passes the arguments to the AWS SDK.
59 60 61 |
# File 'lib/sqewer/connection.rb', line 59 def (, **kwargs_for_send) {|via| via.(, **kwargs_for_send) } end |
#send_multiple_messages {|#send_message| ... } ⇒ void
This method returns an undefined value.
Send multiple messages. If any messages fail to send, an exception will be raised.
99 100 101 102 103 104 |
# File 'lib/sqewer/connection.rb', line 99 def buffer = SendBuffer.new yield(buffer) buffer.each_batch {|batch| handle_batch_with_retries(:send_message_batch, batch) } end |