Class: WAZ::Queues::Service

Inherits:
Object
  • Object
show all
Includes:
Storage::SharedKeyCoreService
Defined in:
lib/waz/queues/service.rb

Instance Attribute Summary

Attributes included from Storage::SharedKeyCoreService

#account_key, #account_name, #base_url, #use_ssl

Instance Method Summary collapse

Methods included from Storage::SharedKeyCoreService

#canonicalize_headers, #canonicalize_message, #generate_request, #generate_request_uri, #generate_signature, #initialize

Instance Method Details

#clear_queue(queue_name) ⇒ Object



88
89
90
91
92
# File 'lib/waz/queues/service.rb', line 88

# Issues a DELETE request against the "messages" resource of the given queue.
#
# queue_name - name of the queue whose messages resource is targeted.
#
# Returns the result of executing the request.
def clear_queue(queue_name)
  messages_uri = generate_request_uri(nil, "#{queue_name}/messages")
  generate_request("DELETE", messages_uri).execute
end

#create_queue(queue_name, metadata = {}) ⇒ Object



18
19
20
21
22
23
24
25
26
# File 'lib/waz/queues/service.rb', line 18

# Creates a queue by issuing a PUT request against the queue's URI.
#
# queue_name - name of the queue to create.
# metadata   - hash passed as the third argument to generate_request
#              (defaults to an empty hash).
#
# Returns the result of executing the request.
# Raises WAZ::Queues::QueueAlreadyExists when the request fails with HTTP 409.
# Raises RestClient::RequestFailed for any other failed request; such
# failures are propagated instead of being silently turned into a nil result.
def create_queue(queue_name, metadata = {})
  url = generate_request_uri(nil, queue_name)
  generate_request("PUT", url, metadata).execute
rescue RestClient::RequestFailed => e
  # 409 is translated into the domain-specific error.
  raise WAZ::Queues::QueueAlreadyExists, queue_name if e.http_code == 409
  # Anything else is re-raised unchanged so callers can see the real failure.
  raise
end

#delete_message(queue_name, message_id, pop_receipt) ⇒ Object



82
83
84
85
86
# File 'lib/waz/queues/service.rb', line 82

# Issues a DELETE request for a single message of a queue.
#
# queue_name  - name of the queue holding the message.
# message_id  - identifier of the message; becomes the last path segment.
# pop_receipt - value sent as the :pop_receipt option of the request URI.
#
# Returns the result of executing the request.
def delete_message(queue_name, message_id, pop_receipt)
  message_path = "#{queue_name}/messages/#{message_id}"
  uri_options = { :pop_receipt => pop_receipt }
  message_uri = generate_request_uri(nil, message_path, uri_options)
  generate_request("DELETE", message_uri).execute
end

#delete_queue(queue_name) ⇒ Object



28
29
30
31
32
# File 'lib/waz/queues/service.rb', line 28

# Issues a DELETE request against the URI of the given queue.
#
# queue_name - name of the queue to delete.
#
# Returns the result of executing the request.
def delete_queue(queue_name)
  queue_uri = generate_request_uri(nil, queue_name)
  generate_request("DELETE", queue_uri).execute
end

#enqueue(queue_name, message_payload, ttl = 604800) ⇒ Object

ttl: the time-to-live interval for the message, in seconds. The maximum time-to-live allowed is 7 days (604800 seconds). If this parameter is omitted, the default time-to-live is 7 days.



48
49
50
51
52
53
# File 'lib/waz/queues/service.rb', line 48

# Posts a new message to the "messages" resource of the given queue.
#
# queue_name      - name of the target queue.
# message_payload - message text. It is converted with to_s and XML-escaped
#                   (&, < and >) before being embedded in the request body, so
#                   text containing markup characters no longer produces a
#                   malformed document. Callers must pass the raw text, not
#                   pre-escaped XML.
# ttl             - time-to-live in seconds, sent as the "messagettl" option
#                   (defaults to 604800, i.e. 7 days).
#
# Returns the result of executing the request.
def enqueue(queue_name, message_payload, ttl = 604800)
  url = generate_request_uri(nil, "#{queue_name}/messages", "messagettl" => ttl)
  # Escape the text so it cannot break out of the <MessageText> element.
  message_text = message_payload.to_s.encode(:xml => :text)
  payload = "<?xml version=\"1.0\" encoding=\"utf-8\"?><QueueMessage><MessageText>#{message_text}</MessageText></QueueMessage>"
  generate_request("POST", url, { "Content-Type" => "application/xml" }, payload).execute
end

#get_messages(queue_name, options = {}) ⇒ Object

The :num_of_messages option specifies the maximum number of messages to get (between 1 and 32). The :visibility_timeout option specifies how long, in seconds, the retrieved messages stay locked (between 1 and 7200, i.e. at most two hours).



57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
# File 'lib/waz/queues/service.rb', line 57

# Retrieves messages from a queue and parses the XML response.
#
# queue_name - name of the queue to read from.
# options    - hash forwarded to generate_request_uri. Two keys are validated
#              when present:
#              :num_of_messages    - must be within 1..32
#              :visibility_timeout - must be within 1..7200
#
# Returns an Array of Hashes, one per <QueueMessage> element, with the keys
# :message_id, :message_text, :expiration_time and :insertion_time (the two
# times parsed with Time.httpdate). :pop_receipt and :time_next_visible are
# added only when the corresponding elements are present in the response.
# Raises WAZ::Queues::OptionOutOfRange when a validated option is out of range.
def get_messages(queue_name, options = {})
  # Upper bounds of the validated options; the lower bound is always 1.
  { :num_of_messages => 32, :visibility_timeout => 7200 }.each do |name, max|
    next unless options.key?(name)
    value = options[name].to_i
    raise WAZ::Queues::OptionOutOfRange, {:name => name, :min => 1, :max => max} if value < 1 || value > max
  end

  url = generate_request_uri(nil, "#{queue_name}/messages", options)
  doc = REXML::Document.new(generate_request("GET", url).execute)

  REXML::XPath.match(doc, '//QueueMessage').map do |item|
    message = { :message_id => REXML::XPath.first(item, "MessageId").text,
                :message_text => REXML::XPath.first(item, "MessageText").text,
                :expiration_time => Time.httpdate(REXML::XPath.first(item, "ExpirationTime").text),
                :insertion_time => Time.httpdate(REXML::XPath.first(item, "InsertionTime").text) }

    # PopReceipt and TimeNextVisible are optional in the response, so they
    # are only copied when the element exists.
    pop_receipt = REXML::XPath.first(item, "PopReceipt")
    message[:pop_receipt] = pop_receipt.text if pop_receipt
    next_visible = REXML::XPath.first(item, "TimeNextVisible")
    message[:time_next_visible] = Time.httpdate(next_visible.text) if next_visible
    message
  end
end

#get_queue_metadata(queue_name) ⇒ Object



34
35
36
37
38
# File 'lib/waz/queues/service.rb', line 34

# Issues a HEAD request for the "metadata" operation on the given queue.
#
# queue_name - name of the queue to inspect.
#
# Returns the headers of the response to the HEAD request.
def get_queue_metadata(queue_name)
  url = generate_request_uri("metadata", queue_name)
  generate_request("HEAD", url).execute.headers
end

#list_queues(options = {}) ⇒ Object



6
7
8
9
10
11
12
13
14
15
16
# File 'lib/waz/queues/service.rb', line 6

# Requests the "list" operation and parses the queues out of the XML response.
#
# options - accepted for interface compatibility.
#           NOTE(review): this hash is not forwarded to generate_request_uri,
#           so any options passed here currently have no effect. Confirm how
#           the helper combines an operation with extra options before
#           forwarding them.
#
# Returns an Array of Hashes, one per <Queue> element, each with the keys
# :name (text of <QueueName>) and :url (text of <Url>).
def list_queues(options = {})
  url = generate_request_uri("list", nil)
  doc = REXML::Document.new(generate_request("GET", url).execute)
  REXML::XPath.match(doc, '//Queue').map do |item|
    { :name => REXML::XPath.first(item, "QueueName").text,
      :url => REXML::XPath.first(item, "Url").text }
  end
end

#peek(queue_name, options = {}) ⇒ Object



78
79
80
# File 'lib/waz/queues/service.rb', line 78

# Delegates to get_messages with :peek_only => true as the default option.
#
# queue_name - name of the queue to read from.
# options    - extra options merged over the default; a :peek_only key
#              supplied here takes precedence.
#
# Returns whatever get_messages returns.
def peek(queue_name, options = {})
  peek_defaults = { :peek_only => true }
  get_messages(queue_name, peek_defaults.merge(options))
end

#set_queue_metadata(queue_name, metadata = {}) ⇒ Object



40
41
42
43
44
# File 'lib/waz/queues/service.rb', line 40

# Issues a PUT request for the "metadata" operation on the given queue.
#
# queue_name - name of the queue to update.
# metadata   - hash passed as the third argument to generate_request
#              (defaults to an empty hash).
#
# Returns the result of executing the request.
def set_queue_metadata(queue_name, metadata = {})
  url = generate_request_uri("metadata", queue_name)
  generate_request("PUT", url, metadata).execute
end