Class: ActiveMessaging::Adapters::Sqs::Connection
- Inherits:
-
BaseConnection
- Object
- BaseConnection
- ActiveMessaging::Adapters::Sqs::Connection
- Defined in:
- lib/activemessaging/adapters/sqs.rb
Constant Summary collapse
- QUEUE_NAME_LENGTH =
1..80
- VISIBILITY_TIMEOUT =
0..(24 * 60 * 60)
- NUMBER_OF_MESSAGES =
1..255
- GET_QUEUE_ATTRIBUTES =
['All', 'ApproximateNumberOfMessages', 'ApproximateNumberOfMessagesDelayed', 'ApproximateNumberOfMessagesNotVisible', 'CreatedTimestamp', 'DelaySeconds', 'LastModifiedTimestamp', 'MaximumMessageSize', 'MessageRetentionPeriod', 'Policy', 'QueueArn', 'ReceiveMessageWaitTimeSeconds', 'RedrivePolicy', 'VisibilityTimeout', 'KmsMasterKeyId', 'KmsDataKeyReusePeriodSeconds', 'FifoQueue', 'ContentBasedDeduplication'].freeze
- SET_QUEUE_ATTRIBUTES =
['DelaySeconds', 'MaximumMessageSize', 'MessageRetentionPeriod', 'Policy', 'ReceiveMessageWaitTimeSeconds', 'RedrivePolicy', 'VisibilityTimeout', 'KmsMasterKeyId', 'KmsDataKeyReusePeriodSeconds', 'ContentBasedDeduplication'].freeze
- URI_ENCODING_REPLACEMENTS =
{ '%7E' => '~', '+' => '%20' }.freeze
Instance Attribute Summary collapse
-
#access_key_id ⇒ Object
configurable params.
-
#aws_version ⇒ Object
configurable params.
-
#cache_queue_list ⇒ Object
configurable params.
-
#content_type ⇒ Object
configurable params.
-
#host ⇒ Object
configurable params.
-
#max_message_size ⇒ Object
configurable params.
-
#poll_interval ⇒ Object
configurable params.
-
#port ⇒ Object
configurable params.
-
#reconnect_delay ⇒ Object
configurable params.
-
#secret_access_key ⇒ Object
configurable params.
Attributes inherited from BaseConnection
Instance Method Summary collapse
-
#add_queue(url) ⇒ Object
internal data structure methods.
- #check_errors(response) ⇒ Object
- #create_queue(name) ⇒ Object
- #delete_message(message) ⇒ Object
- #delete_queue(queue) ⇒ Object
- #disconnect ⇒ Object
- #get_or_create_queue(queue_name) ⇒ Object
- #get_queue_attributes(queue, attributes = ['All']) ⇒ Object
- #http_request(h, p, r) ⇒ Object
-
#initialize(cfg) ⇒ Connection
constructor
generic init method needed by a13g.
- #list_queues(queue_name_prefix = nil) ⇒ Object
- #make_request(action, url = nil, params = {}, attributes = {}) ⇒ Object
- #message_size_range ⇒ Object
- #queues ⇒ Object
-
#receive(options = {}) ⇒ Object
new receive respects priorities.
- #received(message, headers = {}) ⇒ Object
- #retrieve_messsages(queue, num_messages = 1, timeout = nil, waittime = nil) ⇒ Object
-
#send(queue_name, message_body, message_headers = {}) ⇒ Object
Sends a single message to a queue. Arguments: queue_name (string), message body (string), headers (hash).
- #send_messsage(queue, message) ⇒ Object
- #set_queue_attributes(queue, attributes) ⇒ Object
-
#subscribe(queue_name, message_headers = {}) ⇒ Object
Arguments: queue_name (string), headers (hash). For SQS, makes sure the queue exists (creating it if it does not), then adds it to the list of polled queues.
-
#unreceive(message, headers = {}) ⇒ Object
Does nothing; because the message is not deleted, it will eventually become visible again.
-
#unsubscribe(queue_name, message_headers = {}) ⇒ Object
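Arguments: queue_name (string), headers (hash). For SQS, the original note says this attempts to delete the queue, which won’t work if the queue is not empty, and that’s OK; the listed implementation only removes the subscription and its priority entry.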
- #url_encode(param) ⇒ Object
- #validate_get_queue_attribute(qa) ⇒ Object
- #validate_message(m) ⇒ Object
- #validate_new_queue(qn) ⇒ Object
- #validate_number_of_messages(nom) ⇒ Object
- #validate_queue(q) ⇒ Object
-
#validate_queue_name(qn) ⇒ Object
validation methods.
- #validate_set_queue_attribute(qa) ⇒ Object
- #validate_timeout(to) ⇒ Object
Methods included from ActiveMessaging::Adapter
Constructor Details
#initialize(cfg) ⇒ Connection
generic init method needed by a13g
32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 |
# File 'lib/activemessaging/adapters/sqs.rb', line 32
#
# Generic init method needed by a13g. Copies the connection settings out of
# +cfg+ (falling back to defaults), resets the subscription bookkeeping and
# primes the queue list by calling #queues.
#
# @param cfg [Hash] symbol-keyed settings; :access_key_id and
#   :secret_access_key are required, every other key is optional
# @raise [RuntimeError] if either credential is nil or empty
def initialize(cfg)
  raise "Must specify a access_key_id" if cfg[:access_key_id].nil? || cfg[:access_key_id].empty?
  raise "Must specify a secret_access_key" if cfg[:secret_access_key].nil? || cfg[:secret_access_key].empty?

  @access_key_id = cfg[:access_key_id]
  @secret_access_key = cfg[:secret_access_key]
  @region = cfg[:region] || 'us-east-1'
  @request_expires = cfg[:requestExpires] || 10
  @request_retry_count = cfg[:requestRetryCount] || 5
  @aws_version = cfg[:aws_version] || '2012-11-05'
  @content_type = cfg[:content_type] || 'text/plain'
  @host = cfg[:host] || "sqs.#{@region}.amazonaws.com"
  @protocol = cfg[:protocol] || 'http'
  # http_request switches SSL on when the protocol is 'https', so the default
  # port has to follow the protocol; an explicit :port always wins.
  @port = cfg[:port] || ('https' == @protocol ? 443 : 80)
  @poll_interval = cfg[:poll_interval] || 1
  @reconnect_delay = cfg[:reconnectDelay] || 5
  # Used by message_size_range as a multiple of 1024; non-positive or missing
  # values fall back to 8.
  @max_message_size = cfg[:max_message_size].to_i > 0 ? cfg[:max_message_size].to_i : 8
  @aws_url = "#{@protocol}://#{@host}/"
  @cache_queue_list = cfg[:cache_queue_list].nil? ? true : cfg[:cache_queue_list]
  @reliable = cfg[:reliable].nil? ? true : cfg[:reliable]

  # initialize the subscriptions and queues
  @subscriptions = {}
  @queues_by_priority = {}
  @current_subscription = 0
  queues
end
Instance Attribute Details
#access_key_id ⇒ Object
configurable params
29 30 31 |
# File 'lib/activemessaging/adapters/sqs.rb', line 29
#
# Configurable param: reader for the value stored in @access_key_id.
#
# @return [Object] the current @access_key_id
def access_key_id
  @access_key_id
end
#aws_version ⇒ Object
configurable params
29 30 31 |
# File 'lib/activemessaging/adapters/sqs.rb', line 29
#
# Configurable param: reader for the value stored in @aws_version.
#
# @return [Object] the current @aws_version
def aws_version
  @aws_version
end
#cache_queue_list ⇒ Object
configurable params
29 30 31 |
# File 'lib/activemessaging/adapters/sqs.rb', line 29
#
# Configurable param: reader for the value stored in @cache_queue_list.
#
# @return [Object] the current @cache_queue_list
def cache_queue_list
  @cache_queue_list
end
#content_type ⇒ Object
configurable params
29 30 31 |
# File 'lib/activemessaging/adapters/sqs.rb', line 29
#
# Configurable param: reader for the value stored in @content_type.
#
# @return [Object] the current @content_type
def content_type
  @content_type
end
#host ⇒ Object
configurable params
29 30 31 |
# File 'lib/activemessaging/adapters/sqs.rb', line 29
#
# Configurable param: reader for the value stored in @host.
#
# @return [Object] the current @host
def host
  @host
end
#max_message_size ⇒ Object
configurable params
29 30 31 |
# File 'lib/activemessaging/adapters/sqs.rb', line 29
#
# Configurable param: reader for the value stored in @max_message_size.
# (The method name was missing from the extracted listing; it is restored
# from the attribute summary.)
#
# @return [Object] the current @max_message_size
def max_message_size
  @max_message_size
end
#poll_interval ⇒ Object
configurable params
29 30 31 |
# File 'lib/activemessaging/adapters/sqs.rb', line 29
#
# Configurable param: reader for the value stored in @poll_interval.
#
# @return [Object] the current @poll_interval
def poll_interval
  @poll_interval
end
#port ⇒ Object
configurable params
29 30 31 |
# File 'lib/activemessaging/adapters/sqs.rb', line 29
#
# Configurable param: reader for the value stored in @port.
#
# @return [Object] the current @port
def port
  @port
end
#reconnect_delay ⇒ Object
configurable params
29 30 31 |
# File 'lib/activemessaging/adapters/sqs.rb', line 29
#
# Configurable param: reader for the value stored in @reconnect_delay.
#
# @return [Object] the current @reconnect_delay
def reconnect_delay
  @reconnect_delay
end
#secret_access_key ⇒ Object
configurable params
29 30 31 |
# File 'lib/activemessaging/adapters/sqs.rb', line 29
#
# Configurable param: reader for the value stored in @secret_access_key.
#
# @return [Object] the current @secret_access_key
def secret_access_key
  @secret_access_key
end
Instance Method Details
#add_queue(url) ⇒ Object
internal data structure methods
319 320 321 322 323 |
# File 'lib/activemessaging/adapters/sqs.rb', line 319
#
# Internal data structure method: builds a Queue from its URL and, when
# queue-list caching is enabled, records it in #queues under its name.
#
# @param url [String] queue URL handed to Queue.from_url
# @return [Queue] the queue built from +url+
def add_queue(url)
  Queue.from_url(url).tap do |parsed|
    queues[parsed.name] = parsed if cache_queue_list
  end
end
#check_errors(response) ⇒ Object
304 305 306 307 308 |
# File 'lib/activemessaging/adapters/sqs.rb', line 304
#
# Rejects unusable responses: raises when there is no response at all, or
# when the response reports errors; otherwise hands the response back.
#
# @param response [Object, nil] object answering #errors? and #errors
# @return [Object] the same +response+ when nothing is wrong with it
# @raise [RuntimeError] 'http response was nil' for a nil response
# @raise whatever +response.errors+ yields when +response.errors?+ is true
def check_errors(response)
  if response.nil?
    raise 'http response was nil'
  elsif response && response.errors?
    raise response.errors
  end

  response
end
#create_queue(name) ⇒ Object
148 149 150 151 152 153 154 155 156 157 158 |
# File 'lib/activemessaging/adapters/sqs.rb', line 148
#
# Validates +name+ as a new queue, issues a CreateQueue request with a fixed
# set of queue attributes, and registers the queue URL from the response
# through #add_queue.
#
# @param name [String] name of the queue to create
# @return [Object, nil] result of #add_queue, or nil when make_request
#   returned nil
def create_queue(name)
  validate_new_queue(name)

  request_params = { 'QueueName' => name }
  queue_attributes = {
    'DelaySeconds' => 0,
    'MaximumMessageSize' => 262144,
    'MessageRetentionPeriod' => 4 * 24 * 60 * 60,
    'ReceiveMessageWaitTimeSeconds' => 0,
    'VisibilityTimeout' => 90 * 60
  }

  response = make_request('CreateQueue', nil, request_params, queue_attributes)
  return if response.nil?

  add_queue(response.get_text("/CreateQueueResponse/CreateQueueResult/QueueUrl"))
end
#delete_message(message) ⇒ Object
221 222 223 |
# File 'lib/activemessaging/adapters/sqs.rb', line 221
#
# Issues a DeleteMessage request against the message's own queue, identified
# by the message's receipt handle. (The method name and parameter were
# missing from the extracted listing; they are restored from the method
# summary.)
#
# @param message [Object] object answering #queue (with #queue_url) and
#   #receipt_handle
# @return [Object, nil] whatever make_request returns
def delete_message(message)
  make_request('DeleteMessage', message.queue.queue_url, { 'ReceiptHandle' => message.receipt_handle })
end
#delete_queue(queue) ⇒ Object
160 161 162 163 164 |
# File 'lib/activemessaging/adapters/sqs.rb', line 160
#
# Validates the queue, issues a DeleteQueue request for it and drops it from
# the #queues hash.
#
# @param queue [Object] object answering #name and #queue_url
# @return [Object, nil] the entry removed from #queues, or nil if none
def delete_queue(queue)
  validate_queue(queue)
  make_request('DeleteQueue', queue.queue_url)
  queues.delete(queue.name)
end
#disconnect ⇒ Object
63 64 65 |
# File 'lib/activemessaging/adapters/sqs.rb', line 63
#
# Does no work and always reports success.
#
# @return [true]
def disconnect
  true
end
#get_or_create_queue(queue_name) ⇒ Object
325 326 327 328 329 330 |
# File 'lib/activemessaging/adapters/sqs.rb', line 325
#
# Looks +queue_name+ up in #queues and falls back to #create_queue when the
# name is not present.
#
# @param queue_name [String] name of the queue wanted
# @return [Object] the existing or newly created queue
# @raise [RuntimeError] when neither lookup nor creation yields a queue
def get_or_create_queue(queue_name)
  queue = queues.fetch(queue_name) { create_queue(queue_name) }
  return queue if queue

  raise "could not get or create queue: #{queue_name}"
end
#get_queue_attributes(queue, attributes = ['All']) ⇒ Object
173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 |
# File 'lib/activemessaging/adapters/sqs.rb', line 173
#
# Issues a GetQueueAttributes request for the named attributes and collects
# the Name/Value pairs found in the response.
#
# @param queue [Object] object answering #queue_url
# @param attributes [Array<String>] attribute names; each is checked with
#   validate_get_queue_attribute before the request is made
# @return [Hash{String=>String}] attribute name => value; empty when
#   make_request returned nil
def get_queue_attributes(queue, attributes = ['All'])
  params = {}
  attributes.each_with_index do |attribute, i|
    validate_get_queue_attribute(attribute)
    # AttributeName.N indices are 1-based
    params["AttributeName.#{i + 1}"] = attribute
  end

  response = make_request('GetQueueAttributes', queue.queue_url, params)

  result = {}
  # make_request returns nil once its retries are used up; sibling callers
  # guard against that, so do the same here instead of calling into nil.
  return result if response.nil?

  response.each_node('/GetQueueAttributesResponse/GetQueueAttributesResult/Attribute') do |node|
    result[node.elements['Name'].text] = node.elements['Value'].text
  end
  result
end
#http_request(h, p, r) ⇒ Object
293 294 295 296 297 298 299 300 301 302 |
# File 'lib/activemessaging/adapters/sqs.rb', line 293
#
# Sends a prepared request through a fresh Net::HTTP client.
#
# @param h [String] host to connect to
# @param p [Integer] port to connect to
# @param r [Net::HTTPRequest] request to send
# @return [Net::HTTPResponse] the response returned by Net::HTTP#request
def http_request(h, p, r)
  client = Net::HTTP.new(h, p)
  # client.set_debug_output(STDOUT)
  client.use_ssl = ('https' == @protocol)
  # NOTE(review): certificate verification is switched off on purpose here
  # ("don't carp about SSL cert verification"). That leaves https connections
  # open to interception; consider VERIFY_PEER once custom hosts are checked.
  client.verify_mode = OpenSSL::SSL::VERIFY_NONE
  client.request(r)
end
#list_queues(queue_name_prefix = nil) ⇒ Object
166 167 168 169 170 171 |
# File 'lib/activemessaging/adapters/sqs.rb', line 166
#
# Issues a ListQueues request, optionally restricted by name prefix, and
# passes every returned queue URL through #add_queue.
#
# @param queue_name_prefix [String, nil] prefix filter; validated with
#   validate_queue_name when given
# @return [Array] results of #add_queue per queue URL; empty when
#   make_request returned nil
def list_queues(queue_name_prefix = nil)
  params = {}
  unless queue_name_prefix.nil?
    validate_queue_name(queue_name_prefix)
    params["QueueNamePrefix"] = queue_name_prefix
  end

  response = make_request('ListQueues', nil, params)
  return [] if response.nil?

  response.nodes("/ListQueuesResponse/ListQueuesResult/QueueUrl").map { |node| add_queue(node.text) }
end
#make_request(action, url = nil, params = {}, attributes = {}) ⇒ Object
225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 |
# File 'lib/activemessaging/adapters/sqs.rb', line 225
#
# Builds, signs and sends one GET request, retrying up to
# @request_retry_count times.
#
# @param action [String] value for the 'Action' query parameter
# @param url [String, nil] endpoint to call; defaults to @aws_url
# @param params [Hash{String=>Object}] extra query parameters (not modified)
# @param attributes [Hash{String=>Object}] sent as numbered
#   Attributes.N.Name / Attributes.N.Value pairs, ordered by key
# @return [SQSResponse, nil] the checked response, or nil when every attempt
#   failed while +reliable+ is set
# @raise [StandardError] the failure of an attempt when +reliable+ is not set
def make_request(action, url = nil, params = {}, attributes = {})
  url ||= @aws_url

  # Work on a copy so the caller's hash does not pick up Action/Version/etc.
  params = params.dup
  params['Action'] = action
  params['Version'] = @aws_version
  params['Expires'] = (Time.now + @request_expires).utc.iso8601

  attributes.keys.sort.each_with_index do |k, i|
    params["Attributes.#{i + 1}.Name"] = k
    params["Attributes.#{i + 1}.Value"] = attributes[k]
  end

  # Sort and encode query params
  query_params = params.keys.sort.map do |key|
    key + "=" + url_encode(params[key])
  end

  # Put these together with the uri to get the request query string
  request_url = "#{url}?#{query_params.join("&")}"

  # Create the request
  init_headers = {
    'Date' => Time.now.utc.iso8601,
    'Host' => @host
  }
  request = Net::HTTP::Get.new(request_url, init_headers)

  # Sign the request: the signer gets the request's canonical headers and
  # whatever it returns is copied back onto the request.
  signer = AWS4Signer.new({
    :access_key => @access_key_id,
    :secret_key => @secret_access_key,
    :region => @region
  })

  headers = {}
  request.canonical_each { |k, v| headers[k] = v }

  signature = signer.sign('GET', URI.parse(request_url), headers, nil, false)
  signature.each { |k, v| request[k] = v }

  # Make the request
  retry_count = 0
  while retry_count < @request_retry_count.to_i
    retry_count = retry_count + 1
    begin
      http_response = http_request(host, port, request)
      response = SQSResponse.new(http_response)
      check_errors(response)
      return response
    rescue StandardError => ex
      # Only ordinary errors are retried; Interrupt, SystemExit and other
      # non-StandardError exceptions propagate so the process can be stopped.
      raise ex unless reliable
      sleep(@reconnect_delay)
    end
  end

  # every attempt failed in reliable mode
  nil
end
#message_size_range ⇒ Object
352 353 354 |
# File 'lib/activemessaging/adapters/sqs.rb', line 352
#
# Range of acceptable message lengths: 1 up to max_message_size * 1024.
# The range is computed once and memoized in @_message_size_range. (The
# method name and the max_message_size call were missing from the extracted
# listing; they are restored from the method and attribute summaries.)
#
# @return [Range<Integer>]
def message_size_range
  @_message_size_range ||= 1..(max_message_size * 1024)
end
#queues ⇒ Object
310 311 312 313 314 315 |
# File 'lib/activemessaging/adapters/sqs.rb', line 310
#
# Hash of known queues keyed by name. It is rebuilt from #list_queues on
# every call unless a list already exists and cache_queue_list is set.
#
# @return [Hash] queue name => queue
def queues
  unless @queues && cache_queue_list
    # Reset before listing so the hash exists while list_queues runs.
    @queues = {}
    list_queues.each { |queue| @queues[queue.name] = queue }
  end
  @queues
end
#receive(options = {}) ⇒ Object
new receive respects priorities
103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 |
# File 'lib/activemessaging/adapters/sqs.rb', line 103
#
# New receive respects priorities: walks the subscribed queues in ascending
# priority order (queues sharing a priority in random order) and returns the
# first message retrieved. (The identifiers options/message/messages were
# missing from the extracted listing and are restored here.)
#
# @param options [Hash] :priorities may hold a list of integer priorities;
#   when present, only queues with those priorities are polled
# @return [Object, nil] the first message found, or nil when none was
def receive(options = {})
  message = nil

  only_priorities = options[:priorities]

  # loop through the priorities
  @queues_by_priority.keys.sort.each do |priority|
    # skip this priority if there is a list, and it is not in the list
    next if only_priorities && !only_priorities.include?(priority.to_i)

    # loop through queues for the same priority in random order each time
    @queues_by_priority[priority].shuffle.each do |queue_name|
      queue = queues[queue_name]
      subscription = @subscriptions[queue_name]

      next if queue.nil? || subscription.nil?

      messages = retrieve_messsages(queue, 1, subscription.headers[:visibility_timeout])
      message = messages[0] if messages && !messages.empty?

      break if message
    end

    break if message
  end

  message
end
#received(message, headers = {}) ⇒ Object
134 135 136 137 138 139 140 141 |
# File 'lib/activemessaging/adapters/sqs.rb', line 134
#
# Acknowledges a message by deleting it. Failures are logged and ignored so
# that a failed delete does not reach the caller. (The identifiers
# message/delete_message were missing from the extracted listing and are
# restored here.)
#
# @param message [Object] the message to delete
# @param headers [Hash] accepted but not used by this method
# @return [Object] result of delete_message, or of the logger call on failure
def received(message, headers = {})
  begin
    delete_message(message)
  rescue StandardError => exception
    # Best-effort: only ordinary errors are swallowed; Interrupt and
    # SystemExit still propagate.
    logger.error "Exception in ActiveMessaging::Adapters::AmazonSWS::Connection.received() logged and ignored: "
    logger.error exception
  end
end
#retrieve_messsages(queue, num_messages = 1, timeout = nil, waittime = nil) ⇒ Object
206 207 208 209 210 211 212 213 214 215 216 217 218 219 |
# File 'lib/activemessaging/adapters/sqs.rb', line 206
#
# Issues a ReceiveMessage request against +queue+ and wraps each returned
# Message node with Message.from_element. (The num_messages identifier was
# missing from the extracted listing; it is restored from the method
# summary.)
#
# @param queue [Object] object answering #queue_url; checked by validate_queue
# @param num_messages [Integer] sent as 'MaxNumberOfMessages'
# @param timeout [Integer, nil] sent as 'VisibilityTimeout' when given, after
#   validate_timeout
# @param waittime [Integer, nil] sent as 'WaitTimeSeconds' when given
# @return [Array, nil] the wrapped messages, or nil when make_request
#   returned nil
def retrieve_messsages(queue, num_messages = 1, timeout = nil, waittime = nil)
  validate_queue queue
  validate_timeout timeout if timeout

  params = {
    'MaxNumberOfMessages' => num_messages.to_s,
    'AttributeName' => 'All'
  }
  params['VisibilityTimeout'] = timeout.to_s if timeout
  params['WaitTimeSeconds'] = waittime.to_s if waittime

  response = make_request('ReceiveMessage', queue.queue_url, params)
  return if response.nil?

  response.nodes('/ReceiveMessageResponse/ReceiveMessageResult/Message').map do |n|
    Message.from_element(n, response, queue)
  end
end
#send(queue_name, message_body, message_headers = {}) ⇒ Object
Sends a single message to a queue. Arguments: queue_name (string), message body (string), headers (hash).
97 98 99 100 |
# File 'lib/activemessaging/adapters/sqs.rb', line 97
#
# Sends a single message to a queue, creating the queue first if needed.
# (The message_body/message_headers identifiers were missing from the
# extracted listing; they are restored from the method summary.)
#
# @param queue_name [String] name of the target queue
# @param message_body [String] body of the message to send
# @param message_headers [Hash] accepted but not used by this method
# @return [Object, nil] whatever send_messsage returns
def send(queue_name, message_body, message_headers = {})
  queue = get_or_create_queue(queue_name)
  send_messsage queue, message_body
end
#send_messsage(queue, message) ⇒ Object
199 200 201 202 203 204 |
# File 'lib/activemessaging/adapters/sqs.rb', line 199
#
# Validates the queue and the message, then issues a SendMessage request
# carrying the message as 'MessageBody'. (The message identifier was missing
# from the extracted listing; the validate_message call is restored on the
# assumption that it was the line the extraction dropped entirely.)
#
# @param queue [Object] object answering #queue_url; checked by validate_queue
# @param message [String] message body; checked by validate_message
# @return [String, nil] the MessageId text from the response, or nil when
#   make_request returned nil
def send_messsage(queue, message)
  validate_queue queue
  validate_message message
  response = make_request('SendMessage', queue.queue_url, { 'MessageBody' => message })
  response.get_text('/SendMessageResponse/SendMessageResult/MessageId') unless response.nil?
end
#set_queue_attributes(queue, attributes) ⇒ Object
189 190 191 192 |
# File 'lib/activemessaging/adapters/sqs.rb', line 189
#
# Checks every attribute name with validate_set_queue_attribute, then issues
# a SetQueueAttributes request carrying the attributes.
#
# @param queue [Object] object answering #queue_url
# @param attributes [Hash{String=>Object}] attribute name => new value
# @return [Object, nil] whatever make_request returns
def set_queue_attributes(queue, attributes)
  attributes.keys.each do |attribute_name|
    validate_set_queue_attribute(attribute_name)
  end
  make_request('SetQueueAttributes', queue.queue_url, {}, attributes)
end
#subscribe(queue_name, message_headers = {}) ⇒ Object
Arguments: queue_name (string), headers (hash). For SQS, makes sure the queue exists (creating it if it does not), then adds it to the list of polled queues.
69 70 71 72 73 74 75 76 77 78 79 80 81 |
# File 'lib/activemessaging/adapters/sqs.rb', line 69
#
# Makes sure the queue exists (creating it if missing), records or bumps its
# subscription, and files the queue name under the subscription's priority
# so that #receive polls it. (The message_headers identifier was missing
# from the extracted listing; it is restored from the method summary.)
#
# @param queue_name [String] name of the queue to subscribe to
# @param message_headers [Hash] passed to Subscription.new for a new
#   subscription
def subscribe(queue_name, message_headers = {})
  # look at the existing queues, create any that are missing
  queue = get_or_create_queue queue_name
  if @subscriptions.has_key? queue.name
    @subscriptions[queue.name].add
  else
    @subscriptions[queue.name] = Subscription.new(queue.name, message_headers)
  end
  priority = @subscriptions[queue.name].priority

  @queues_by_priority[priority] = [] unless @queues_by_priority.has_key?(priority)
  @queues_by_priority[priority] << queue.name unless @queues_by_priority[priority].include?(queue.name)
end
#unreceive(message, headers = {}) ⇒ Object
Does nothing; because the message is not deleted, it will eventually become visible again.
144 145 146 |
# File 'lib/activemessaging/adapters/sqs.rb', line 144
#
# Does nothing; because the message is not deleted, it will eventually
# become visible again. (The message parameter was missing from the
# extracted listing; it is restored from the method summary.)
#
# @param message [Object] accepted but not used by this method
# @param headers [Hash] accepted but not used by this method
# @return [true]
def unreceive(message, headers = {})
  true
end
#unsubscribe(queue_name, message_headers = {}) ⇒ Object
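Arguments: queue_name (string), headers (hash). For SQS, the original note says this attempts to delete the queue, which won’t work if the queue is not empty, and that’s OK; the listed implementation only removes the subscription and its priority entry.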
85 86 87 88 89 90 91 92 93 |
# File 'lib/activemessaging/adapters/sqs.rb', line 85
#
# Releases one subscription on the queue. When the subscription count drops
# to zero or below, the subscription is forgotten and the queue name is
# removed from its priority list. No request is made to SQS here. (The
# message_headers identifier was missing from the extracted listing; it is
# restored from the method summary.)
#
# @param queue_name [String] name of the queue to unsubscribe from
# @param message_headers [Hash] accepted but not used by this method
# @return [Object, nil] nil unless the last subscription was removed
def unsubscribe(queue_name, message_headers = {})
  if @subscriptions[queue_name]
    @subscriptions[queue_name].remove
    if @subscriptions[queue_name].count <= 0
      sub = @subscriptions.delete(queue_name)
      @queues_by_priority[sub.priority].delete(queue_name)
    end
  end
end
#url_encode(param) ⇒ Object
281 282 283 284 285 286 287 288 289 290 291 |
# File 'lib/activemessaging/adapters/sqs.rb', line 281
#
# Encodes one query-string value: stringifies it, converts it to UTF-8,
# escapes it with CGI.escape and then applies each substitution listed in
# URI_ENCODING_REPLACEMENTS.
#
# @param param [Object] value to encode; converted with #to_s first
# @return [String] the encoded value
def url_encode(param)
  text = param.to_s
  text = text.encode('UTF-8') if text.respond_to?(:encode)

  URI_ENCODING_REPLACEMENTS.inject(CGI::escape(text)) do |encoded, (from, to)|
    encoded.gsub(from, to)
  end
end
#validate_get_queue_attribute(qa) ⇒ Object
360 361 362 |
# File 'lib/activemessaging/adapters/sqs.rb', line 360
#
# Accepts only attribute names listed in GET_QUEUE_ATTRIBUTES.
#
# @param qa [String] attribute name to check
# @return [nil] when the name is allowed
# @raise [RuntimeError] when the name is not in GET_QUEUE_ATTRIBUTES
def validate_get_queue_attribute(qa)
  return if GET_QUEUE_ATTRIBUTES.include?(qa)

  raise "Queue Attribute name, #{qa}, not in list of valid attributes to get: #{GET_QUEUE_ATTRIBUTES.to_sentence}."
end
#validate_message(m) ⇒ Object
347 348 349 350 |
# File 'lib/activemessaging/adapters/sqs.rb', line 347
#
# Checks that a message is present and that its length falls inside
# message_size_range. (The method name and the message_size_range calls were
# missing from the extracted listing; they are restored from the method
# summary.)
#
# @param m [String, nil] message body to check
# @return [nil] when the message is acceptable
# @raise [RuntimeError] when +m+ is nil or its length is out of range
def validate_message(m)
  raise "Message cannot be nil." if m.nil?
  raise "Message length, #{m.length}, must be between #{message_size_range.min} and #{message_size_range.max}." unless message_size_range.include?(m.length)
end
#validate_new_queue(qn) ⇒ Object
338 339 340 341 |
# File 'lib/activemessaging/adapters/sqs.rb', line 338
#
# Checks a name for a queue that is about to be created: it must pass
# validate_queue_name and must not already be a key in #queues.
#
# @param qn [String] proposed queue name
# @return [nil] when the name is usable
# @raise [RuntimeError] when the name is invalid or already taken
def validate_new_queue(qn)
  validate_queue_name(qn)
  return unless queues.has_key?(qn)

  raise "Queue already exists: #{qn}"
end
#validate_number_of_messages(nom) ⇒ Object
368 369 370 |
# File 'lib/activemessaging/adapters/sqs.rb', line 368
#
# Checks that a requested message count lies inside NUMBER_OF_MESSAGES. (The
# method name was missing from the extracted listing; it is restored from
# the method summary.)
#
# @param nom [Integer] number of messages requested
# @return [nil] when the count is acceptable
# @raise [RuntimeError] when the count is outside NUMBER_OF_MESSAGES
def validate_number_of_messages(nom)
  raise "Number of messages, #{nom}, must be between #{NUMBER_OF_MESSAGES.min} and #{NUMBER_OF_MESSAGES.max}." unless NUMBER_OF_MESSAGES.include?(nom)
end
#validate_queue(q) ⇒ Object
343 344 345 |
# File 'lib/activemessaging/adapters/sqs.rb', line 343
#
# Accepts only queues whose name is already a key in #queues.
#
# @param q [Object] object answering #name
# @return [nil] when the queue is known
# @raise [RuntimeError] when the queue's name is not in #queues
def validate_queue(q)
  return if queues.has_key?(q.name)

  raise "Never heard of queue, can't use it: #{q.name}"
end
#validate_queue_name(qn) ⇒ Object
validation methods
333 334 335 336 |
# File 'lib/activemessaging/adapters/sqs.rb', line 333
#
# Validation method: a queue name must have a length inside
# QUEUE_NAME_LENGTH and may contain only word characters, '-' and '_'.
#
# @param qn [String] queue name to check
# @return [nil] when the name is acceptable
# @raise [RuntimeError] when the length or the characters are not allowed
def validate_queue_name(qn)
  unless QUEUE_NAME_LENGTH.include?(qn.length)
    raise "Queue name, '#{qn}', must be between #{QUEUE_NAME_LENGTH.min} and #{QUEUE_NAME_LENGTH.max} characters."
  end

  # Any character outside the allowed set makes the name invalid.
  raise "Queue name, '#{qn}', must be alphanumeric only." if qn =~ /[^\w\-\_]/
end
#validate_set_queue_attribute(qa) ⇒ Object
364 365 366 |
# File 'lib/activemessaging/adapters/sqs.rb', line 364
#
# Accepts only attribute names listed in SET_QUEUE_ATTRIBUTES.
#
# @param qa [String] attribute name to check
# @return [nil] when the name is allowed
# @raise [RuntimeError] when the name is not in SET_QUEUE_ATTRIBUTES
def validate_set_queue_attribute(qa)
  return if SET_QUEUE_ATTRIBUTES.include?(qa)

  raise "Queue Attribute name, #{qa}, not in list of valid attributes to set: #{SET_QUEUE_ATTRIBUTES.to_sentence}."
end
#validate_timeout(to) ⇒ Object
356 357 358 |
# File 'lib/activemessaging/adapters/sqs.rb', line 356
#
# Checks that a visibility timeout lies inside VISIBILITY_TIMEOUT.
#
# @param to [Integer] timeout to check
# @return [nil] when the timeout is acceptable
# @raise [RuntimeError] when the timeout is outside VISIBILITY_TIMEOUT
def validate_timeout(to)
  return if VISIBILITY_TIMEOUT.include?(to)

  raise "Timeout, #{to}, must be between #{VISIBILITY_TIMEOUT.min} and #{VISIBILITY_TIMEOUT.max}."
end