Class: Sqewer::Connection

Inherits:
Object
  • Object
show all
Defined in:
lib/sqewer/connection.rb

Overview

Adapter that handles communication with a specific queue. In the future this could be switched to a Google PubSub queue, or to AMQP, or to any other queue with guaranteed re-delivery without ACK. The required queue semantics are very simple:

  • no message should be deleted if the receiving client has not deleted it explicitly
  • any execution that ends with an exception should cause the message to be re-enqueued

Direct Known Subclasses

LocalConnection

Defined Under Namespace

Classes: DeleteBuffer, Message, MessageBuffer, SendBuffer

Constant Summary collapse

DEFAULT_TIMEOUT_SECONDS =
5
BATCH_RECEIVE_SIZE =
10
MAX_RANDOM_FAILURES_PER_CALL =
10
MAX_RANDOM_RECEIVE_FAILURES =

sure to hit the max_elapsed_time of 900 seconds

100
NotOurFaultAwsError =
Class.new(Sqewer::Error)

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(queue_url, client: self.class.client) ⇒ Connection

Initializes a new adapter, with access to the SQS queue at the given URL.

Parameters:

  • queue_url (String)

    the SQS queue URL (the URL can be copied from your AWS console)



58
59
60
61
# File 'lib/sqewer/connection.rb', line 58

def initialize(queue_url, client: self.class.client)
  @queue_url = queue_url
  @client = client
end

Class Method Details

.clientObject

Returns a singleton of Aws::SQS::Client



43
44
45
46
47
48
49
50
51
52
53
# File 'lib/sqewer/connection.rb', line 43

def self.client
  # It's better using a singleton client to prevent making a lot of HTTP
  # requests to the AWS metadata endpoint when getting credentials.
  @client ||= begin
    require 'aws-sdk-sqs'
    ::Aws::SQS::Client.new(
      instance_profile_credentials_timeout: 1,
      instance_profile_credentials_retries: 5,
    )
  end
end

.defaultObject

Returns the default adapter, connected to the queue set via the SQS_QUEUE_URL environment variable. Switches to SQLite-backed local queue if the SQS_QUEUE_URL is prefixed with 'sqlite3://'



30
31
32
33
34
35
36
37
38
39
40
# File 'lib/sqewer/connection.rb', line 30

def self.default
  url_str = ENV.fetch('SQS_QUEUE_URL')
  uri = URI(url_str)
  if uri.scheme == 'sqlite3'
    Sqewer::LocalConnection.new(uri.to_s)
  else
    new(uri.to_s)
  end
rescue KeyError => e
  raise "SQS_QUEUE_URL not set in the environment. This is the queue URL Sqewer uses by default."
end

Instance Method Details

#delete_message(message_identifier) ⇒ void

This method returns an undefined value.

Deletes a message after it has been succesfully decoded and processed

Parameters:

  • message_identifier (String)

    the ID of the message to delete. For SQS, it is the receipt handle



198
199
200
# File 'lib/sqewer/connection.rb', line 198

def delete_message(message_identifier)
  delete_multiple_messages {|via| via.delete_message(message_identifier) }
end

#delete_multiple_messages {|#delete_message| ... } ⇒ void

This method returns an undefined value.

Deletes multiple messages after they all have been succesfully decoded and processed.

Yields:

  • (#delete_message)

    an object you can delete an individual message through



206
207
208
209
210
211
# File 'lib/sqewer/connection.rb', line 206

def delete_multiple_messages
  buffer = DeleteBuffer.new
  yield(buffer)

  buffer.each_batch {|batch| handle_batch_with_retries(:delete_message_batch, batch) }
end

#receive_messagesArray<Message>

Receive at most 10 messages from the queue, and return the array of Message objects. Retries for at most 900 seconds (15 minutes) and then gives up, thereby crashing the read loop. If SQS is not available even after 15 minutes it is either down or the server is misconfigured. Either way it makes no sense to continue.

Returns:

  • (Array<Message>)

    an array of Message objects



69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
# File 'lib/sqewer/connection.rb', line 69

def receive_messages
  Retriable.retriable on: network_and_aws_sdk_errors, tries: MAX_RANDOM_RECEIVE_FAILURES do
    response = @client.receive_message(
      queue_url: @queue_url,
      attribute_names: ['All'],
      wait_time_seconds: DEFAULT_TIMEOUT_SECONDS,
      max_number_of_messages: BATCH_RECEIVE_SIZE
    )
    response.messages.map {|message| Message.new(message.receipt_handle, message.body, message.attributes) }
  end
rescue *sqs_errors_to_release_client
  # We noticed cases where errors related to AWS credentials started to happen suddenly.
  # We don't know the root cause yet, but what we can do is release the
  # singleton @client instance because it contains a cache of credentials that in most
  # cases is no longer valid.
  self.class.release_client

  raise
end

#send_message(message_body, **kwargs_for_send) ⇒ void

This method returns an undefined value.

Send a message to the backing queue

Passes the arguments to the AWS SDK.

Parameters:

  • message_body (String)

    the message to send

  • kwargs_for_send (Hash)

    additional arguments for the submit (such as delay_seconds).



95
96
97
# File 'lib/sqewer/connection.rb', line 95

def send_message(message_body, **kwargs_for_send)
  send_multiple_messages {|via| via.send_message(message_body, **kwargs_for_send) }
end

#send_multiple_messages {|#send_message| ... } ⇒ void

This method returns an undefined value.

Send multiple messages. If any messages fail to send, an exception will be raised.

Yields:

  • (#send_message)

    the object you can send messages through (will be flushed at method return)



187
188
189
190
191
192
# File 'lib/sqewer/connection.rb', line 187

def send_multiple_messages
  buffer = SendBuffer.new
  yield(buffer)

  buffer.each_batch {|batch| handle_batch_with_retries(:send_message_batch, batch) }
end