Class: Rufus::SQS::QueueService
- Inherits: Object
  - Object
    - Rufus::SQS::QueueService
- Defined in: lib/rufus/sqs.rb
Overview
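As the name implies, this class is the entry point to the queue service: it creates, lists and deletes queues, and it sends, retrieves and deletes messages.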
Constant Summary
- AWS_VERSION = "2006-04-01"
- DEFAULT_QUEUE_HOST = "queue.amazonaws.com"
Instance Method Summary
- #create_queue(queue_name) ⇒ Object
  Creates a queue.
- #delete_message(queue, message_id) ⇒ Object
  Deletes a given message.
- #delete_queue(queue, force = false) ⇒ Object
  Deletes the queue.
- #flush_queue(queue) ⇒ Object
  Use with care!
- #get_message(queue, message_id) ⇒ Object
  Retrieves a single message from a queue.
- #get_messages(queue, params = {}) ⇒ Object
  Retrieves a bunch of messages from a queue.
- #get_queue(queue_name) ⇒ Object
  Given a queue name, a Queue instance is returned.
- #initialize(queue_host = nil) ⇒ QueueService (constructor)
  A new instance of QueueService.
- #list_queues(prefix = nil) ⇒ Object
  Lists the queues for the active AWS account.
- #put_message(queue, content) ⇒ Object (also: #send_message)
  Given some content (‘text/plain’ content), send it as a message to a queue.
Constructor Details
#initialize(queue_host = nil) ⇒ QueueService
Returns a new instance of QueueService.
    # File 'lib/rufus/sqs.rb', line 96
    def initialize (queue_host=nil)
      @queue_host = queue_host || DEFAULT_QUEUE_HOST
    end
Instance Method Details
#create_queue(queue_name) ⇒ Object
Creates a queue.
An error is raised if the queue name does not comply with the SQS naming requirements.

    # File 'lib/rufus/sqs.rb', line 128
    def create_queue (queue_name)
      doc = do_action :post, @queue_host, "/?QueueName=#{queue_name}"

      doc.elements.each("//QueueUrl") do |e|
        return e.text.to_s
      end
    end
#delete_message(queue, message_id) ⇒ Object
Deletes a given message.
The queue may be given as a queue name (String) or as a Queue instance.

    # File 'lib/rufus/sqs.rb', line 229
    def delete_message (queue, message_id)
      queue = resolve_queue(queue)

      path = "#{queue.path}/#{message_id}"
      #path = "#{queue.path}/#{CGI::escape(message_id)}"

      doc = do_action :delete, queue.host, path

      SQS::get_element_text(doc, "//StatusCode") == "Success"
    end
#delete_queue(queue, force = false) ⇒ Object
Deletes the queue. Returns true if the delete was successful. You can empty a queue by calling #flush_queue.
If ‘force’ is set to true, the queue is flushed before the actual delete operation, which should ensure that the queue is removed successfully.

    # File 'lib/rufus/sqs.rb', line 280
    def delete_queue (queue, force=false)
      queue = resolve_queue(queue)

      flush_queue(queue) if force

      begin
        doc = do_action :delete, @queue_host, queue.path
      rescue Exception => e
        return false if e.message.match "^400 .*$"
      end

      SQS::get_element_text(doc, "//StatusCode") == "Success"
    end
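The effect of ‘force’ can be illustrated without an AWS account. The following is only a sketch: `TinyService` is a hypothetical in-memory stand-in, not the rufus-sqs class, and it assumes that the service refuses to delete a queue that still holds messages, which is the situation the flag is meant to handle.

```ruby
# Hypothetical in-memory stand-in (not Rufus::SQS::QueueService).
# Assumption: a queue that still holds messages cannot be deleted.
class TinyService
  def initialize
    @queues = { "jobs" => %w[m1 m2 m3] }
  end

  # Removes every message and returns how many were removed.
  def flush_queue(name)
    count = @queues.fetch(name).size
    @queues[name].clear
    count
  end

  # Returns true when the queue was removed, false when it was refused.
  def delete_queue(name, force = false)
    flush_queue(name) if force
    return false unless @queues.fetch(name).empty?
    @queues.delete(name)
    true
  end

  def queue?(name)
    @queues.key?(name)
  end
end

service = TinyService.new
puts service.delete_queue("jobs")        # refused while messages remain
puts service.delete_queue("jobs", true)  # flushed first, then deleted
```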
#flush_queue(queue) ⇒ Object
Use with care!
Attempts to delete all the messages in a queue. Returns the total count of messages deleted.
A call to this method may take some time, as it has to delete each message individually. AWS will perhaps add a proper ‘flush_queue’ method later.
The queue may be given as a queue name (String) or as a Queue instance.

    # File 'lib/rufus/sqs.rb', line 253
    def flush_queue (queue)
      count = 0

      loop do
        l = get_messages queue, :timeout => 0, :count => 255

        break if l.length < 1

        l.each do |m|
          m.delete
          count += 1
        end
      end

      count
    end
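The drain loop used by this method (fetch a batch, delete each message, repeat until a fetch comes back empty) can be tried in isolation. This is a minimal sketch under stated assumptions: `FakeMessage`, `FakeQueue` and `drain` are hypothetical in-memory stand-ins, not rufus-sqs classes or methods.

```ruby
# Hypothetical in-memory stand-ins so the loop runs without AWS.
class FakeMessage
  def initialize(store, body)
    @store = store
    @body = body
  end

  # Removes this message from the backing store.
  def delete
    @store.delete(self)
  end
end

class FakeQueue
  def initialize(bodies)
    @store = []
    bodies.each { |b| @store << FakeMessage.new(@store, b) }
  end

  # Returns at most `count` messages without removing them.
  def get_messages(count)
    @store.first(count)
  end

  def size
    @store.size
  end
end

# Same shape as flush_queue: fetch a batch, delete each message, repeat
# until a fetch returns nothing, then report how many were deleted.
def drain(queue, batch_size = 255)
  count = 0
  loop do
    batch = queue.get_messages(batch_size)
    break if batch.empty?
    batch.each do |m|
      m.delete
      count += 1
    end
  end
  count
end

queue = FakeQueue.new(%w[a b c d e])
puts drain(queue, 2)  # total number of messages deleted
puts queue.size       # messages left in the queue
```

Because every message costs one delete request against the real service, the running time grows with the queue length, which is why the method carries its warning.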
#get_message(queue, message_id) ⇒ Object
Retrieves a single message from a queue. Returns an instance of Message.
The queue may be given as a queue name (String) or as a Queue instance.

    # File 'lib/rufus/sqs.rb', line 208
    def get_message (queue, message_id)
      queue = resolve_queue(queue)

      path = "#{queue.path}/#{message_id}"

      begin
        doc = do_action :get, queue.host, path
        Message.new(queue, doc.root.elements[1])
      rescue Exception => e
        #puts e.message
        return nil if e.message.match "^404 .*$"
        raise e
      end
    end
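The error handling above amounts to "nil when the message does not exist, re-raise anything else". A minimal sketch of that pattern follows; `fetch_or_nil` is a hypothetical helper, the "<status> <reason>" message format is an assumption inferred from the pattern being matched, and the sketch rescues StandardError rather than Exception as the listing does.

```ruby
# Hypothetical helper (not part of rufus-sqs). Runs the block and maps a
# "404 ..." error to nil, letting every other error propagate.
def fetch_or_nil
  yield
rescue StandardError => e
  return nil if e.message.match(/^404 .*$/)
  raise
end

p fetch_or_nil { "message body" }
p fetch_or_nil { raise "404 Not Found" }

begin
  fetch_or_nil { raise "500 Internal Server Error" }
rescue RuntimeError => e
  puts "propagated: #{e.message}"
end
```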
#get_messages(queue, params = {}) ⇒ Object
Retrieves a bunch of messages from a queue. Returns a list of Message instances.
This method understands two optional params:
- :timeout  the visibility timeout, in seconds, applied to the returned messages (sent as VisibilityTimeout)
- :count    the maximum number of messages to be returned by this call (sent as NumberOfMessages)
The queue may be given as a queue name (String) or as a Queue instance.

    # File 'lib/rufus/sqs.rb', line 176
    def get_messages (queue, params={})
      queue = resolve_queue(queue)

      path = "#{queue.path}/front"

      path += "?" if params.size > 0

      timeout = params[:timeout]
      count = params[:count]

      path += "VisibilityTimeout=#{timeout}" if timeout
      path += "&" if timeout and count
      path += "NumberOfMessages=#{count}" if count

      doc = do_action :get, queue.host, path

      messages = []

      doc.elements.each("//Message") do |me|
        messages << Message.new(queue, me)
      end

      messages
    end
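The way the two params end up in the request path can be reproduced with a small standalone function. This is a sketch only: `front_path` is a hypothetical helper and the queue path used in the example is made up. Unlike the listing, it does not leave a dangling "?" when params contains neither key.

```ruby
# Hypothetical helper (not part of rufus-sqs): builds the "front" path
# from the optional :timeout and :count params.
def front_path(queue_path, params = {})
  query = []
  query << "VisibilityTimeout=#{params[:timeout]}" if params[:timeout]
  query << "NumberOfMessages=#{params[:count]}" if params[:count]

  path = "#{queue_path}/front"
  path += "?#{query.join('&')}" unless query.empty?
  path
end

puts front_path("/A1B2C3/myqueue")
# /A1B2C3/myqueue/front
puts front_path("/A1B2C3/myqueue", :timeout => 30, :count => 5)
# /A1B2C3/myqueue/front?VisibilityTimeout=30&NumberOfMessages=5
```

Note that a timeout of 0 is still sent, since 0 is truthy in Ruby; flush_queue relies on this when it passes :timeout => 0.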
#get_queue(queue_name) ⇒ Object
Given a queue name, a Queue instance is returned.
    # File 'lib/rufus/sqs.rb', line 301
    def get_queue (queue_name)
      l = list_queues(queue_name)

      l.each do |q|
        return q if q.name == queue_name
      end

      #return nil
      raise "found no queue named '#{queue_name}'"
    end
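The lookup lists queues by prefix and then keeps only an exact name match, raising when nothing matches. The sketch below replays that logic against an in-memory list; `NamedQueue`, `QUEUES` and `find_queue` are hypothetical names introduced for illustration, and `find_queue` shows one way a caller might prefer nil over an exception.

```ruby
# Hypothetical stand-ins: a Struct plays the role of a Queue instance and
# a constant list plays the role of the AWS account.
NamedQueue = Struct.new(:name)
QUEUES = %w[orders orders-dead-letter invoices].map { |n| NamedQueue.new(n) }

# Prefix listing, as the service does for list_queues.
def list_queues(prefix = nil)
  return QUEUES.dup unless prefix
  QUEUES.select { |q| q.name.start_with?(prefix) }
end

# Exact match on top of the prefix listing; raises when nothing matches.
def get_queue(queue_name)
  list_queues(queue_name).each do |q|
    return q if q.name == queue_name
  end
  raise "found no queue named '#{queue_name}'"
end

# Caller-side wrapper for code that prefers nil over an exception.
def find_queue(queue_name)
  get_queue(queue_name)
rescue RuntimeError
  nil
end

puts list_queues("orders").map(&:name).inspect
puts get_queue("orders").name
p find_queue("order")  # a prefix of existing names, but not an exact match
```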
#list_queues(prefix = nil) ⇒ Object
Lists the queues for the active AWS account. If ‘prefix’ is given, only queues whose names begin with that prefix are returned.

    # File 'lib/rufus/sqs.rb', line 106
    def list_queues (prefix=nil)
      queues = []

      path = "/"
      path = "#{path}?QueueNamePrefix=#{prefix}" if prefix

      doc = do_action :get, @queue_host, path

      doc.elements.each("//QueueUrl") do |e|
        queues << Queue.new(self, e)
      end

      return queues
    end
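The listing places the prefix in the query string as given. A caller who cannot guarantee that a prefix is free of reserved characters might encode it first. The following sketch uses Ruby's standard `URI.encode_www_form`; `list_queues_path` is a hypothetical helper and not something rufus-sqs provides.

```ruby
require "uri"

# Hypothetical helper (not part of rufus-sqs): builds the listing path and
# form-encodes the prefix before it goes into the query string.
def list_queues_path(prefix = nil)
  return "/" unless prefix
  "/?#{URI.encode_www_form('QueueNamePrefix' => prefix)}"
end

puts list_queues_path
puts list_queues_path("test-")
puts list_queues_path("a&b c")
```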
#put_message(queue, content) ⇒ Object Also known as: send_message
Given some content (‘text/plain’ content), send it as a message to a queue. Returns the SQS message id (a String).
The queue may be given as a queue name (String) or as a Queue instance.

    # File 'lib/rufus/sqs.rb', line 144
    def put_message (queue, content)
      queue = resolve_queue(queue)

      doc = do_action :put, queue.host, "#{queue.path}/back", content

      #puts doc.to_s

      #status_code = SQS::get_element_text(doc, '//StatusCode')
      #message_id = SQS::get_element_text(doc, '//MessageId')
      #request_id = SQS::get_element_text(doc, '//RequestId')
      #{ :status_code => status_code,
      #  :message_id => message_id,
      #  :request_id => request_id }

      SQS::get_element_text(doc, '//MessageId')
    end