Class: Rufus::SQS::QueueService

Inherits:
Object
  • Object
show all
Defined in:
lib/rufus/sqs.rb

Overview

As the name implies.

Constant Summary collapse

AWS_VERSION =
"2006-04-01"
DEFAULT_QUEUE_HOST =
"queue.amazonaws.com"

Instance Method Summary collapse

Constructor Details

#initialize(queue_host = nil) ⇒ QueueService

Returns a new instance of QueueService.



96
97
98
99
# File 'lib/rufus/sqs.rb', line 96

def initialize (queue_host=nil)

    @queue_host = queue_host || DEFAULT_QUEUE_HOST
end

Instance Method Details

#create_queue(queue_name) ⇒ Object

Creates a queue.

If the queue name doesn’t comply with SQS requirements for it, an error will be raised.



128
129
130
131
132
133
134
135
# File 'lib/rufus/sqs.rb', line 128

def create_queue (queue_name)

    doc = do_action :post, @queue_host, "/?QueueName=#{queue_name}"

    doc.elements.each("//QueueUrl") do |e|
        return e.text.to_s
    end
end

#delete_message(queue, message_id) ⇒ Object

Deletes a given message.

The queue might be a queue name (String) or a Queue instance.



229
230
231
232
233
234
235
236
237
238
239
# File 'lib/rufus/sqs.rb', line 229

def delete_message (queue, message_id)

    queue = resolve_queue(queue)

    path = "#{queue.path}/#{message_id}"
    #path = "#{queue.path}/#{CGI::escape(message_id)}"

    doc = do_action :delete, queue.host, path

    SQS::get_element_text(doc, "//StatusCode") == "Success"
end

#delete_queue(queue, force = false) ⇒ Object

Deletes the queue. Returns true if the delete was successful. You can empty a queue by called the method #flush_queue

If ‘force’ is set to true, a flush will be performed on the queue before the actual delete operation. It should ensure a successful removal of the queue.



280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
# File 'lib/rufus/sqs.rb', line 280

def delete_queue (queue, force=false)

    queue = resolve_queue(queue)

    flush_queue(queue) if force
    
    begin

        doc = do_action :delete, @queue_host, queue.path

    rescue Exception => e

        return false if e.message.match "^400 .*$"
    end

    SQS::get_element_text(doc, "//StatusCode") == "Success"
end

#flush_queue(queue) ⇒ Object

Use with care !

Attempts at deleting all the messages in a queue. Returns the total count of messages deleted.

A call on this method might take a certain time, as it has to delete each message individually. AWS will perhaps add a proper ‘flush_queue’ method later.

The queue might be a queue name (String) or a Queue instance.



253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
# File 'lib/rufus/sqs.rb', line 253

def flush_queue (queue)

    count = 0

    loop do

        l = get_messages queue, :timeout => 0, :count => 255

        break if l.length < 1

        l.each do |m|
            m.delete
            count += 1
        end
    end

    count
end

#get_message(queue, message_id) ⇒ Object

Retrieves a single message from a queue. Returns an instance of Message.

The queue might be a queue name (String) or a Queue instance.



208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
# File 'lib/rufus/sqs.rb', line 208

def get_message (queue, message_id)

    queue = resolve_queue(queue)

    path = "#{queue.path}/#{message_id}"

    begin
        doc = do_action :get, queue.host, path
        Message.new(queue, doc.root.elements[1])
    rescue Exception => e
        #puts e.message
        return nil if e.message.match "^404 .*$"
        raise e
    end
end

#get_messages(queue, params = {}) ⇒ Object

Retrieves a bunch of messages from a queue. Returns a list of Message instances.

There are actually two optional params that this method understands :

  • :timeout the duration in seconds of the message visibility in the

    queue
    
  • :count the max number of message to be returned by this call

The queue might be a queue name (String) or a Queue instance.



176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
# File 'lib/rufus/sqs.rb', line 176

def get_messages (queue, params={})

    queue = resolve_queue(queue)

    path = "#{queue.path}/front"

    path += "?" if params.size > 0

    timeout = params[:timeout]
    count = params[:count]

    path += "VisibilityTimeout=#{timeout}" if timeout
    path += "&" if timeout and count
    path += "NumberOfMessages=#{count}" if count

    doc = do_action :get, queue.host, path

    messages = []

    doc.elements.each("//Message") do |me|
        messages << Message.new(queue, me)
    end

    messages
end

#get_queue(queue_name) ⇒ Object

Given a queue name, a Queue instance is returned.



301
302
303
304
305
306
307
308
309
310
311
# File 'lib/rufus/sqs.rb', line 301

def get_queue (queue_name)

    l = list_queues(queue_name)

    l.each do |q|
        return q if q.name == queue_name
    end

    #return nil
    raise "found no queue named '#{queue_name}'"
end

#list_queues(prefix = nil) ⇒ Object

Lists the queues for the active AWS account. If ‘prefix’ is given, only queues whose name begin with that prefix will be returned.



106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
# File 'lib/rufus/sqs.rb', line 106

def list_queues (prefix=nil)

    queues = []

    path = "/"
    path = "#{path}?QueueNamePrefix=#{prefix}" if prefix

    doc = do_action :get, @queue_host, path

    doc.elements.each("//QueueUrl") do |e|
        queues << Queue.new(self, e)
    end

    return queues
end

#put_message(queue, content) ⇒ Object Also known as: send_message

Given some content (‘text/plain’ content), send it as a message to a queue. Returns the SQS message id (a String).

The queue might be a queue name (String) or a Queue instance.



144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
# File 'lib/rufus/sqs.rb', line 144

def put_message (queue, content)

    queue = resolve_queue(queue)

    doc = do_action :put, queue.host, "#{queue.path}/back", content

    #puts doc.to_s

    #status_code = SQS::get_element_text(doc, '//StatusCode')
    #message_id = SQS::get_element_text(doc, '//MessageId')
    #request_id = SQS::get_element_text(doc, '//RequestId')
    #{ :status_code => status_code, 
    #  :message_id => message_id, 
    #  :request_id => request_id }

    SQS::get_element_text(doc, '//MessageId')
end