Class: RightAws::SqsInterface

Inherits:
RightAwsBase show all
Includes:
RightAwsBaseInterface
Defined in:
lib/sqs/right_sqs_interface.rb

Defined Under Namespace

Classes: SqsCreateQueueParser, SqsGetQueueAttributesParser, SqsGetVisibilityTimeoutParser, SqsListGrantsParser, SqsListQueuesParser, SqsReceiveMessagesParser, SqsSendMessagesParser, SqsStatusParser

Constant Summary collapse

SIGNATURE_VERSION =
"1"
API_VERSION =
"2007-05-01"
DEFAULT_HOST =
"queue.amazonaws.com"
DEFAULT_PORT =
443
DEFAULT_PROTOCOL =
'https'
REQUEST_TTL =
30
DEFAULT_VISIBILITY_TIMEOUT =
30
@@bench =
AwsBenchmarkingBlock.new
@@api =
API_VERSION

Constants inherited from RightAwsBase

RightAwsBase::AMAZON_PROBLEMS

Instance Attribute Summary

Attributes included from RightAwsBaseInterface

#aws_access_key_id, #cache, #connection, #last_errors, #last_request, #last_request_id, #last_response, #logger, #params

Class Method Summary collapse

Instance Method Summary collapse

Methods included from RightAwsBaseInterface

#cache_hits?, caching, caching=, #caching?, #init, #multi_thread, #on_exception, #request_info_impl, #update_cache

Methods inherited from RightAwsBase

amazon_problems, amazon_problems=

Constructor Details

#initialize(aws_access_key_id = nil, aws_secret_access_key = nil, params = {}) ⇒ SqsInterface

Creates a new SqsInterface instance.

sqs = RightAws::SqsInterface.new('1E3GDYEOGFJPIT75KDT40','hgTHt68JY07JKUY08ftHYtERkjgtfERn57DFE379', {:multi_thread => true, :logger => Logger.new('/tmp/x.log')}) #=> <RightSqs:0xb7af6264>

Params is a hash:

{:server       => 'queue.amazonaws.com' # Amazon service host: 'queue.amazonaws.com'(default)
 :port         => 443                   # Amazon service port: 80 or 443(default)
 :multi_thread => true|false            # Multi-threaded (connection per each thread): true or false(default)
 :logger       => Logger Object}        # Logger instance: logs to STDOUT if omitted }


62
63
64
65
66
67
68
69
70
# File 'lib/sqs/right_sqs_interface.rb', line 62

def initialize(aws_access_key_id=nil, aws_secret_access_key=nil, params={})
  init({ :name             => 'SQS', 
         :default_host     => ENV['SQS_URL'] ? URI.parse(ENV['SQS_URL']).host   : DEFAULT_HOST, 
         :default_port     => ENV['SQS_URL'] ? URI.parse(ENV['SQS_URL']).port   : DEFAULT_PORT, 
         :default_protocol => ENV['SQS_URL'] ? URI.parse(ENV['SQS_URL']).scheme : DEFAULT_PROTOCOL }, 
       aws_access_key_id     || ENV['AWS_ACCESS_KEY_ID'], 
       aws_secret_access_key || ENV['AWS_SECRET_ACCESS_KEY'], 
       params)
end

Class Method Details

.apiObject



47
48
49
# File 'lib/sqs/right_sqs_interface.rb', line 47

def self.api 
  @@api
end

.bench_sqsObject



42
43
44
# File 'lib/sqs/right_sqs_interface.rb', line 42

def self.bench_sqs
  @@bench.service
end

.bench_xmlObject



39
40
41
# File 'lib/sqs/right_sqs_interface.rb', line 39

def self.bench_xml
  @@bench.xml
end

.queue_name_by_url(queue_url) ⇒ Object

Returns short queue name by url.

RightSqs.queue_name_by_url('http://queue.amazonaws.com/ZZ7XXXYYYBINS/my_awesome_queue') #=> 'my_awesome_queue'


385
386
387
388
389
# File 'lib/sqs/right_sqs_interface.rb', line 385

def self.queue_name_by_url(queue_url)
  queue_url[/[^\/]*$/]
rescue
  on_exception
end

Instance Method Details

#add_grant(queue_url, grantee_email_address, permission = nil) ⇒ Object

Adds grants for user (identified by email he registered at Amazon). Returns true or an exception. Permission = ‘FULLCONTROL’ | ‘RECEIVEMESSAGE’ | ‘SENDMESSAGE’.

sqs.add_grant('http://queue.amazonaws.com/ZZ7XXXYYYBINS/my_awesome_queue', '[email protected]', 'FULLCONTROL') #=> true


243
244
245
246
247
248
249
250
251
# File 'lib/sqs/right_sqs_interface.rb', line 243

def add_grant(queue_url, grantee_email_address, permission = nil)
  req_hash = generate_request('AddGrant', 
                              'Grantee.EmailAddress' => grantee_email_address,
                              'Permission'           => permission,
                              :queue_url             => queue_url)
  request_info(req_hash, SqsStatusParser.new(:logger => @logger))
rescue
  on_exception
end

#change_message_visibility(queue_url, message_id, visibility_timeout = 0) ⇒ Object

Changes message visibility timeout. Returns true or an exception.

sqs.change_message_visibility('http://queue.amazonaws.com/ZZ7XXXYYYBINS/my_awesome_queue', '1234567890...0987654321', 10) #=> true


356
357
358
359
360
361
362
363
364
# File 'lib/sqs/right_sqs_interface.rb', line 356

def change_message_visibility(queue_url, message_id, visibility_timeout=0)
  req_hash = generate_request('ChangeMessageVisibility', 
                              'MessageId'         => message_id,
                              'VisibilityTimeout' => visibility_timeout.to_s,
                              :queue_url          => queue_url)
  request_info(req_hash, SqsStatusParser.new(:logger => @logger))
rescue
  on_exception
end

#clear_queue(queue_url) ⇒ Object

Removes all visible messages from queue. Return true or an exception.

sqs.clear_queue('http://queue.amazonaws.com/ZZ7XXXYYYBINS/my_awesome_queue') #=> true


415
416
417
418
419
420
# File 'lib/sqs/right_sqs_interface.rb', line 415

def clear_queue(queue_url)
  while (m = pop_message(queue_url)) ; end   # delete all messages in queue
  true
rescue
  on_exception
end

#create_queue(queue_name, default_visibility_timeout = nil) ⇒ Object

Creates new queue. Returns new queue link.

sqs.create_queue('my_awesome_queue') #=> 'http://queue.amazonaws.com/ZZ7XXXYYYBINS/my_awesome_queue'

PS Some queue based requests may not become available until a couple of minutes after queue creation (permission grant and removal for example)



148
149
150
151
152
153
# File 'lib/sqs/right_sqs_interface.rb', line 148

def create_queue(queue_name, default_visibility_timeout=nil)
  req_hash = generate_request('CreateQueue', 
                              'QueueName'                => queue_name,
                              'DefaultVisibilityTimeout' => default_visibility_timeout || DEFAULT_VISIBILITY_TIMEOUT )
  request_info(req_hash, SqsCreateQueueParser.new(:logger => @logger))
end

#delete_message(queue_url, message_id) ⇒ Object

Deletes message from queue. Returns true or an exception. Amazon returns true on deletion of non-existent messages.

sqs.delete_message('http://queue.amazonaws.com/ZZ7XXXYYYBINS/my_awesome_queue', '12345678904...0987654321') #=> true


343
344
345
346
347
348
349
350
# File 'lib/sqs/right_sqs_interface.rb', line 343

def delete_message(queue_url, message_id)
  req_hash = generate_request('DeleteMessage', 
                              'MessageId' => message_id,
                              :queue_url  => queue_url)
  request_info(req_hash, SqsStatusParser.new(:logger => @logger))
rescue
  on_exception
end

#delete_queue(queue_url, force_deletion = false) ⇒ Object

Deletes queue (queue must be empty or force_deletion must be set to true). Queue is identified by url. Returns true or an exception.

sqs.delete_queue('http://queue.amazonaws.com/ZZ7XXXYYYBINS/my_awesome_queue_2') #=> true


172
173
174
175
176
177
178
179
# File 'lib/sqs/right_sqs_interface.rb', line 172

def delete_queue(queue_url, force_deletion = false)
  req_hash = generate_request('DeleteQueue', 
                              'ForceDeletion' => force_deletion.to_s,
                              :queue_url      => queue_url)
  request_info(req_hash, SqsStatusParser.new(:logger => @logger))
rescue
  on_exception
end

#force_clear_queue(queue_url) ⇒ Object

Deletes queue then re-creates it (restores attributes also). The fastest method to clear big queue or queue with invisible messages. Return true or an exception.

sqs.force_clear_queue('http://queue.amazonaws.com/ZZ7XXXYYYBINS/my_awesome_queue') #=> true

PS This function is no longer supported. Amazon has changed the SQS semantics to require at least 60 seconds between queue deletion and creation. Hence this method will fail with an exception.



429
430
431
432
433
434
435
436
437
438
439
440
441
# File 'lib/sqs/right_sqs_interface.rb', line 429

def force_clear_queue(queue_url)
  queue_name       = queue_name_by_url(queue_url)
  queue_attributes = get_queue_attributes(queue_url)
  force_delete_queue(queue_url)
  create_queue(queue_name)
    # hmmm... The next line is a trick. Amazon do not want change attributes immediately after queue creation
    # So we do 'empty' get_queue_attributes. Probably they need some time to allow attributes change.
  get_queue_attributes(queue_url)  
  queue_attributes.each{ |attribute, value| set_queue_attributes(queue_url, attribute, value) }
  true
rescue
  on_exception
end

#force_delete_queue(queue_url) ⇒ Object

Deletes queue even if it has messages. Return true or an exception.

force_delete_queue('http://queue.amazonaws.com/ZZ7XXXYYYBINS/my_awesome_queue') #=> true

P.S. same as delete_queue('http://queue.amazonaws.com/ZZ7XXXYYYBINS/my_awesome_queue', true)



448
449
450
451
452
# File 'lib/sqs/right_sqs_interface.rb', line 448

def force_delete_queue(queue_url)
  delete_queue(queue_url, true)
rescue
  on_exception
end

#generate_request(action, param = {}) ⇒ Object

Generates a request hash for the query API



78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
# File 'lib/sqs/right_sqs_interface.rb', line 78

def generate_request(action, param={})  # :nodoc:
    # Sometimes we need to use queue uri (delete queue etc)
    # In that case we will use Symbol key: 'param[:queue_url]'
  queue_uri = param[:queue_url] ? URI(param[:queue_url]).path : '/'
    # remove unset(=optional) and symbolyc keys
  param.each{ |key, value| param.delete(key) if (value.nil? || key.is_a?(Symbol)) }
    # prepare output hash
  request_hash = { "Action"           => action,
                   # "Expires"          => Time.now.utc.since(REQUEST_TTL).strftime("%Y-%m-%dT%H:%M:%SZ"),
                   "Expires"          => (Time.now + REQUEST_TTL).utc.strftime("%Y-%m-%dT%H:%M:%SZ"),
                   "AWSAccessKeyId"   => @aws_access_key_id,
                   "Version"          => API_VERSION,
                   "SignatureVersion" => SIGNATURE_VERSION }
  request_hash.update(param)
  request_data   = request_hash.sort{|a,b| (a[0].to_s.downcase)<=>(b[0].to_s.downcase)}.to_s
  request_hash['Signature'] = AwsUtils::sign(@aws_secret_access_key, request_data)
  request_params = request_hash.to_a.collect{|key,val| key.to_s + "=" + CGI::escape(val.to_s) }.join("&")
  request        = Net::HTTP::Get.new("#{queue_uri}?#{request_params}")
    # prepare output hash
  { :request  => request, 
    :server   => @params[:server],
    :port     => @params[:port],
    :protocol => @params[:protocol] }
end

#generate_rest_request(method, param) ⇒ Object

Generates a request hash for the REST API



104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
# File 'lib/sqs/right_sqs_interface.rb', line 104

def generate_rest_request(method, param) # :nodoc:
  queue_uri = param[:queue_url] ? URI(param[:queue_url]).path : '/'
  message   = param[:message]                # extract message body if nesessary
    # remove unset(=optional) and symbolyc keys
  param.each{ |key, value| param.delete(key) if (value.nil? || key.is_a?(Symbol)) }
    # created request
  param_to_str = param.to_a.collect{|key,val| key.to_s + "=" + CGI::escape(val.to_s) }.join("&")
  param_to_str = "?#{param_to_str}" unless param_to_str.blank?
  request = "Net::HTTP::#{method.capitalize}".constantize.new("#{queue_uri}#{param_to_str}")
  request.body = message if message
    # set main headers
  request['content-md5']  = ''
  request['Content-Type'] = 'text/plain'
  request['Date']         = Time.now.httpdate
    # generate authorization string
  auth_string = "#{method.upcase}\n#{request['content-md5']}\n#{request['Content-Type']}\n#{request['Date']}\n#{CGI::unescape(queue_uri)}"
  signature   = AwsUtils::sign(@aws_secret_access_key, auth_string)
    # set other headers
  request['Authorization'] = "AWS #{@aws_access_key_id}:#{signature}"
  request['AWS-Version']   = API_VERSION
    # prepare output hash
  { :request  => request, 
    :server   => @params[:server],
    :port     => @params[:port],
    :protocol => @params[:protocol] }
end

#get_queue_attributes(queue_url, attribute = 'All') ⇒ Object

Retrieves the queue attribute(s). Returns a hash of attribute(s) or an exception.

sqs.get_queue_attributes('http://queue.amazonaws.com/ZZ7XXXYYYBINS/my_awesome_queue') #=> {"ApproximateNumberOfMessages"=>"0", "VisibilityTimeout"=>"30"}


185
186
187
188
189
190
191
192
# File 'lib/sqs/right_sqs_interface.rb', line 185

def get_queue_attributes(queue_url, attribute='All')
  req_hash = generate_request('GetQueueAttributes', 
                              'Attribute' => attribute,
                              :queue_url  => queue_url)
  request_info(req_hash, SqsGetQueueAttributesParser.new(:logger => @logger))
rescue
  on_exception
end

#get_queue_length(queue_url) ⇒ Object

Returns approximate number of messages in queue.

sqs.get_queue_length('http://queue.amazonaws.com/ZZ7XXXYYYBINS/my_awesome_queue') #=> 3


405
406
407
408
409
# File 'lib/sqs/right_sqs_interface.rb', line 405

def get_queue_length(queue_url)
  get_queue_attributes(queue_url)['ApproximateNumberOfMessages'].to_i
rescue
  on_exception
end

#get_visibility_timeout(queue_url) ⇒ Object

Retrieves visibility timeout.

sqs.get_visibility_timeout('http://queue.amazonaws.com/ZZ7XXXYYYBINS/my_awesome_queue') #=> 15

See also: get_queue_attributes



232
233
234
235
236
237
# File 'lib/sqs/right_sqs_interface.rb', line 232

def get_visibility_timeout(queue_url)
  req_hash = generate_request('GetVisibilityTimeout', :queue_url => queue_url )
  request_info(req_hash, SqsGetVisibilityTimeoutParser.new(:logger => @logger))
rescue
  on_exception
end

#list_grants(queue_url, grantee_email_address = nil, permission = nil) ⇒ Object

Retrieves hash of grantee_id => perms for this queue:

sqs.list_grants('http://queue.amazonaws.com/ZZ7XXXYYYBINS/my_awesome_queue') #=>
  {"000000000000000000000001111111111117476c7fea6efb2c3347ac3ab2792a"=>{:name=>"root", :perms=>["FULLCONTROL"]},
   "00000000000000000000000111111111111e5828344600fc9e4a784a09e97041"=>{:name=>"myawesomefriend", :perms=>["FULLCONTROL"]}


259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
# File 'lib/sqs/right_sqs_interface.rb', line 259

def list_grants(queue_url, grantee_email_address=nil, permission = nil)
  req_hash = generate_request('ListGrants', 
                              'Grantee.EmailAddress' => grantee_email_address,
                              'Permission'           => permission,
                              :queue_url             => queue_url)
  response = request_info(req_hash, SqsListGrantsParser.new(:logger => @logger))
    # One user may have up to 3 permission records for every queue.
    # We will join these records to one.
  result = {}    
  response.each do |perm|
    id = perm[:id]
      # create hash for new user if unexisit
    result[id] = {:perms=>[]} unless result[id]
      # fill current grantee params
    result[id][:perms] << perm[:permission]
    result[id][:name] = perm[:name]
  end
  result
end

#list_queues(queue_name_prefix = nil) ⇒ Object

Lists all queues owned by this user that have names beginning with queue_name_prefix. If queue_name_prefix is omitted then retrieves a list of all queues.

sqs.create_queue('my_awesome_queue')
sqs.create_queue('my_awesome_queue_2')
sqs.list_queues('my_awesome') #=> ['http://queue.amazonaws.com/ZZ7XXXYYYBINS/my_awesome_queue','http://queue.amazonaws.com/ZZ7XXXYYYBINS/my_awesome_queue_2']


161
162
163
164
165
166
# File 'lib/sqs/right_sqs_interface.rb', line 161

def list_queues(queue_name_prefix=nil)
  req_hash = generate_request('ListQueues', 'QueueNamePrefix' => queue_name_prefix)
  request_info(req_hash, SqsListQueuesParser.new(:logger => @logger))
rescue
  on_exception
end

#peek_message(queue_url, message_id) ⇒ Object

Peeks message from queue by message id. Returns message in format of {:id=>'message_id', :body=>'message_body'} or nil.

sqs.peek_message('http://queue.amazonaws.com/ZZ7XXXYYYBINS/my_awesome_queue', '1234567890...0987654321') #=>
  {:id=>"12345678904GEZX9746N|0N9ED344VK5Z3SV1DTM0|1RVYH4X3TJ0987654321", :body=>"message_1"}


317
318
319
320
321
322
323
# File 'lib/sqs/right_sqs_interface.rb', line 317

def peek_message(queue_url, message_id)
  req_hash = generate_rest_request('GET', :queue_url => "#{queue_url}/#{CGI::escape message_id}" )
  messages = request_info(req_hash, SqsReceiveMessagesParser.new(:logger => @logger))
  messages.blank? ? nil : messages[0]
rescue
  on_exception
end

#pop_message(queue_url) ⇒ Object

Pops (retrieves and deletes) first accessible message from queue. Returns the message in format {:id=>'message_id', :body=>'message_body'} or nil.

sqs.pop_message('http://queue.amazonaws.com/ZZ7XXXYYYBINS/my_awesome_queue') #=>
  {:id=>"12345678904GEZX9746N|0N9ED344VK5Z3SV1DTM0|1RVYH4X3TJ0987654321", :body=>"message_1"}


489
490
491
492
493
494
# File 'lib/sqs/right_sqs_interface.rb', line 489

def pop_message(queue_url)
  messages = pop_messages(queue_url)
  messages.blank? ? nil : messages[0]
rescue
  on_exception
end

#pop_messages(queue_url, number_of_messages = 1) ⇒ Object

Pops (retrieves and deletes) up to ‘number_of_messages’ from queue. Returns an array of retrieved messages in format: [{:id=>'message_id', :body=>'message_body'}].

sqs.pop_messages('http://queue.amazonaws.com/ZZ7XXXYYYBINS/my_awesome_queue', 3) #=>
 [{:id=>"12345678904GEZX9746N|0N9ED344VK5Z3SV1DTM0|1RVYH4X3TJ0987654321", :body=>"message_1"}, ..., {}]


474
475
476
477
478
479
480
481
482
# File 'lib/sqs/right_sqs_interface.rb', line 474

def pop_messages(queue_url, number_of_messages=1)
  messages = receive_messages(queue_url, number_of_messages)
  messages.each do |message|
    delete_message(queue_url, message[:id])
  end
  messages
rescue
  on_exception
end

#queue_name_by_url(queue_url) ⇒ Object

Returns short queue name by url.

sqs.queue_name_by_url('http://queue.amazonaws.com/ZZ7XXXYYYBINS/my_awesome_queue') #=> 'my_awesome_queue'


395
396
397
398
399
# File 'lib/sqs/right_sqs_interface.rb', line 395

def queue_name_by_url(queue_url)
  self.class.queue_name_by_url(queue_url)
rescue
  on_exception
end

#queue_url_by_name(queue_name) ⇒ Object

Returns queue url by queue short name or nil if queue is not found

sqs.queue_url_by_name('my_awesome_queue') #=> 'http://queue.amazonaws.com/ZZ7XXXYYYBINS/my_awesome_queue'


370
371
372
373
374
375
376
377
378
379
# File 'lib/sqs/right_sqs_interface.rb', line 370

def queue_url_by_name(queue_name)
  return queue_name if queue_name.include?('/')
  queue_urls = list_queues(queue_name)
  queue_urls.each do |queue_url|
    return queue_url if queue_name_by_url(queue_url) == queue_name
  end
  nil
rescue
  on_exception
end

#receive_message(queue_url, visibility_timeout = nil) ⇒ Object

Reads first accessible message from queue. Returns message as a hash: {:id=>'message_id', :body=>'message_body'} or nil.

sqs.receive_message('http://queue.amazonaws.com/ZZ7XXXYYYBINS/my_awesome_queue', 10) #=>
  {:id=>"12345678904GEZX9746N|0N9ED344VK5Z3SV1DTM0|1RVYH4X3TJ0987654321", :body=>"message_1"}


459
460
461
462
463
464
# File 'lib/sqs/right_sqs_interface.rb', line 459

def receive_message(queue_url, visibility_timeout=nil)
  result = receive_messages(queue_url, 1, visibility_timeout)
  result.blank? ? nil : result[0]
rescue
  on_exception
end

#receive_messages(queue_url, number_of_messages = 1, visibility_timeout = nil) ⇒ Object

Retrieves a list of messages from queue. Returns an array of hashes in format: {:id=>'message_id', body=>'message_body'}

sqs.receive_messages('http://queue.amazonaws.com/ZZ7XXXYYYBINS/my_awesome_queue',10, 5) #=>
 [{:id=>"12345678904GEZX9746N|0N9ED344VK5Z3SV1DTM0|1RVYH4X3TJ0987654321", :body=>"message_1"}, ..., {}]

P.S. Usually returns fewer messages than requested even if they are available.



301
302
303
304
305
306
307
308
309
310
# File 'lib/sqs/right_sqs_interface.rb', line 301

def receive_messages(queue_url, number_of_messages=1, visibility_timeout=nil)
  return [] if number_of_messages == 0
  req_hash = generate_rest_request('GET',
                                   'NumberOfMessages'  => number_of_messages,
                                   'VisibilityTimeout' => visibility_timeout,
                                   :queue_url          => "#{queue_url}/front" )
  request_info(req_hash, SqsReceiveMessagesParser.new(:logger => @logger))
rescue
  on_exception
end

#remove_grant(queue_url, grantee_email_address_or_id, permission = nil) ⇒ Object

Revokes permission from user. Returns true or an exception.

sqs.remove_grant('http://queue.amazonaws.com/ZZ7XXXYYYBINS/my_awesome_queue', '[email protected]', 'FULLCONTROL') #=> true


283
284
285
286
287
288
289
290
291
292
# File 'lib/sqs/right_sqs_interface.rb', line 283

def remove_grant(queue_url, grantee_email_address_or_id, permission = nil)
  grantee_key = grantee_email_address_or_id.include?('@') ? 'Grantee.EmailAddress' : 'Grantee.ID'
  req_hash = generate_request('RemoveGrant', 
                              grantee_key  => grantee_email_address_or_id,
                              'Permission' => permission,
                              :queue_url   => queue_url)
  request_info(req_hash, SqsStatusParser.new(:logger => @logger))
rescue
  on_exception
end

#request_info(request, parser) ⇒ Object

Sends request to Amazon and parses the response Raises AwsError if any banana happened



134
135
136
137
138
# File 'lib/sqs/right_sqs_interface.rb', line 134

def request_info(request, parser) # :nodoc:
  thread = @params[:multi_thread] ? Thread.current : Thread.main
  thread[:sqs_connection] ||= Rightscale::HttpConnection.new(:exception => AwsError, :logger => @logger)
  request_info_impl(thread[:sqs_connection], @@bench, request, parser)
end

#send_message(queue_url, message) ⇒ Object Also known as: push_message

Sends new message to queue.Returns ‘message_id’ or raises an exception.

sqs.send_message('http://queue.amazonaws.com/ZZ7XXXYYYBINS/my_awesome_queue', 'message_1') #=> "1234567890...0987654321"


329
330
331
332
333
334
335
336
# File 'lib/sqs/right_sqs_interface.rb', line 329

def send_message(queue_url, message)
  req_hash = generate_rest_request('PUT',
                                   :message   => message,
                                   :queue_url => "#{queue_url}/back")
  request_info(req_hash, SqsSendMessagesParser.new(:logger => @logger))
rescue
  on_exception
end

#set_queue_attributes(queue_url, attribute, value) ⇒ Object

Sets queue attribute. Returns true or an exception.

sqs.set_queue_attributes('http://queue.amazonaws.com/ZZ7XXXYYYBINS/my_awesome_queue', "VisibilityTimeout", 10) #=> true

P.S. Amazon returns success even if the attribute does not exist. Also, attribute values may not be immediately available to other queries for some time after an update (see the SQS documentation for semantics).



201
202
203
204
205
206
207
208
209
# File 'lib/sqs/right_sqs_interface.rb', line 201

def set_queue_attributes(queue_url, attribute, value)
  req_hash = generate_request('SetQueueAttributes', 
                              'Attribute' => attribute,
                              'Value'     => value,
                              :queue_url  => queue_url)
  request_info(req_hash, SqsStatusParser.new(:logger => @logger))
rescue
  on_exception
end

#set_visibility_timeout(queue_url, visibility_timeout = nil) ⇒ Object

Sets visibility timeout. Returns true or an exception.

sqs.set_visibility_timeout('http://queue.amazonaws.com/ZZ7XXXYYYBINS/my_awesome_queue', 15) #=> true

See also: set_queue_attributes



217
218
219
220
221
222
223
224
# File 'lib/sqs/right_sqs_interface.rb', line 217

def set_visibility_timeout(queue_url, visibility_timeout=nil)
  req_hash = generate_request('SetVisibilityTimeout', 
                              'VisibilityTimeout' => visibility_timeout || DEFAULT_VISIBILITY_TIMEOUT,
                              :queue_url => queue_url )
  request_info(req_hash, SqsStatusParser.new(:logger => @logger))
rescue
  on_exception
end