Class: Bisques::Client
- Inherits:
-
Object
- Object
- Bisques::Client
- Includes:
- AwsConnection
- Defined in:
- lib/bisques/client.rb
Overview
Bisques is a client for Amazon SQS. All of the API calls made to SQS are called via methods on this class.
Instance Attribute Summary collapse
-
#queue_prefix ⇒ String
to see queues whose name has this prefix.
Instance Method Summary collapse
-
#change_message_visibility(queue_url, receipt_handle, visibility_timeout) ⇒ AwsResponse
Change the visibility of a message on the queue.
-
#create_queue(name, attributes = {}) ⇒ Queue
Creates a new SQS queue and returns a Queue object.
-
#delete_message(queue_url, receipt_handle) ⇒ AwsResponse
Delete a message from a queue.
-
#delete_queue(queue_url) ⇒ AwsResponse
Deletes an SQS queue at a given path.
-
#get_or_create_queue(name) ⇒ Queue
Returns a Queue object representing an SQS queue, creating it if it does not already exist.
-
#get_queue(name, options = {}) ⇒ Queue?
Get an SQS queue by name.
-
#get_queue_attributes(queue_url, attributes = ["All"]) ⇒ AwsResponse
Get the attributes for a queue.
-
#initialize(region, queue_prefix = nil, credentials = AwsCredentials.default) ⇒ Client
constructor
Initialize a client object.
-
#list_queues(prefix = "") ⇒ Array<Queue>
Return an array of Queue objects representing the queues found in SQS.
-
#receive_message(queue_url, options = {}) ⇒ AwsResponse
Receive a message from a queue.
-
#send_message(queue_url, message_body, delay_seconds = nil) ⇒ Object
Put a message on a queue.
- #send_message_batch(queue_url, messages) ⇒ Object
Methods included from AwsConnection
#action, #marshal_dump, #marshal_load, #request
Constructor Details
#initialize(region, queue_prefix = nil, credentials = AwsCredentials.default) ⇒ Client
Initialize a client object.
31 32 33 34 |
# File 'lib/bisques/client.rb', line 31 def initialize(region, queue_prefix = nil, credentials = AwsCredentials.default) super(region, "sqs", credentials) @queue_prefix = queue_prefix end |
Instance Attribute Details
#queue_prefix ⇒ String
to see queues whose name has this prefix.
22 23 24 |
# File 'lib/bisques/client.rb', line 22 def queue_prefix @queue_prefix end |
Instance Method Details
#change_message_visibility(queue_url, receipt_handle, visibility_timeout) ⇒ AwsResponse
Change the visibility of a message on the queue. This is useful if you have retrieved a message and now want to keep it hidden for longer before deleting it, or if you have a job and decide you cannot action it and want to return it to the queue sooner.
218 219 220 |
# File 'lib/bisques/client.rb', line 218 def (queue_url, receipt_handle, visibility_timeout) action("ChangeMessageVisibility", queue_url, {"ReceiptHandle" => receipt_handle, "VisibilityTimeout" => visibility_timeout}) end |
#create_queue(name, attributes = {}) ⇒ Queue
Creates a new SQS queue and returns a Queue object.
52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 |
# File 'lib/bisques/client.rb', line 52 def create_queue(name, attributes = {}) response = action("CreateQueue", {"QueueName" => Queue.sanitize_name("#{queue_prefix}#{name}")}.merge(attributes)) if response.success? Queue.new(self, response.doc.xpath("//QueueUrl").text) else raise "Could not create queue #{name}" end rescue AwsActionError => error if error.code == "AWS.SimpleQueueService.QueueDeletedRecently" raise QueueDeletedRecentlyError, error. else raise error end end |
#delete_message(queue_url, receipt_handle) ⇒ AwsResponse
Delete a message from a queue. The message is deleted by the handle given when the message is retrieved.
194 195 196 |
# File 'lib/bisques/client.rb', line 194 def (queue_url, receipt_handle) action("DeleteMessage", queue_url, {"ReceiptHandle" => receipt_handle}) end |
#delete_queue(queue_url) ⇒ AwsResponse
Deletes an SQS queue at a given path.
73 74 75 |
# File 'lib/bisques/client.rb', line 73 def delete_queue(queue_url) response = action("DeleteQueue", queue_url) end |
#get_or_create_queue(name) ⇒ Queue
Returns a Queue object representing an SQS queue, creating it if it does not already exist.
42 43 44 |
# File 'lib/bisques/client.rb', line 42 def get_or_create_queue(name) get_queue(name) || create_queue(name, {}) end |
#get_queue(name, options = {}) ⇒ Queue?
Get an SQS queue by name.
82 83 84 85 86 87 88 89 90 91 |
# File 'lib/bisques/client.rb', line 82 def get_queue(name, = {}) response = action("GetQueueUrl", {"QueueName" => Queue.sanitize_name("#{queue_prefix}#{name}")}.merge()) if response.success? Queue.new(self, response.doc.xpath("//QueueUrl").text) end rescue Bisques::AwsActionError => e raise unless e.code == "AWS.SimpleQueueService.NonExistentQueue" end |
#get_queue_attributes(queue_url, attributes = ["All"]) ⇒ AwsResponse
Get the attributes for a queue. Takes an array of attribute names. Defaults to [“All”] which returns all the available attributes.
121 122 123 124 125 126 127 128 129 |
# File 'lib/bisques/client.rb', line 121 def get_queue_attributes(queue_url, attributes = ["All"]) attributes = attributes.map(&:to_s) query = Hash[*attributes.each_with_index.map do |attribute, index| ["AttributeName.#{index+1}", attribute] end.flatten] action("GetQueueAttributes", queue_url, query) end |
#list_queues(prefix = "") ⇒ Array<Queue>
Return an array of Queue objects representing the queues found in SQS. An optional prefix can be supplied to restrict the queues found. This prefix is additional to the client prefix.
107 108 109 110 111 112 |
# File 'lib/bisques/client.rb', line 107 def list_queues(prefix = "") response = action("ListQueues", "QueueNamePrefix" => "#{queue_prefix}#{prefix}") response.doc.xpath("//ListQueuesResult/QueueUrl").map(&:text).map do |url| Queue.new(self, url) end end |
#receive_message(queue_url, options = {}) ⇒ AwsResponse
Receive a message from a queue. Takes the queue url and an optional hash.
203 204 205 206 |
# File 'lib/bisques/client.rb', line 203 def (queue_url, = {}) # validate_options(options, %w(AttributeName MaxNumberOfMessages VisibilityTimeout WaitTimeSeconds)) action("ReceiveMessage", queue_url, ) end |
#send_message(queue_url, message_body, delay_seconds = nil) ⇒ Object
Put a message on a queue. Takes the queue url and the message body, which should be a string. An optional delay seconds argument can be added if the message should not become visible immediately.
146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 |
# File 'lib/bisques/client.rb', line 146 def (queue_url, , delay_seconds=nil) = {"MessageBody" => } ["DelaySeconds"] = delay_seconds if delay_seconds tries = 0 md5 = Digest::MD5.hexdigest() begin tries += 1 response = action("SendMessage", queue_url, ) returned_md5 = response.doc.xpath("//MD5OfMessageBody").text raise MessageHasWrongMd5Error.new(, md5, returned_md5) unless md5 == returned_md5 rescue MessageHasWrongMd5Error if tries < 2 retry else raise end end end |
#send_message_batch(queue_url, messages) ⇒ Object
168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 |
# File 'lib/bisques/client.rb', line 168 def (queue_url, ) = {} .each_with_index do |, index| id = nil, body = nil if .is_a?(Array) id, body, _ = else body = end ["SendMessageBatchRequestEntry.#{index+1}.Id"] = id if id.present? ["SendMessageBatchRequestEntry.#{index+1}.MessageBody"] = body end action("SendMessageBatch", queue_url, ) end |