Class: Bisques::Client

Inherits:
Object
Includes:
AwsConnection
Defined in:
lib/bisques/client.rb

Overview

Bisques is a client for Amazon SQS. All SQS API calls are made through methods on this class.

Examples:


client = Bisques::Client.new('us-east-1', 'my_queues_', AwsCredentials.new(aws_key, aws_secret))
client.list_queues

Methods included from AwsConnection

#action, #marshal_dump, #marshal_load, #request

Constructor Details

#initialize(region, queue_prefix = nil, credentials = AwsCredentials.default) ⇒ Client

Initialize a client object.

Parameters:

  • region (String)

    the AWS region.

  • queue_prefix (String) (defaults to: nil)

    an optional prefix for all queue names for this instance.

  • credentials (AwsCredentials) (defaults to: AwsCredentials.default)

    an instance of AwsCredentials. Uses AwsCredentials::default if not provided.



# File 'lib/bisques/client.rb', line 31

def initialize(region, queue_prefix = nil, credentials = AwsCredentials.default)
  super(region, "sqs", credentials)
  @queue_prefix = queue_prefix
end

Instance Attribute Details

#queue_prefix ⇒ String

Returns:

  • (String)

    The queue prefix used when interacting with SQS. The client will only be able to see queues whose names have this prefix.



# File 'lib/bisques/client.rb', line 22

def queue_prefix
  @queue_prefix
end

Instance Method Details

#change_message_visibility(queue_url, receipt_handle, visibility_timeout) ⇒ AwsResponse

Change the visibility timeout of a message on the queue. This is useful if you have retrieved a message and want to keep it hidden for longer before deleting it, or if you have received a job you cannot process and want to return it to the queue sooner.

Parameters:

  • queue_url (String)
  • receipt_handle (String)
  • visibility_timeout (Fixnum)

Returns:

  • (AwsResponse)

Raises:

  • (AwsActionError)



# File 'lib/bisques/client.rb', line 218

def change_message_visibility(queue_url, receipt_handle, visibility_timeout)
  action("ChangeMessageVisibility", queue_url, {"ReceiptHandle" => receipt_handle, "VisibilityTimeout" => visibility_timeout})
end

#create_queue(name, attributes = {}) ⇒ Queue

Creates a new SQS queue and returns a Queue object.

Parameters:

  • name (String)
  • attributes (Hash) (defaults to: {})

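Returns:

  • (Queue)

Raises:

  • (QueueDeletedRecentlyError)

    if SQS reports that a queue with this name was deleted recently.

  • (AwsActionError)

    for any other SQS error.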



# File 'lib/bisques/client.rb', line 52

def create_queue(name, attributes = {})
  response = action("CreateQueue", {"QueueName" => Queue.sanitize_name("#{queue_prefix}#{name}")}.merge(attributes))

  if response.success?
    Queue.new(self, response.doc.xpath("//QueueUrl").text)
  else
    raise "Could not create queue #{name}"
  end

rescue AwsActionError => error
  if error.code == "AWS.SimpleQueueService.QueueDeletedRecently"
    raise QueueDeletedRecentlyError, error.message
  else
    raise error
  end
end

#delete_message(queue_url, receipt_handle) ⇒ AwsResponse

Delete a message from a queue. The message is identified by the receipt handle returned when the message was received.

Parameters:

  • queue_url (String)
  • receipt_handle (String)

Returns:

  • (AwsResponse)

Raises:

  • (AwsActionError)



# File 'lib/bisques/client.rb', line 194

def delete_message(queue_url, receipt_handle)
  action("DeleteMessage", queue_url, {"ReceiptHandle" => receipt_handle})
end

#delete_queue(queue_url) ⇒ AwsResponse

Deletes the SQS queue at the given URL.

Parameters:

  • queue_url (String)

Returns:

  • (AwsResponse)

Raises:

  • (AwsActionError)



# File 'lib/bisques/client.rb', line 73

def delete_queue(queue_url)
  action("DeleteQueue", queue_url)
end

#get_or_create_queue(name) ⇒ Queue

Returns a Queue object representing an SQS queue, creating it if it does not already exist.

Parameters:

  • name (String)

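Returns:

  • (Queue)

Raises:

  • (QueueDeletedRecentlyError)

    if the queue has to be created and SQS reports that a queue with this name was deleted recently.

  • (AwsActionError)

    for any other SQS error.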



# File 'lib/bisques/client.rb', line 42

def get_or_create_queue(name)
  get_queue(name) || create_queue(name, {})
end

#get_queue(name, options = {}) ⇒ Queue?

Get an SQS queue by name.

Parameters:

  • name (String)
  • options (Hash) (defaults to: {})

Returns:

  • (Queue, nil)

    Returns a Queue object if the queue is found, otherwise nil.

Raises:

  • (AwsActionError)

    for any SQS error other than AWS.SimpleQueueService.NonExistentQueue.



# File 'lib/bisques/client.rb', line 82

def get_queue(name, options = {})
  response = action("GetQueueUrl", {"QueueName" => Queue.sanitize_name("#{queue_prefix}#{name}")}.merge(options))
  
  if response.success?
    Queue.new(self, response.doc.xpath("//QueueUrl").text)
  end

rescue Bisques::AwsActionError => e
  raise unless e.code == "AWS.SimpleQueueService.NonExistentQueue"
end
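
The rescue clause above is what turns a missing queue into a nil return value. The following is a minimal standalone sketch of that pattern, not Bisques code: FakeActionError and find_or_nil are hypothetical names, and the block stands in for the GetQueueUrl request.

```ruby
# Hypothetical stand-in for Bisques::AwsActionError, exposing only #code.
class FakeActionError < StandardError
  attr_reader :code

  def initialize(code)
    super(code)
    @code = code
  end
end

# Runs the block (standing in for the GetQueueUrl request). A missing queue
# becomes nil; any other error propagates to the caller.
def find_or_nil
  yield
rescue FakeActionError => e
  # `raise unless ...` evaluates to nil when the condition holds, so the
  # method returns nil for the NonExistentQueue code.
  raise unless e.code == "AWS.SimpleQueueService.NonExistentQueue"
end
```

Because a missing queue yields nil rather than an exception, callers such as get_or_create_queue can chain the lookup with `||`.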

#get_queue_attributes(queue_url, attributes = ["All"]) ⇒ AwsResponse

Get the attributes for a queue. Takes an array of attribute names. Defaults to ["All"], which returns all the available attributes.

Parameters:

  • queue_url (String)
  • attributes (Array<String>) (defaults to: ["All"])

Returns:

  • (AwsResponse)

Raises:

  • (AwsActionError)



# File 'lib/bisques/client.rb', line 121

def get_queue_attributes(queue_url, attributes = ["All"])
  attributes = attributes.map(&:to_s)

  query = Hash[*attributes.each_with_index.map do |attribute, index|
    ["AttributeName.#{index+1}", attribute]
  end.flatten]

  action("GetQueueAttributes", queue_url, query)
end
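
To make the numbered-parameter encoding concrete, here is a small standalone sketch of the query hash built above. The helper name attribute_query is hypothetical and not part of Bisques; it only mirrors the mapping step.

```ruby
# Standalone sketch of the AttributeName.N encoding; `attribute_query` is a
# hypothetical helper, not a Bisques method.
def attribute_query(attributes = ["All"])
  attributes.map(&:to_s).each_with_index.map do |attribute, index|
    # SQS query parameters are numbered from 1, not 0.
    ["AttributeName.#{index + 1}", attribute]
  end.to_h
end
```

Symbols are accepted as well as strings, since each name is converted with to_s before the hash is built.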

#list_queues(prefix = "") ⇒ Array<Queue>

Return an array of Queue objects representing the queues found in SQS. An optional prefix can be supplied to restrict the queues found; it is appended to the client's queue prefix.

Examples:

Delete all the queues


client.list_queues.each do |queue|
  queue.delete
end

Parameters:

  • prefix (String) (defaults to: "")

    optional prefix to restrict the queues found.

Returns:

  • (Array<Queue>)

    queue objects found.

Raises:

  • (AwsActionError)



# File 'lib/bisques/client.rb', line 107

def list_queues(prefix = "")
  response = action("ListQueues", "QueueNamePrefix" => "#{queue_prefix}#{prefix}")
  response.doc.xpath("//ListQueuesResult/QueueUrl").map(&:text).map do |url|
    Queue.new(self, url)
  end
end
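
As a small illustration of how the two prefixes combine, the sketch below reproduces only the string interpolation used for QueueNamePrefix. The helper name queue_name_prefix is hypothetical.

```ruby
# Hypothetical helper mirroring the interpolation in list_queues.
# A nil client prefix interpolates to an empty string.
def queue_name_prefix(client_prefix, prefix = "")
  "#{client_prefix}#{prefix}"
end
```

With a client prefix of "my_queues_" and a call prefix of "jobs", only queues whose names start with "my_queues_jobs" would be requested.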

#receive_message(queue_url, options = {}) ⇒ AwsResponse

Receive a message from a queue. Takes the queue URL and an optional hash of request parameters, such as MaxNumberOfMessages, VisibilityTimeout, or WaitTimeSeconds.

Parameters:

  • queue_url (String)
  • options (Hash) (defaults to: {})

Returns:

  • (AwsResponse)

Raises:

  • (AwsActionError)



# File 'lib/bisques/client.rb', line 203

def receive_message(queue_url, options = {})
  # validate_options(options, %w(AttributeName MaxNumberOfMessages VisibilityTimeout WaitTimeSeconds))
  action("ReceiveMessage", queue_url, options)
end

#send_message(queue_url, message_body, delay_seconds = nil) ⇒ Object

Put a message on a queue. Takes the queue URL and the message body, which should be a string. An optional delay_seconds argument can be given if the message should not become visible immediately.

Examples:


client.send_message(queue.path, "test message")

Parameters:

  • queue_url (String)
  • message_body (String)
  • delay_seconds (Fixnum) (defaults to: nil)

Returns:

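  • (nil)

Raises:

  • (MessageHasWrongMd5Error)

    if the MD5 digest returned by SQS still does not match the message body after one retry.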



# File 'lib/bisques/client.rb', line 146

def send_message(queue_url, message_body, delay_seconds=nil)
  options = {"MessageBody" => message_body}
  options["DelaySeconds"] = delay_seconds if delay_seconds

  tries = 0
  md5 = Digest::MD5.hexdigest(message_body)

  begin
    tries += 1
    response = action("SendMessage", queue_url, options)
    
    returned_md5 = response.doc.xpath("//MD5OfMessageBody").text
    raise MessageHasWrongMd5Error.new(message_body, md5, returned_md5) unless md5 == returned_md5
  rescue MessageHasWrongMd5Error
    if tries < 2
      retry
    else
      raise
    end
  end
end
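
The MD5 check and single retry above can be exercised without SQS. The following is a minimal sketch under stated assumptions: Md5Mismatch and send_with_md5_check are hypothetical names, and the block stands in for the SendMessage request, returning the digest the service would report.

```ruby
require "digest/md5"

# Hypothetical stand-in for MessageHasWrongMd5Error.
class Md5Mismatch < StandardError; end

# Yields the body to the block until the digest it returns matches the
# locally computed one, for at most `max_tries` attempts. Returns the
# number of attempts used.
def send_with_md5_check(body, max_tries = 2)
  expected = Digest::MD5.hexdigest(body)
  tries = 0
  begin
    tries += 1
    returned = yield(body)
    raise Md5Mismatch, "expected #{expected}, got #{returned}" unless returned == expected
    tries
  rescue Md5Mismatch
    retry if tries < max_tries
    raise
  end
end
```

Unlike send_message, which returns nil, this sketch returns the attempt count so the retry behaviour is easy to observe.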

#send_message_batch(queue_url, messages) ⇒ Object

Send several messages to a queue in a single request. Each element of messages is either a message body (String) or an [id, body] Array.

Parameters:

  • queue_url (String)
  • messages (Array)



# File 'lib/bisques/client.rb', line 168

def send_message_batch(queue_url, messages)
  options = {}

  messages.each_with_index do |message, index|
    # Reset both variables; `id = nil, body = nil` would assign [nil, nil] to id.
    id = body = nil

    if message.is_a?(Array)
      id, body, _ = message
    else
      body = message
    end

    options["SendMessageBatchRequestEntry.#{index+1}.Id"] = id if id.present?
    options["SendMessageBatchRequestEntry.#{index+1}.MessageBody"] = body
  end

  action("SendMessageBatch", queue_url, options)
end
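
The way the batch entries are flattened into numbered request parameters can be sketched on its own. This is an illustration rather than the library's code: batch_params is a hypothetical helper, and it uses plain Ruby checks instead of present?, which comes from ActiveSupport.

```ruby
# Hypothetical helper showing how a list of messages maps onto
# SendMessageBatchRequestEntry.N.* parameters.
def batch_params(messages)
  messages.each_with_index.each_with_object({}) do |(message, index), params|
    # An Array element is treated as [id, body]; anything else is the body.
    id, body = message.is_a?(Array) ? message : [nil, message]
    key = "SendMessageBatchRequestEntry.#{index + 1}"
    params["#{key}.Id"] = id unless id.nil? || id.to_s.empty?
    params["#{key}.MessageBody"] = body
  end
end
```

A call such as `batch_params([["a1", "first"], "second"])` produces an Id entry only for the first message, since the second was given without one.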