Class: WAZ::Queues::Service
- Inherits:
-
Object
- Object
- WAZ::Queues::Service
- Includes:
- Storage::SharedKeyCoreService
- Defined in:
- lib/waz/queues/service.rb
Instance Attribute Summary
Attributes included from Storage::SharedKeyCoreService
#account_key, #account_name, #base_url, #use_ssl
Instance Method Summary collapse
- #clear_queue(queue_name) ⇒ Object
- #create_queue(queue_name, metadata = {}) ⇒ Object
- #delete_message(queue_name, message_id, pop_receipt) ⇒ Object
- #delete_queue(queue_name) ⇒ Object
-
#enqueue(queue_name, message_payload, ttl = 604800) ⇒ Object
ttl Specifies the time-to-live interval for the message, in seconds.
-
#get_messages(queue_name, options = {}) ⇒ Object
:num_of_messages option specifies the max number of messages to get (maximum 32) :visibility_timeout option specifies the timeout of the message locking in seconds (max two hours).
- #get_queue_metadata(queue_name) ⇒ Object
- #list_queues(options = {}) ⇒ Object
- #peek(queue_name, options = {}) ⇒ Object
- #set_queue_metadata(queue_name, metadata = {}) ⇒ Object
Methods included from Storage::SharedKeyCoreService
#canonicalize_headers, #canonicalize_message, #generate_request, #generate_request_uri, #generate_signature, #initialize
Instance Method Details
#clear_queue(queue_name) ⇒ Object
88 89 90 91 92 |
# File 'lib/waz/queues/service.rb', line 88 def clear_queue(queue_name) url = generate_request_uri(nil, "#{queue_name}/messages") request = generate_request("DELETE", url) request.execute() end |
#create_queue(queue_name, metadata = {}) ⇒ Object
18 19 20 21 22 23 24 25 26 |
# File 'lib/waz/queues/service.rb', line 18 def create_queue(queue_name, = {}) begin url = generate_request_uri(nil, queue_name) request = generate_request("PUT", url, ) request.execute() rescue RestClient::RequestFailed raise WAZ::Queues::QueueAlreadyExists, queue_name if $!.http_code == 409 end end |
#delete_message(queue_name, message_id, pop_receipt) ⇒ Object
82 83 84 85 86 |
# File 'lib/waz/queues/service.rb', line 82 def (queue_name, , pop_receipt) url = generate_request_uri(nil, "#{queue_name}/messages/#{message_id}", :pop_receipt => pop_receipt) request = generate_request("DELETE", url) request.execute() end |
#delete_queue(queue_name) ⇒ Object
28 29 30 31 32 |
# File 'lib/waz/queues/service.rb', line 28 def delete_queue(queue_name) url = generate_request_uri(nil, queue_name) request = generate_request("DELETE", url) request.execute() end |
#enqueue(queue_name, message_payload, ttl = 604800) ⇒ Object
ttl Specifies the time-to-live interval for the message, in seconds. The maximum time-to-live allowed is 7 days. If this parameter is omitted, the default time-to-live is 7 days.
48 49 50 51 52 53 |
# File 'lib/waz/queues/service.rb', line 48 def enqueue(queue_name, , ttl = 604800) url = generate_request_uri(nil, "#{queue_name}/messages", "messagettl" => ttl) payload = "<?xml version=\"1.0\" encoding=\"utf-8\"?><QueueMessage><MessageText>#{message_payload}</MessageText></QueueMessage>" request = generate_request("POST", url, { "Content-Type" => "application/xml" }, payload) request.execute() end |
#get_messages(queue_name, options = {}) ⇒ Object
:num_of_messages option specifies the max number of messages to get (maximum 32) :visibility_timeout option specifies the timeout of the message locking in seconds (max two hours)
57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 |
# File 'lib/waz/queues/service.rb', line 57 def (queue_name, = {}) raise WAZ::Queues::OptionOutOfRange, {:name => :num_of_messages, :min => 1, :max => 32} if (.keys.include?(:num_of_messages) && ([:num_of_messages].to_i < 1 || [:num_of_messages].to_i > 32)) raise WAZ::Queues::OptionOutOfRange, {:name => :visibility_timeout, :min => 1, :max => 7200} if (.keys.include?(:visibility_timeout) && ([:visibility_timeout].to_i < 1 || [:visibility_timeout].to_i > 7200)) url = generate_request_uri(nil, "#{queue_name}/messages", ) request = generate_request("GET", url) doc = REXML::Document.new(request.execute()) = [] REXML::XPath.each(doc, '//QueueMessage/') do |item| = { :message_id => REXML::XPath.first(item, "MessageId").text, :message_text => REXML::XPath.first(item, "MessageText").text, :expiration_time => Time.httpdate(REXML::XPath.first(item, "ExpirationTime").text), :insertion_time => Time.httpdate(REXML::XPath.first(item, "InsertionTime").text) } # This are only valid when peek-locking messages [:pop_receipt] = REXML::XPath.first(item, "PopReceipt").text unless REXML::XPath.first(item, "PopReceipt").nil? [:time_next_visible] = Time.httpdate(REXML::XPath.first(item, "TimeNextVisible").text) unless REXML::XPath.first(item, "TimeNextVisible").nil? << end return end |
#get_queue_metadata(queue_name) ⇒ Object
34 35 36 37 38 |
# File 'lib/waz/queues/service.rb', line 34 def (queue_name) url = generate_request_uri("metadata", queue_name) request = generate_request("HEAD", url) request.execute().headers end |
#list_queues(options = {}) ⇒ Object
6 7 8 9 10 11 12 13 14 15 16 |
# File 'lib/waz/queues/service.rb', line 6 def list_queues( ={}) url = generate_request_uri("list", nil) request = generate_request("GET", url) doc = REXML::Document.new(request.execute()) queues = [] REXML::XPath.each(doc, '//Queue/') do |item| queues << { :name => REXML::XPath.first(item, "QueueName").text, :url => REXML::XPath.first(item, "Url").text } end return queues end |
#peek(queue_name, options = {}) ⇒ Object
78 79 80 |
# File 'lib/waz/queues/service.rb', line 78 def peek(queue_name, = {}) return (queue_name, {:peek_only => true}.merge()) end |
#set_queue_metadata(queue_name, metadata = {}) ⇒ Object
40 41 42 43 44 |
# File 'lib/waz/queues/service.rb', line 40 def (queue_name, = {}) url = generate_request_uri("metadata", queue_name) request = generate_request("PUT", url, ) request.execute() end |