Class: Mimi::Messaging::SQS_SNS::Adapter

Inherits:
Adapters::Base
  • Object
show all
Defined in:
lib/mimi/messaging/sqs_sns/adapter.rb

Overview

AWS SQS/SNS adapter class

An adapter implementation must implement the following methods:

  • #start()

  • #stop()

  • #command(target, message, opts)

  • #query(target, message, opts)

  • #event(target, message, opts)

  • #start_request_processor(queue_name, processor, opts)

  • #start_event_processor(topic_name, processor, opts)

  • #start_event_processor_with_queue(topic_name, queue_name, processor, opts)

  • #stop_all_processors

Constant Summary collapse

SQS_SNS_ALPHABET_MAP =

NOTE: AWS SQS/SNS alphabet for queue and topic names is different from what mimi-messaging allows: ‘.’ is not an allowed character.

SQS_SNS_ALPHABET_MAP structure is used to convert names from mimi-messaging alphabet to SQS/SNS alphabet.

Mimi::Messaging still accepts queue and topic names containing the ‘.’, but the adapter will convert those to valid SQS/SNS names using this mapping.

{
  "." => "-"
}.freeze
DEFAULT_OPTIONS =
{
  mq_namespace: nil,
  mq_default_query_timeout: 15, # seconds,
  mq_reply_queue_prefix: "reply-",

  # worker pool parameters
  mq_worker_pool_min_threads: 1,
  mq_worker_pool_max_threads: 16,
  mq_worker_pool_max_backlog: 16,

  # if nil, AWS SDK will guess values from environment
  mq_aws_region: nil,
  mq_aws_access_key_id: nil,
  mq_aws_secret_access_key: nil,
  mq_aws_sqs_endpoint: nil,
  mq_aws_sns_endpoint: nil,

  mq_aws_sqs_sns_kms_master_key_id: nil,
  mq_aws_sqs_read_timeout: 20, # seconds
}.freeze

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(options) ⇒ Adapter

Initializes SQS/SNS adapter

Parameters:

  • options (Hash)

Options Hash (options):

  • :mq_adapter (String)
  • :mq_aws_region (String, nil)
  • :mq_aws_access_key_id (String, nil)
  • :mq_aws_secret_access_key (String, nil)
  • :mq_aws_sqs_endpoint (String, nil)
  • :mq_namespace (String, nil)
  • :mq_reply_queue_prefix (String, nil)
  • :mq_default_query_timeout (Integer, nil)


81
82
83
84
# File 'lib/mimi/messaging/sqs_sns/adapter.rb', line 81

def initialize(options)
  @options = DEFAULT_OPTIONS.merge(options).dup
  @reply_consumer_mutex = Mutex.new
end

Instance Attribute Details

#optionsObject (readonly)

Returns the value of attribute options.



44
45
46
# File 'lib/mimi/messaging/sqs_sns/adapter.rb', line 44

def options
  @options
end

#sns_clientObject (readonly)

Returns the value of attribute sns_client.



44
45
46
# File 'lib/mimi/messaging/sqs_sns/adapter.rb', line 44

def sns_client
  @sns_client
end

#sqs_clientObject (readonly)

Returns the value of attribute sqs_client.



44
45
46
# File 'lib/mimi/messaging/sqs_sns/adapter.rb', line 44

def sqs_client
  @sqs_client
end

#worker_poolObject (readonly)

Returns the value of attribute worker_pool.



44
45
46
# File 'lib/mimi/messaging/sqs_sns/adapter.rb', line 44

def worker_pool
  @worker_pool
end

Instance Method Details

#command(target, message, _opts = {}) ⇒ Object

Sends the command to the given target

Example:

Mimi::Messaging.command("users/create", name: "John Smith")

Parameters:

  • target (String)

    “<queue>/<method>”

  • message (Hash, Mimi::Messaging::Message)
  • opts (Hash)

    additional adapter-specific options

Returns:

  • nil



124
125
126
127
128
129
# File 'lib/mimi/messaging/sqs_sns/adapter.rb', line 124

def command(target, message, _opts = {})
  queue_name, method_name = target.split("/")
  message = Mimi::Messaging::Message.new(message, __method: method_name)
  queue_url = find_queue!(queue_name)
  deliver_message_queue(queue_url, message)
end

#create_queue(queue_name) ⇒ String

Creates a new queue

Parameters:

  • queue_name (String)

    name of the topic to be created

Returns:

  • (String)

    a new queue URL



215
216
217
218
219
220
221
222
223
224
225
226
# File 'lib/mimi/messaging/sqs_sns/adapter.rb', line 215

def create_queue(queue_name)
  fqn = sqs_sns_converted_full_name(queue_name)
  Mimi::Messaging.log "Creating a queue: #{fqn}"
  attrs = {}
  if options[:mq_aws_sqs_sns_kms_master_key_id]
    attrs["KmsMasterKeyId"] = options[:mq_aws_sqs_sns_kms_master_key_id]
  end
  result = sqs_client.create_queue(queue_name: fqn, attributes: attrs)
  result.queue_url
rescue StandardError => e
  raise Mimi::Messaging::ConnectionError, "Failed to create queue '#{queue_name}': #{e}"
end

#delete_queue(queue_url) ⇒ Object

Deletes a queue identified by the queue URL

Parameters:

  • queue_url (String)


244
245
246
247
248
249
250
# File 'lib/mimi/messaging/sqs_sns/adapter.rb', line 244

def delete_queue(queue_url)
  Mimi::Messaging.log "Deleting a queue: #{queue_url}"
  sqs_client.delete_queue(queue_url: queue_url)
rescue StandardError => e
  raise Mimi::Messaging::ConnectionError,
    "Failed to delete queue with url '#{queue_url}': #{e}"
end

#event(target, message, _opts = {}) ⇒ Object

Broadcasts the event with the given target

Parameters:

  • target (String)

    “<topic>#<event_type>”, e.g. “customers#created”

  • message (Mimi::Messaging::Message)
  • opts (Hash)

    additional options



164
165
166
167
168
169
# File 'lib/mimi/messaging/sqs_sns/adapter.rb', line 164

def event(target, message, _opts = {})
  topic_name, event_type = target.split("#")
  message = Mimi::Messaging::Message.new(message, __event_type: event_type)
  topic_arn = find_or_create_topic(topic_name) # TODO: or find_topic!(...) ?
  deliver_message_topic(topic_arn, message)
end

#find_or_create_queue(queue_name) ⇒ String

Finds a queue URL for a queue with given name.

If an existing queue with this name is not found, the method will try to create a new one.

Parameters:

  • queue_name (String)

Returns:

  • (String)

    a queue URL



236
237
238
# File 'lib/mimi/messaging/sqs_sns/adapter.rb', line 236

def find_or_create_queue(queue_name)
  queue_registry(queue_name) || create_queue(queue_name)
end

#query(target, message, opts = {}) ⇒ Hash

Executes the query to the given target and returns response

Parameters:

  • target (String)

    “<queue>/<method>”

  • message (Hash, Mimi::Messaging::Message)
  • opts (Hash) (defaults to: {})

    additional options, e.g. :timeout

Returns:

  • (Hash)

Raises:

  • (SomeError, Timeout::Error)


140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
# File 'lib/mimi/messaging/sqs_sns/adapter.rb', line 140

def query(target, message, opts = {})
  queue_name, method_name = target.split("/")
  queue_url = find_queue!(queue_name)
  request_id = SecureRandom.hex(8)
  reply_queue = reply_consumer.register_request_id(request_id)

  message = Mimi::Messaging::Message.new(
    message,
    __method: method_name,
    __reply_queue_url: reply_consumer.reply_queue_url,
    __request_id: request_id
  )
  deliver_message_queue(queue_url, message)
  timeout = opts[:timeout] || options[:mq_default_query_timeout]
  response = reply_queue.pop(true, timeout)
  deserialize(response.body)
end

#startObject



86
87
88
89
90
91
# File 'lib/mimi/messaging/sqs_sns/adapter.rb', line 86

def start
  @sqs_client = Aws::SQS::Client.new(sqs_client_config)
  @sns_client = Aws::SNS::Client.new(sns_client_config)
  start_worker_pool!
  check_availability!
end

#start_event_processor(topic_name, processor, opts = {}) ⇒ Object



193
194
195
196
197
# File 'lib/mimi/messaging/sqs_sns/adapter.rb', line 193

def start_event_processor(topic_name, processor, opts = {})
  # NOTE: due to SQS/SNS limitations, implementing this will
  # require creating a temporary queue and subscribing it to the topic
  raise "Not implemented"
end

#start_event_processor_with_queue(topic_name, queue_name, processor, opts = {}) ⇒ Object



199
200
201
202
203
204
205
206
207
208
# File 'lib/mimi/messaging/sqs_sns/adapter.rb', line 199

def start_event_processor_with_queue(topic_name, queue_name, processor, opts = {})
  @consumers ||= []
  opts = opts.dup
  topic_arn = find_or_create_topic(topic_name) # TODO: or find_topic!(...) ?
  queue_url = find_or_create_queue(queue_name)
  subscribe_topic_queue(topic_arn, queue_url)
  @consumers << Consumer.new(self, queue_url, worker_pool) do |m|
    process_event_message(processor, m)
  end
end

#start_request_processor(queue_name, processor, opts = {}) ⇒ Object

Starts a request (command/query) processor.

Processor must respond to #call_command() AND #call_query() which accepts 3 arguments: (method, message, opts).

If the processor raises an error, the message will be NACK-ed and accepted again at a later time.

Parameters:

  • queue_name (String)

    “<queue>”

  • processor (#call_command(), #call_query())
  • opts (Hash) (defaults to: {})

    additional adapter-specific options



183
184
185
186
187
188
189
190
191
# File 'lib/mimi/messaging/sqs_sns/adapter.rb', line 183

def start_request_processor(queue_name, processor, opts = {})
  super
  @consumers ||= []
  opts = opts.dup
  queue_url = find_or_create_queue(queue_name)
  @consumers << Consumer.new(self, queue_url, worker_pool) do |m|
    process_request_message(processor, m)
  end
end

#stopObject



93
94
95
96
97
98
# File 'lib/mimi/messaging/sqs_sns/adapter.rb', line 93

def stop
  stop_all_processors
  stop_worker_pool!
  @sqs_client = nil
  @sns_client = nil
end

#stop_all_processorsObject

Stops all message (command, query and event) processors.

Stops currently registered processors and stops accepting new messages for processors.



105
106
107
108
109
110
111
# File 'lib/mimi/messaging/sqs_sns/adapter.rb', line 105

def stop_all_processors
  @consumers&.each(&:signal_stop)
  @consumers&.each(&:stop)
  @consumers = nil
  @reply_consumer&.stop
  @reply_consumer = nil
end