Class: ActiveMessaging::Adapters::Sqs::Connection

Inherits:
BaseConnection show all
Defined in:
lib/activemessaging/adapters/sqs.rb

Constant Summary collapse

# Allowed queue-name lengths; compared against qn.length in validate_queue_name.
QUEUE_NAME_LENGTH =
1..80
# Allowed visibility timeouts; checked in validate_timeout. The upper bound is
# 24 * 60 * 60 -- presumably seconds, i.e. one day (confirm units).
VISIBILITY_TIMEOUT =
0..(24 * 60 * 60)
# Allowed message counts per receive call; checked in validate_number_of_messages.
# NOTE(review): the SQS ReceiveMessage API documents a maximum of 10 -- confirm
# whether this upper bound is still intended.
NUMBER_OF_MESSAGES =
1..255
# Attribute names accepted by validate_get_queue_attribute.
GET_QUEUE_ATTRIBUTES =
['All', 'ApproximateNumberOfMessages', 'ApproximateNumberOfMessagesDelayed', 'ApproximateNumberOfMessagesNotVisible', 'CreatedTimestamp', 'DelaySeconds', 'LastModifiedTimestamp', 'MaximumMessageSize', 'MessageRetentionPeriod', 'Policy', 'QueueArn', 'ReceiveMessageWaitTimeSeconds', 'RedrivePolicy', 'VisibilityTimeout', 'KmsMasterKeyId', 'KmsDataKeyReusePeriodSeconds', 'FifoQueue', 'ContentBasedDeduplication'].freeze
# Attribute names accepted by validate_set_queue_attribute.
SET_QUEUE_ATTRIBUTES =
['DelaySeconds', 'MaximumMessageSize', 'MessageRetentionPeriod', 'Policy', 'ReceiveMessageWaitTimeSeconds', 'RedrivePolicy', 'VisibilityTimeout', 'KmsMasterKeyId', 'KmsDataKeyReusePeriodSeconds', 'ContentBasedDeduplication'].freeze
# Substitutions url_encode applies to the output of CGI::escape:
# '%7E' becomes '~' and '+' becomes '%20'.
URI_ENCODING_REPLACEMENTS =
{ '%7E' => '~', '+' => '%20' }.freeze

Instance Attribute Summary collapse

Attributes inherited from BaseConnection

#reliable

Instance Method Summary collapse

Methods included from ActiveMessaging::Adapter

included, #logger

Constructor Details

#initialize(cfg) ⇒ Connection

generic init method needed by a13g



32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
# File 'lib/activemessaging/adapters/sqs.rb', line 32

# Generic init method needed by a13g.
#
# cfg - Hash of settings. :access_key_id and :secret_access_key are required;
#       every other key falls back to the default written next to it below.
#
# Raises RuntimeError when either credential is nil or empty.
def initialize(cfg)
  raise "Must specify a access_key_id" if cfg[:access_key_id].nil? || cfg[:access_key_id].empty?
  raise "Must specify a secret_access_key" if cfg[:secret_access_key].nil? || cfg[:secret_access_key].empty?

  @access_key_id = cfg[:access_key_id]
  @secret_access_key = cfg[:secret_access_key]
  @region = cfg[:region]                          || 'us-east-1'
  @request_expires = cfg[:requestExpires]         || 10
  @request_retry_count = cfg[:requestRetryCount]  || 5
  @aws_version = cfg[:aws_version]                || '2012-11-05'
  @content_type = cfg[:content_type]              || 'text/plain'
  @host = cfg[:host]                              || "sqs.#{@region}.amazonaws.com"
  @protocol = cfg[:protocol]                      || 'http'
  # Derive the default port from the protocol so that an https endpoint is
  # not dialled on port 80; an explicit cfg[:port] always wins.
  @port = cfg[:port]                              || ('https' == @protocol ? 443 : 80)
  @poll_interval = cfg[:poll_interval]            || 1
  @reconnect_delay = cfg[:reconnectDelay]         || 5

  # Anything that does not convert to a positive integer falls back to 8.
  @max_message_size = cfg[:max_message_size].to_i > 0 ? cfg[:max_message_size].to_i : 8

  @aws_url = "#{@protocol}://#{@host}/"

  # Both flags default to true; only an explicit non-nil value overrides them.
  @cache_queue_list = cfg[:cache_queue_list].nil? ? true : cfg[:cache_queue_list]
  @reliable =         cfg[:reliable].nil?         ? true : cfg[:reliable]

  # initialize the subscriptions and queues
  @subscriptions = {}
  @queues_by_priority = {}
  @current_subscription = 0
  # Calling queues here loads the queue list up front.
  queues
end

Instance Attribute Details

#access_key_id ⇒ Object

configurable params



29
30
31
# File 'lib/activemessaging/adapters/sqs.rb', line 29

# Reader for the AWS access key id. It is taken from cfg[:access_key_id] in
# initialize and handed to the request signer in make_request.
def access_key_id
  @access_key_id
end

#aws_version ⇒ Object

configurable params



29
30
31
# File 'lib/activemessaging/adapters/sqs.rb', line 29

# Reader for the API version string; make_request sends it as the 'Version'
# query parameter.
def aws_version
  @aws_version
end

#cache_queue_list ⇒ Object

configurable params



29
30
31
# File 'lib/activemessaging/adapters/sqs.rb', line 29

# Reader for the queue-caching flag. When it is truthy, queues reuses the
# already loaded @queues hash and add_queue stores each queue it builds.
def cache_queue_list
  @cache_queue_list
end

#content_type ⇒ Object

configurable params



29
30
31
# File 'lib/activemessaging/adapters/sqs.rb', line 29

# Reader for the configured content type (cfg[:content_type]). None of the
# other methods shown in this listing reads it.
def content_type
  @content_type
end

#host ⇒ Object

configurable params



29
30
31
# File 'lib/activemessaging/adapters/sqs.rb', line 29

# Reader for the endpoint host name; make_request passes it to http_request.
def host
  @host
end

#max_message_size ⇒ Object

configurable params



29
30
31
# File 'lib/activemessaging/adapters/sqs.rb', line 29

# Reader for the maximum message size setting. message_size_range multiplies
# it by 1024 to get the largest accepted message length, so the value is
# presumably expressed in kilobytes (confirm).
def max_message_size
  @max_message_size
end

#poll_interval ⇒ Object

configurable params



29
30
31
# File 'lib/activemessaging/adapters/sqs.rb', line 29

# Reader for the configured poll interval (cfg[:poll_interval]). Its units
# and its consumer are not visible in this listing -- presumably seconds;
# verify against the polling code.
def poll_interval
  @poll_interval
end

#port ⇒ Object

configurable params



29
30
31
# File 'lib/activemessaging/adapters/sqs.rb', line 29

# Reader for the endpoint port; make_request passes it to http_request.
def port
  @port
end

#reconnect_delay ⇒ Object

configurable params



29
30
31
# File 'lib/activemessaging/adapters/sqs.rb', line 29

# Reader for the retry delay (cfg[:reconnectDelay]); make_request passes the
# underlying @reconnect_delay to sleep after a failed attempt.
def reconnect_delay
  @reconnect_delay
end

#secret_access_key ⇒ Object

configurable params



29
30
31
# File 'lib/activemessaging/adapters/sqs.rb', line 29

# Reader for the AWS secret access key. It is taken from
# cfg[:secret_access_key] in initialize and handed to the request signer in
# make_request.
def secret_access_key
  @secret_access_key
end

Instance Method Details

#add_queue(url) ⇒ Object

internal data structure methods



319
320
321
322
323
# File 'lib/activemessaging/adapters/sqs.rb', line 319

# Internal data structure helper: turn a queue URL into a Queue object.
#
# url - queue URL, as accepted by Queue.from_url.
#
# While cache_queue_list is enabled the queue is also stored in the queues
# hash under its name.
#
# Returns the Queue built from the URL.
def add_queue(url)
  Queue.from_url(url).tap do |queue|
    queues[queue.name] = queue if cache_queue_list
  end
end

#check_errors(response) ⇒ Object



304
305
306
307
308
# File 'lib/activemessaging/adapters/sqs.rb', line 304

# Reject unusable responses.
#
# response - object answering errors? and errors.
#
# Returns the response unchanged when it reports no errors.
# Raises 'http response was nil' for a nil response, or whatever
# response.errors returns when response.errors? is true.
def check_errors(response)
  raise 'http response was nil' if response.nil?

  failed = response && response.errors?
  raise response.errors if failed

  response
end

#create_queue(name) ⇒ Object



148
149
150
151
152
153
154
155
156
157
158
# File 'lib/activemessaging/adapters/sqs.rb', line 148

# Create a queue called +name+ with a fixed set of default attributes.
#
# The name is first checked by validate_new_queue. The request is a
# 'CreateQueue' action carrying the name and the attributes listed below.
#
# Returns the result of add_queue for the QueueUrl found in the response, or
# nil when make_request returned nil.
def create_queue(name)
  validate_new_queue(name)

  default_attributes = {
    'DelaySeconds' => 0,
    'MaximumMessageSize' => 262144,
    'MessageRetentionPeriod' => 4 * 24 * 60 * 60,
    'ReceiveMessageWaitTimeSeconds' => 0,
    'VisibilityTimeout' => 90 * 60
  }
  response = make_request('CreateQueue', nil, { 'QueueName' => name }, default_attributes)
  return nil if response.nil?

  add_queue(response.get_text("/CreateQueueResponse/CreateQueueResult/QueueUrl"))
end

#delete_message(message) ⇒ Object



221
222
223
# File 'lib/activemessaging/adapters/sqs.rb', line 221

# Issue a 'DeleteMessage' request for +message+.
#
# message - object answering queue (which answers queue_url) and
#           receipt_handle.
#
# Returns whatever make_request returns.
def delete_message(message)
  queue_url = message.queue.queue_url
  make_request('DeleteMessage', queue_url, { 'ReceiptHandle' => message.receipt_handle })
end

#delete_queue(queue) ⇒ Object



160
161
162
163
164
# File 'lib/activemessaging/adapters/sqs.rb', line 160

# Issue a 'DeleteQueue' request for +queue+ and forget it locally.
#
# The queue is first checked by validate_queue. The entry is removed from the
# queues hash regardless of what make_request returned.
#
# Returns the value removed from the queues hash (nil if there was none).
def delete_queue(queue)
  validate_queue(queue)
  make_request('DeleteQueue', queue.queue_url)
  queues.delete(queue.name)
end

#disconnect ⇒ Object



63
64
65
# File 'lib/activemessaging/adapters/sqs.rb', line 63

# Does no work of its own; always returns true.
def disconnect
  true
end

#get_or_create_queue(queue_name) ⇒ Object



325
326
327
328
329
330
# File 'lib/activemessaging/adapters/sqs.rb', line 325

# Look +queue_name+ up in the queues hash, creating the queue when the name
# is not a key there.
#
# Returns the queue.
# Raises "could not get or create queue: ..." when neither the lookup nor
# create_queue produced a truthy value.
def get_or_create_queue(queue_name)
  queue = queues.fetch(queue_name) { create_queue(queue_name) }
  raise "could not get or create queue: #{queue_name}" unless queue

  queue
end

#get_queue_attributes(queue, attributes = ['All']) ⇒ Object



173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
# File 'lib/activemessaging/adapters/sqs.rb', line 173

# Fetch attributes of +queue+ with a 'GetQueueAttributes' request.
#
# queue      - object answering queue_url.
# attributes - Array of attribute names to ask for (default ['All']); each
#              one is checked by validate_get_queue_attribute and sent as
#              'AttributeName.N', numbered from 1.
#
# Returns a Hash mapping each returned attribute Name to its Value text. The
# Hash is empty when make_request returned nil (it does so once its retries
# are used up), matching the nil handling of the sibling request methods.
def get_queue_attributes(queue, attributes = ['All'])
  params = {}
  attributes.each_with_index do |attribute, i|
    validate_get_queue_attribute(attribute)
    params["AttributeName.#{i + 1}"] = attribute
  end

  response = make_request('GetQueueAttributes', queue.queue_url, params)
  result = {}
  return result if response.nil?

  response.each_node('/GetQueueAttributesResponse/GetQueueAttributesResult/Attribute') do |node|
    result[node.elements['Name'].text] = node.elements['Value'].text
  end
  result
end

#http_request(h, p, r) ⇒ Object



293
294
295
296
297
298
299
300
301
302
# File 'lib/activemessaging/adapters/sqs.rb', line 293

# Send request +r+ to host +h+ on port +p+ through Net::HTTP.
#
# SSL is switched on only when @protocol is 'https'.
#
# NOTE(review): certificate verification is turned off (VERIFY_NONE), so the
# server's certificate is never checked -- confirm this is still intended.
#
# Returns whatever Net::HTTP#request returns.
def http_request(h, p, r)
  client = Net::HTTP.new(h, p)
  # client.set_debug_output(STDOUT)

  client.use_ssl = ('https' == @protocol)

  # Don't carp about SSL cert verification
  client.verify_mode = OpenSSL::SSL::VERIFY_NONE
  client.request(r)
end

#list_queues(queue_name_prefix = nil) ⇒ Object



166
167
168
169
170
171
# File 'lib/activemessaging/adapters/sqs.rb', line 166

# List queues with a 'ListQueues' request.
#
# queue_name_prefix - optional prefix; when given it is checked by
#                     validate_queue_name and sent as 'QueueNamePrefix'.
#
# Returns an Array with the result of add_queue for every QueueUrl in the
# response, or an empty Array when make_request returned nil.
def list_queues(queue_name_prefix = nil)
  params = {}
  unless queue_name_prefix.nil?
    validate_queue_name(queue_name_prefix)
    params["QueueNamePrefix"] = queue_name_prefix
  end

  response = make_request('ListQueues', nil, params)
  return [] if response.nil?

  response.nodes("/ListQueuesResponse/ListQueuesResult/QueueUrl").map { |node| add_queue(node.text) }
end

#make_request(action, url = nil, params = {}, attributes = {}) ⇒ Object



225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
# File 'lib/activemessaging/adapters/sqs.rb', line 225

# Build, sign and send one SQS query request, retrying on failure.
#
# action     - action name, sent as the 'Action' query parameter.
# url        - endpoint to call; @aws_url is used when nil.
# params     - Hash of extra query parameters. It is copied, so the caller's
#              Hash is left untouched.
# attributes - Hash of attributes, flattened in sorted key order into
#              'Attributes.N.Name' / 'Attributes.N.Value' pairs (N from 1).
#
# Up to @request_retry_count attempts are made. A failed attempt on a
# reliable connection is followed by sleep(@reconnect_delay).
#
# Returns the SQSResponse of the first successful attempt, or nil when every
# attempt failed.
# Raises the underlying error straight away when the connection is not
# reliable. Exceptions outside StandardError (Interrupt, SystemExit, ...) are
# never retried and always propagate.
def make_request(action, url=nil, params = {}, attributes = {})
  url ||= @aws_url

  # merge returns a new Hash, so the caller's params are not modified.
  params = params.merge(
    'Action' => action,
    'Version' => @aws_version,
    'Expires' => (Time.now + @request_expires).utc.iso8601
  )

  attributes.keys.sort.each_with_index do |k, i|
    params["Attributes.#{i + 1}.Name"] = k
    params["Attributes.#{i + 1}.Value"] = attributes[k]
  end

  # Sort and encode query params
  query_params = params.keys.sort.map do |key|
    key + "=" + url_encode(params[key])
  end

  # Put these together with the uri to get the request query string
  request_url = "#{url}?#{query_params.join("&")}"

  # Create the request
  init_headers = {
    'Date' => Time.now.utc.iso8601,
    'Host' => @host
  }
  request = Net::HTTP::Get.new(request_url, init_headers)

  # Sign the request
  signer = AWS4Signer.new({
    :access_key => @access_key_id,
    :secret_key => @secret_access_key,
    :region => @region
  })

  headers = {}
  request.canonical_each { |k, v| headers[k] = v }

  signature = signer.sign('GET', URI.parse(request_url), headers, nil, false)
  signature.each { |k, v| request[k] = v }

  # Make the request
  @request_retry_count.to_i.times do
    begin
      response = SQSResponse.new(http_request(host, port, request))
      check_errors(response)
      return response
    rescue StandardError => ex
      raise ex unless reliable
      sleep(@reconnect_delay)
    end
  end

  # Every attempt failed on a reliable connection.
  nil
end

#message_size_range ⇒ Object



352
353
354
# File 'lib/activemessaging/adapters/sqs.rb', line 352

# Range of accepted message lengths: 1 up to max_message_size * 1024.
#
# The Range is built on first use and kept in @_message_size_range, so a
# later change to max_message_size is not picked up.
def message_size_range
  return @_message_size_range if @_message_size_range

  upper_bound = max_message_size * 1024
  @_message_size_range = 1..upper_bound
end

#queues ⇒ Object



310
311
312
313
314
315
# File 'lib/activemessaging/adapters/sqs.rb', line 310

# Hash of known queues keyed by queue name.
#
# With cache_queue_list enabled the Hash is loaded once and reused; otherwise
# it is rebuilt from list_queues on every call.
def queues
  unless @queues && cache_queue_list
    # Assign the empty Hash before listing, so it is already in place while
    # list_queues runs.
    @queues = {}
    list_queues.each { |queue| @queues[queue.name] = queue }
  end
  @queues
end

#receive(options = {}) ⇒ Object

new receive respects priorities



103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
# File 'lib/activemessaging/adapters/sqs.rb', line 103

# Fetch at most one message, visiting subscribed queues by priority.
#
# options - Hash; :priorities may hold a list of integer priorities, in which
#           case every priority whose to_i is not in the list is skipped.
#
# Priorities are visited in sorted key order and the queues of one priority
# in a freshly shuffled order. Queues that are unknown or have no
# subscription are skipped. Each queue is asked for one message, using the
# subscription's :visibility_timeout header as the timeout.
#
# Returns the first message found, or nil when no queue produced one.
def receive(options = {})
  wanted_priorities = options[:priorities]

  @queues_by_priority.keys.sort.each do |priority|
    next if wanted_priorities && !wanted_priorities.include?(priority.to_i)

    @queues_by_priority[priority].shuffle.each do |queue_name|
      queue = queues[queue_name]
      subscription = @subscriptions[queue_name]
      next if queue.nil? || subscription.nil?

      messages = retrieve_messsages(queue, 1, subscription.headers[:visibility_timeout])
      next unless messages && !messages.empty?

      # Stop at the first queue that actually produced a message.
      return messages[0] if messages[0]
    end
  end

  nil
end

#received(message, headers = {}) ⇒ Object



134
135
136
137
138
139
140
141
# File 'lib/activemessaging/adapters/sqs.rb', line 134

# Acknowledge +message+ by deleting it via delete_message.
#
# Deletion is best effort: a StandardError is logged through logger.error and
# otherwise ignored. Exceptions outside StandardError (Interrupt,
# SystemExit, ...) are not swallowed and propagate to the caller.
#
# message - the message to delete.
# headers - accepted but not used here.
#
# Returns delete_message's result, or the last logger.error result when an
# error was logged.
def received(message, headers={})
  begin
    delete_message(message)
  rescue StandardError => exception
    logger.error "Exception in ActiveMessaging::Adapters::Sqs::Connection.received() logged and ignored: "
    logger.error exception
  end
end

#retrieve_messsages(queue, num_messages = 1, timeout = nil, waittime = nil) ⇒ Object



206
207
208
209
210
211
212
213
214
215
216
217
218
219
# File 'lib/activemessaging/adapters/sqs.rb', line 206

# Fetch messages from +queue+ with a 'ReceiveMessage' request.
#
# queue        - a known queue (checked by validate_queue).
# num_messages - how many messages to ask for (checked by
#                validate_number_of_messages); sent as 'MaxNumberOfMessages'.
# timeout      - optional visibility timeout (checked by validate_timeout);
#                sent as 'VisibilityTimeout' when given.
# waittime     - optional wait time; sent as 'WaitTimeSeconds' when given.
#
# Returns an Array with one Message.from_element result per Message node in
# the response, or nil when make_request returned nil.
def retrieve_messsages(queue, num_messages = 1, timeout = nil, waittime = nil)
  validate_queue(queue)
  validate_number_of_messages(num_messages)
  validate_timeout(timeout) if timeout

  request_params = { 'MaxNumberOfMessages' => num_messages.to_s, 'AttributeName' => 'All' }
  request_params['VisibilityTimeout'] = timeout.to_s if timeout
  request_params['WaitTimeSeconds'] = waittime.to_s if waittime

  response = make_request('ReceiveMessage', queue.queue_url, request_params)
  return nil if response.nil?

  message_nodes = response.nodes('/ReceiveMessageResponse/ReceiveMessageResult/Message')
  message_nodes.map { |node| Message.from_element(node, response, queue) }
end

#send(queue_name, message_body, message_headers = {}) ⇒ Object

queue_name string, body string, headers hash. Sends a single message to a queue.



97
98
99
100
# File 'lib/activemessaging/adapters/sqs.rb', line 97

# Send a single message to the queue called +queue_name+.
#
# queue_name      - queue name; resolved through get_or_create_queue.
# message_body    - message text handed to send_messsage.
# message_headers - accepted but not used here.
#
# Returns whatever send_messsage returns.
def send(queue_name, message_body, message_headers = {})
  send_messsage(get_or_create_queue(queue_name), message_body)
end

#send_messsage(queue, message) ⇒ Object



199
200
201
202
203
204
# File 'lib/activemessaging/adapters/sqs.rb', line 199

# Post +message+ to +queue+ with a 'SendMessage' request.
#
# The queue is checked by validate_queue and the message by validate_message
# before anything is sent.
#
# Returns the MessageId text from the response, or nil when make_request
# returned nil.
def send_messsage(queue, message)
  validate_queue(queue)
  validate_message(message)

  response = make_request('SendMessage', queue.queue_url, { 'MessageBody' => message })
  return nil if response.nil?

  response.get_text('/SendMessageResponse/SendMessageResult/MessageId')
end

#set_queue_attributes(queue, attributes) ⇒ Object



189
190
191
192
# File 'lib/activemessaging/adapters/sqs.rb', line 189

# Change attributes of +queue+ with a 'SetQueueAttributes' request.
#
# queue      - object answering queue_url.
# attributes - Hash of attribute name => value; every name is checked by
#              validate_set_queue_attribute before the request is made.
#
# Returns whatever make_request returns.
def set_queue_attributes(queue, attributes)
  attributes.each_key { |attribute_name| validate_set_queue_attribute(attribute_name) }
  make_request('SetQueueAttributes', queue.queue_url, {}, attributes)
end

#subscribe(queue_name, message_headers = {}) ⇒ Object

queue_name string, headers hash. For SQS, makes sure the queue exists (creating it if it does not), then adds it to the list of polled queues.



69
70
71
72
73
74
75
76
77
78
79
80
81
# File 'lib/activemessaging/adapters/sqs.rb', line 69

# Subscribe to the queue called +queue_name+.
#
# The queue is resolved through get_or_create_queue, so a missing queue is
# created. A repeated subscription calls add on the existing Subscription; a
# first one builds a new Subscription from the queue name and
# +message_headers+. The queue name is then filed under the subscription's
# priority in @queues_by_priority, at most once.
#
# Returns the priority's queue list when the name was appended, otherwise nil.
def subscribe(queue_name, message_headers={})
  # look at the existing queues, create any that are missing
  name = get_or_create_queue(queue_name).name

  if @subscriptions.key?(name)
    @subscriptions[name].add
  else
    @subscriptions[name] = Subscription.new(name, message_headers)
  end

  priority = @subscriptions[name].priority
  @queues_by_priority[priority] = [] unless @queues_by_priority.key?(priority)

  bucket = @queues_by_priority[priority]
  bucket << name unless bucket.include?(name)
end

#unreceive(message, headers = {}) ⇒ Object

Do nothing; because the message is not deleted, it will eventually become visible again.



144
145
146
# File 'lib/activemessaging/adapters/sqs.rb', line 144

# Do nothing with +message+; both arguments are ignored.
#
# Always returns true.
def unreceive(message, headers = {})
  true
end

#unsubscribe(queue_name, message_headers = {}) ⇒ Object

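queue_name string, headers hash. Removes one subscription for the queue; once no subscriptions remain, the queue is dropped from the list of polled queues. (The code shown here does not delete the SQS queue itself.)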



85
86
87
88
89
90
91
92
93
# File 'lib/activemessaging/adapters/sqs.rb', line 85

# Drop one subscription for the queue called +queue_name+.
#
# remove is called on the queue's Subscription. Once its count is zero or
# less, the Subscription is deleted and the queue name is taken out of its
# priority's list in @queues_by_priority.
#
# queue_name      - name of the subscribed queue.
# message_headers - accepted but not used here.
#
# Returns the queue name when it was taken out of the priority list,
# otherwise nil (no subscription, or subscriptions still remain).
def unsubscribe(queue_name, message_headers={})
  subscription = @subscriptions[queue_name]
  return unless subscription

  subscription.remove
  return unless subscription.count <= 0

  @subscriptions.delete(queue_name)
  @queues_by_priority[subscription.priority].delete(queue_name)
end

#url_encode(param) ⇒ Object



281
282
283
284
285
286
287
288
289
290
291
# File 'lib/activemessaging/adapters/sqs.rb', line 281

# Percent-encode +param+ for use in a query string.
#
# The value is converted with to_s, encoded as UTF-8 where the string
# supports it, escaped with CGI::escape, and finally passed through every
# substitution in URI_ENCODING_REPLACEMENTS.
#
# Returns the encoded String.
def url_encode(param)
  text = param.to_s
  text = text.encode('UTF-8') if text.respond_to?(:encode)

  URI_ENCODING_REPLACEMENTS.reduce(CGI::escape(text)) do |escaped, (from, to)|
    escaped.gsub(from, to)
  end
end

#validate_get_queue_attribute(qa) ⇒ Object



360
361
362
# File 'lib/activemessaging/adapters/sqs.rb', line 360

# Check that +qa+ is one of GET_QUEUE_ATTRIBUTES.
#
# Returns nil when it is; raises a RuntimeError listing the valid names
# otherwise.
def validate_get_queue_attribute(qa)
  return if GET_QUEUE_ATTRIBUTES.include?(qa)

  raise "Queue Attribute name, #{qa}, not in list of valid attributes to get: #{GET_QUEUE_ATTRIBUTES.to_sentence}."
end

#validate_message(m) ⇒ Object



347
348
349
350
# File 'lib/activemessaging/adapters/sqs.rb', line 347

# Check that message +m+ is present and that m.length falls inside
# message_size_range.
#
# Returns nil for an acceptable message; raises a RuntimeError for nil or for
# a length outside the range.
def validate_message(m)
  raise "Message cannot be nil." if m.nil?

  allowed = message_size_range
  return if allowed.include?(m.length)

  raise "Message length, #{m.length}, must be between #{allowed.min} and #{allowed.max}."
end

#validate_new_queue(qn) ⇒ Object



338
339
340
341
# File 'lib/activemessaging/adapters/sqs.rb', line 338

# Check that +qn+ is a valid queue name (via validate_queue_name) and is not
# already a key in the queues hash.
#
# Returns nil when the name is free; raises "Queue already exists: ..."
# otherwise.
def validate_new_queue(qn)
  validate_queue_name(qn)
  return unless queues.key?(qn)

  raise "Queue already exists: #{qn}"
end

#validate_number_of_messages(nom) ⇒ Object



368
369
370
# File 'lib/activemessaging/adapters/sqs.rb', line 368

# Check that +nom+ lies inside NUMBER_OF_MESSAGES.
#
# Returns nil when it does; raises a RuntimeError naming the bounds otherwise.
def validate_number_of_messages(nom)
  return if NUMBER_OF_MESSAGES.include?(nom)

  raise "Number of messages, #{nom}, must be between #{NUMBER_OF_MESSAGES.min} and #{NUMBER_OF_MESSAGES.max}."
end

#validate_queue(q) ⇒ Object



343
344
345
# File 'lib/activemessaging/adapters/sqs.rb', line 343

def validate_queue q
    raise "Never heard of queue, can't use it: #{q.name}" unless queues.has_key? q.name
end

#validate_queue_name(qn) ⇒ Object

validation methods



333
334
335
336
# File 'lib/activemessaging/adapters/sqs.rb', line 333

# Validation method: check the length and characters of queue name +qn+.
#
# The length must lie inside QUEUE_NAME_LENGTH, and the name must not contain
# any character other than word characters and '-'.
#
# Returns nil for a valid name; raises a RuntimeError otherwise.
def validate_queue_name(qn)
  unless QUEUE_NAME_LENGTH.include?(qn.length)
    raise "Queue name, '#{qn}', must be between #{QUEUE_NAME_LENGTH.min} and #{QUEUE_NAME_LENGTH.max} characters."
  end

  raise "Queue name, '#{qn}', must be alphanumeric only." if qn =~ /[^\w\-\_]/
end

#validate_set_queue_attribute(qa) ⇒ Object



364
365
366
# File 'lib/activemessaging/adapters/sqs.rb', line 364

# Check that +qa+ is one of SET_QUEUE_ATTRIBUTES.
#
# Returns nil when it is; raises a RuntimeError listing the valid names
# otherwise.
def validate_set_queue_attribute(qa)
  return if SET_QUEUE_ATTRIBUTES.include?(qa)

  raise "Queue Attribute name, #{qa}, not in list of valid attributes to set: #{SET_QUEUE_ATTRIBUTES.to_sentence}."
end

#validate_timeout(to) ⇒ Object



356
357
358
# File 'lib/activemessaging/adapters/sqs.rb', line 356

# Check that timeout +to+ lies inside VISIBILITY_TIMEOUT.
#
# Returns nil when it does; raises a RuntimeError naming the bounds otherwise.
def validate_timeout(to)
  return if VISIBILITY_TIMEOUT.include?(to)

  raise "Timeout, #{to}, must be between #{VISIBILITY_TIMEOUT.min} and #{VISIBILITY_TIMEOUT.max}."
end