Class: Sqewer::Connection

Inherits:
Object
  • Object
show all
Defined in:
lib/sqewer/connection.rb

Overview

Adapter that handles communication with a specific queue. In the future this could be switched to a Google PubSub queue, or to AMQP, or to any other queue with guaranteed re-delivery without ACK. The required queue semantics are very simple:

  • no message should be deleted if the receiving client has not deleted it explicitly
  • any execution that ends with an exception should cause the message to be re-enqueued

Defined Under Namespace

Classes: DeleteBuffer, Message, MessageBuffer, RetryWrapper, SendBuffer

Constant Summary collapse

DEFAULT_TIMEOUT_SECONDS =
5
BATCH_RECEIVE_SIZE =
10
MAX_RANDOM_FAILURES_PER_CALL =
10
NotOurFaultAwsError =
Class.new(StandardError)

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(queue_url) ⇒ Connection

Initializes a new adapter, with access to the SQS queue at the given URL.

Parameters:

  • queue_url (String)

    the SQS queue URL (the URL can be copied from your AWS console)



37
38
39
40
# File 'lib/sqewer/connection.rb', line 37

def initialize(queue_url)
  require 'aws-sdk'
  @queue_url = queue_url
end

Class Method Details

.defaultObject

Returns the default adapter, connected to the queue set via the SQS_QUEUE_URL environment variable.



28
29
30
31
32
# File 'lib/sqewer/connection.rb', line 28

def self.default
  new(ENV.fetch('SQS_QUEUE_URL'))
rescue KeyError => e
  raise "SQS_QUEUE_URL not set in the environment. This is the queue URL that the default that Sqewer uses"
end

Instance Method Details

#delete_message(message_identifier) ⇒ void

This method returns an undefined value.

Deletes a message after it has been succesfully decoded and processed

Parameters:

  • message_identifier (String)

    the ID of the message to delete. For SQS, it is the receipt handle



110
111
112
# File 'lib/sqewer/connection.rb', line 110

def delete_message(message_identifier)
  delete_multiple_messages {|via| via.delete_message(message_identifier) }
end

#delete_multiple_messages {|#delete_message| ... } ⇒ void

This method returns an undefined value.

Deletes multiple messages after they all have been succesfully decoded and processed.

Yields:

  • (#delete_message)

    an object you can delete an individual message through



118
119
120
121
122
123
# File 'lib/sqewer/connection.rb', line 118

def delete_multiple_messages
  buffer = DeleteBuffer.new
  yield(buffer)

  buffer.each_batch {|batch| handle_batch_with_retries(:delete_message_batch, batch) }
end

#receive_messagesArray<Message>

Receive at most 10 messages from the queue, and return the array of Message objects.

Returns:

  • (Array<Message>)

    an array of Message objects



45
46
47
48
49
50
51
# File 'lib/sqewer/connection.rb', line 45

def receive_messages
  response = client.receive_message(queue_url: @queue_url,
    wait_time_seconds: DEFAULT_TIMEOUT_SECONDS, max_number_of_messages: 10)
  response.messages.map do | message |
    Message.new(message.receipt_handle, message.body)
  end
end

#send_message(message_body, **kwargs_for_send) ⇒ void

This method returns an undefined value.

Send a message to the backing queue

Passes the arguments to the AWS SDK.

Parameters:

  • message_body (String)

    the message to send

  • kwargs_for_send (Hash)

    additional arguments for the submit (such as delay_seconds).



59
60
61
# File 'lib/sqewer/connection.rb', line 59

def send_message(message_body, **kwargs_for_send)
  send_multiple_messages {|via| via.send_message(message_body, **kwargs_for_send) }
end

#send_multiple_messages {|#send_message| ... } ⇒ void

This method returns an undefined value.

Send multiple messages. If any messages fail to send, an exception will be raised.

Yields:

  • (#send_message)

    the object you can send messages through (will be flushed at method return)



99
100
101
102
103
104
# File 'lib/sqewer/connection.rb', line 99

def send_multiple_messages
  buffer = SendBuffer.new
  yield(buffer)

  buffer.each_batch {|batch| handle_batch_with_retries(:send_message_batch, batch) }
end