Class: Mimi::Messaging::SQS_SNS::Adapter
- Inherits:
-
Adapters::Base
- Object
- Adapters::Base
- Mimi::Messaging::SQS_SNS::Adapter
- Defined in:
- lib/mimi/messaging/sqs_sns/adapter.rb
Overview
AWS SQS/SNS adapter class
An adapter implementation must implement the following methods:
-
#start()
-
#stop()
-
#command(target, message, opts)
-
#query(target, message, opts)
-
#event(target, message, opts)
-
#start_request_processor(queue_name, processor, opts)
-
#start_event_processor(topic_name, processor, opts)
-
#start_event_processor_with_queue(topic_name, queue_name, processor, opts)
-
#stop_all_processors
Constant Summary collapse
- SQS_SNS_ALPHABET_MAP =
NOTE: AWS SQS/SNS alphabet for queue and topic names is different from what mimi-messaging allows: ‘.’ is not an allowed character.
SQS_SNS_ALPHABET_MAP structure is used to convert names from mimi-messaging alphabet to SQS/SNS alphabet.
Mimi::Messaging still accepts queue and topic names containing the ‘.’, but the adapter will convert those to valid SQS/SNS names using this mapping.
{ "." => "-" }.freeze
- DEFAULT_OPTIONS =
{ mq_namespace: nil, mq_default_query_timeout: 15, # seconds, mq_reply_queue_prefix: "reply-", # worker pool parameters mq_worker_pool_min_threads: 1, mq_worker_pool_max_threads: 16, mq_worker_pool_max_backlog: 16, # if nil, AWS SDK will guess values from environment mq_aws_region: nil, mq_aws_access_key_id: nil, mq_aws_secret_access_key: nil, mq_aws_sqs_endpoint: nil, mq_aws_sns_endpoint: nil, mq_aws_sqs_sns_kms_master_key_id: nil, mq_aws_sqs_read_timeout: 20, # seconds }.freeze
Instance Attribute Summary collapse
-
#options ⇒ Object
readonly
Returns the value of attribute options.
-
#sns_client ⇒ Object
readonly
Returns the value of attribute sns_client.
-
#sqs_client ⇒ Object
readonly
Returns the value of attribute sqs_client.
-
#worker_pool ⇒ Object
readonly
Returns the value of attribute worker_pool.
Instance Method Summary collapse
-
#command(target, message, _opts = {}) ⇒ Object
Sends the command to the given target.
-
#create_queue(queue_name) ⇒ String
Creates a new queue.
-
#delete_queue(queue_url) ⇒ Object
Deletes a queue identified by the queue URL.
-
#event(target, message, _opts = {}) ⇒ Object
Broadcasts the event with the given target.
-
#find_or_create_queue(queue_name) ⇒ String
Finds a queue URL for a queue with given name.
-
#initialize(options) ⇒ Adapter
constructor
Initializes SQS/SNS adapter.
-
#query(target, message, opts = {}) ⇒ Hash
Executes the query to the given target and returns response.
- #start ⇒ Object
- #start_event_processor(topic_name, processor, opts = {}) ⇒ Object
- #start_event_processor_with_queue(topic_name, queue_name, processor, opts = {}) ⇒ Object
-
#start_request_processor(queue_name, processor, opts = {}) ⇒ Object
Starts a request (command/query) processor.
- #stop ⇒ Object
-
#stop_all_processors ⇒ Object
Stops all message (command, query and event) processors.
Constructor Details
#initialize(options) ⇒ Adapter
Initializes SQS/SNS adapter
81 82 83 84 |
# File 'lib/mimi/messaging/sqs_sns/adapter.rb', line 81 def initialize() @options = DEFAULT_OPTIONS.merge().dup @reply_consumer_mutex = Mutex.new end |
Instance Attribute Details
#options ⇒ Object (readonly)
Returns the value of attribute options.
44 45 46 |
# File 'lib/mimi/messaging/sqs_sns/adapter.rb', line 44 def @options end |
#sns_client ⇒ Object (readonly)
Returns the value of attribute sns_client.
44 45 46 |
# File 'lib/mimi/messaging/sqs_sns/adapter.rb', line 44 def sns_client @sns_client end |
#sqs_client ⇒ Object (readonly)
Returns the value of attribute sqs_client.
44 45 46 |
# File 'lib/mimi/messaging/sqs_sns/adapter.rb', line 44 def sqs_client @sqs_client end |
#worker_pool ⇒ Object (readonly)
Returns the value of attribute worker_pool.
44 45 46 |
# File 'lib/mimi/messaging/sqs_sns/adapter.rb', line 44 def worker_pool @worker_pool end |
Instance Method Details
#command(target, message, _opts = {}) ⇒ Object
124 125 126 127 128 129 |
# File 'lib/mimi/messaging/sqs_sns/adapter.rb', line 124 def command(target, , _opts = {}) queue_name, method_name = target.split("/") = Mimi::Messaging::Message.new(, __method: method_name) queue_url = find_queue!(queue_name) (queue_url, ) end |
#create_queue(queue_name) ⇒ String
Creates a new queue
215 216 217 218 219 220 221 222 223 224 225 226 |
# File 'lib/mimi/messaging/sqs_sns/adapter.rb', line 215 def create_queue(queue_name) fqn = sqs_sns_converted_full_name(queue_name) Mimi::Messaging.log "Creating a queue: #{fqn}" attrs = {} if [:mq_aws_sqs_sns_kms_master_key_id] attrs["KmsMasterKeyId"] = [:mq_aws_sqs_sns_kms_master_key_id] end result = sqs_client.create_queue(queue_name: fqn, attributes: attrs) result.queue_url rescue StandardError => e raise Mimi::Messaging::ConnectionError, "Failed to create queue '#{queue_name}': #{e}" end |
#delete_queue(queue_url) ⇒ Object
Deletes a queue identified by the queue URL
244 245 246 247 248 249 250 |
# File 'lib/mimi/messaging/sqs_sns/adapter.rb', line 244 def delete_queue(queue_url) Mimi::Messaging.log "Deleting a queue: #{queue_url}" sqs_client.delete_queue(queue_url: queue_url) rescue StandardError => e raise Mimi::Messaging::ConnectionError, "Failed to delete queue with url '#{queue_url}': #{e}" end |
#event(target, message, _opts = {}) ⇒ Object
Broadcasts the event with the given target
164 165 166 167 168 169 |
# File 'lib/mimi/messaging/sqs_sns/adapter.rb', line 164 def event(target, , _opts = {}) topic_name, event_type = target.split("#") = Mimi::Messaging::Message.new(, __event_type: event_type) topic_arn = find_or_create_topic(topic_name) # TODO: or find_topic!(...) ? (topic_arn, ) end |
#find_or_create_queue(queue_name) ⇒ String
Finds a queue URL for a queue with given name.
If an existing queue with this name is not found, the method will try to create a new one.
236 237 238 |
# File 'lib/mimi/messaging/sqs_sns/adapter.rb', line 236 def find_or_create_queue(queue_name) queue_registry(queue_name) || create_queue(queue_name) end |
#query(target, message, opts = {}) ⇒ Hash
Executes the query to the given target and returns response
140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 |
# File 'lib/mimi/messaging/sqs_sns/adapter.rb', line 140 def query(target, , opts = {}) queue_name, method_name = target.split("/") queue_url = find_queue!(queue_name) request_id = SecureRandom.hex(8) reply_queue = reply_consumer.register_request_id(request_id) = Mimi::Messaging::Message.new( , __method: method_name, __reply_queue_url: reply_consumer.reply_queue_url, __request_id: request_id ) (queue_url, ) timeout = opts[:timeout] || [:mq_default_query_timeout] response = reply_queue.pop(true, timeout) deserialize(response.body) end |
#start ⇒ Object
86 87 88 89 90 91 |
# File 'lib/mimi/messaging/sqs_sns/adapter.rb', line 86 def start @sqs_client = Aws::SQS::Client.new(sqs_client_config) @sns_client = Aws::SNS::Client.new(sns_client_config) start_worker_pool! check_availability! end |
#start_event_processor(topic_name, processor, opts = {}) ⇒ Object
193 194 195 196 197 |
# File 'lib/mimi/messaging/sqs_sns/adapter.rb', line 193 def start_event_processor(topic_name, processor, opts = {}) # NOTE: due to SQS/SNS limitations, implementing this will # require creating a temporary queue and subscribing it to the topic raise "Not implemented" end |
#start_event_processor_with_queue(topic_name, queue_name, processor, opts = {}) ⇒ Object
199 200 201 202 203 204 205 206 207 208 |
# File 'lib/mimi/messaging/sqs_sns/adapter.rb', line 199 def start_event_processor_with_queue(topic_name, queue_name, processor, opts = {}) @consumers ||= [] opts = opts.dup topic_arn = find_or_create_topic(topic_name) # TODO: or find_topic!(...) ? queue_url = find_or_create_queue(queue_name) subscribe_topic_queue(topic_arn, queue_url) @consumers << Consumer.new(self, queue_url, worker_pool) do |m| (processor, m) end end |
#start_request_processor(queue_name, processor, opts = {}) ⇒ Object
Starts a request (command/query) processor.
Processor must respond to #call_command() AND #call_query() which accepts 3 arguments: (method, message, opts).
If the processor raises an error, the message will be NACK-ed and accepted again at a later time.
183 184 185 186 187 188 189 190 191 |
# File 'lib/mimi/messaging/sqs_sns/adapter.rb', line 183 def start_request_processor(queue_name, processor, opts = {}) super @consumers ||= [] opts = opts.dup queue_url = find_or_create_queue(queue_name) @consumers << Consumer.new(self, queue_url, worker_pool) do |m| (processor, m) end end |
#stop ⇒ Object
93 94 95 96 97 98 |
# File 'lib/mimi/messaging/sqs_sns/adapter.rb', line 93 def stop stop_all_processors stop_worker_pool! @sqs_client = nil @sns_client = nil end |
#stop_all_processors ⇒ Object
Stops all message (command, query and event) processors.
Stops currently registered processors and stops accepting new messages for processors.
105 106 107 108 109 110 111 |
# File 'lib/mimi/messaging/sqs_sns/adapter.rb', line 105 def stop_all_processors @consumers&.each(&:signal_stop) @consumers&.each(&:stop) @consumers = nil @reply_consumer&.stop @reply_consumer = nil end |