Class: ActiveMessaging::Adapters::AmazonSqs::Connection
- Inherits:
-
BaseConnection
- Object
- BaseConnection
- ActiveMessaging::Adapters::AmazonSqs::Connection
- Defined in:
- lib/activemessaging/adapters/asqs.rb
Constant Summary collapse
- QUEUE_NAME_LENGTH =
1..80
- VISIBILITY_TIMEOUT =
0..(24 * 60 * 60)
- NUMBER_OF_MESSAGES =
1..255
- GET_QUEUE_ATTRIBUTES =
['All', 'ApproximateNumberOfMessages', 'VisibilityTimeout']
- SET_QUEUE_ATTRIBUTES =
['VisibilityTimeout']
Instance Attribute Summary collapse
-
#access_key_id ⇒ Object
configurable params.
-
#aws_version ⇒ Object
configurable params.
-
#cache_queue_list ⇒ Object
configurable params.
-
#content_type ⇒ Object
configurable params.
-
#host ⇒ Object
configurable params.
-
#max_message_size ⇒ Object
configurable params.
-
#poll_interval ⇒ Object
configurable params.
-
#port ⇒ Object
configurable params.
-
#reconnect_delay ⇒ Object
configurable params.
-
#secret_access_key ⇒ Object
configurable params.
Attributes inherited from BaseConnection
Instance Method Summary collapse
- #disconnect ⇒ Object
-
#initialize(cfg) ⇒ Connection
constructor
generic init method needed by a13g.
-
#receive(options = {}) ⇒ Object
new receive respects priorities.
-
#received(message, headers = {}) ⇒ Object
# receive a single message from any of the subscribed queues # check each queue once, then sleep for poll_interval def receive raise "No subscriptions to receive messages from." if (@subscriptions.nil? || @subscriptions.empty?) start = @current_subscription while true # puts "calling receive..." @current_subscription = ((@current_subscription < @subscriptions.length-1) ? @current_subscription + 1 : 0) sleep poll_interval if (@current_subscription == start) queue_name = @subscriptions.keys.sort[@current_subscription] queue = queues[queue_name] subscription = @subscriptions[queue_name] unless queue.nil? messages = retrieve_messsages queue, 1, subscription.headers[:visibility_timeout] return messages[0] unless (messages.nil? or messages.empty? or messages[0].nil?) end end end.
-
#send(queue_name, message_body, message_headers = {}) ⇒ Object
queue_name string, body string, headers hash send a single message to a queue.
-
#subscribe(queue_name, message_headers = {}) ⇒ Object
queue_name string, headers hash for sqs, make sure queue exists, if not create, then add to list of polled queues.
-
#unreceive(message, headers = {}) ⇒ Object
do nothing; because the message is not deleted, it will eventually become visible again.
-
#unsubscribe(queue_name, message_headers = {}) ⇒ Object
queue_name string, headers hash for sqs, attempt delete the queues, won’t work if not empty, that’s ok.
Methods included from ActiveMessaging::Adapter
Constructor Details
#initialize(cfg) ⇒ Connection
generic init method needed by a13g
30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 |
# File 'lib/activemessaging/adapters/asqs.rb', line 30 def initialize cfg raise "Must specify a access_key_id" if (cfg[:access_key_id].nil? || cfg[:access_key_id].empty?) raise "Must specify a secret_access_key" if (cfg[:secret_access_key].nil? || cfg[:secret_access_key].empty?) @access_key_id=cfg[:access_key_id] @secret_access_key=cfg[:secret_access_key] @request_expires = cfg[:requestExpires] || 10 @request_retry_count = cfg[:requestRetryCount] || 5 @aws_version = cfg[:aws_version] || '2008-01-01' @content_type = cfg[:content_type] || 'text/plain' @host = cfg[:host] || 'queue.amazonaws.com' @port = cfg[:port] || 80 @protocol = cfg[:protocol] || 'http' @poll_interval = cfg[:poll_interval] || 1 @reconnect_delay = cfg[:reconnectDelay] || 5 @max_message_size = cfg[:max_message_size].to_i > 0 ? cfg[:max_message_size].to_i : 8 @aws_url="#{@protocol}://#{@host}" @cache_queue_list = cfg[:cache_queue_list].nil? ? true : cfg[:cache_queue_list] @reliable = cfg[:reliable].nil? ? true : cfg[:reliable] #initialize the subscriptions and queues @subscriptions = {} @queues_by_priority = {} @current_subscription = 0 queues end |
Instance Attribute Details
#access_key_id ⇒ Object
configurable params
27 28 29 |
# File 'lib/activemessaging/adapters/asqs.rb', line 27 def access_key_id @access_key_id end |
#aws_version ⇒ Object
configurable params
27 28 29 |
# File 'lib/activemessaging/adapters/asqs.rb', line 27 def aws_version @aws_version end |
#cache_queue_list ⇒ Object
configurable params
27 28 29 |
# File 'lib/activemessaging/adapters/asqs.rb', line 27 def cache_queue_list @cache_queue_list end |
#content_type ⇒ Object
configurable params
27 28 29 |
# File 'lib/activemessaging/adapters/asqs.rb', line 27 def content_type @content_type end |
#host ⇒ Object
configurable params
27 28 29 |
# File 'lib/activemessaging/adapters/asqs.rb', line 27 def host @host end |
#max_message_size ⇒ Object
configurable params
27 28 29 |
# File 'lib/activemessaging/adapters/asqs.rb', line 27 def max_message_size @max_message_size end |
#poll_interval ⇒ Object
configurable params
27 28 29 |
# File 'lib/activemessaging/adapters/asqs.rb', line 27 def poll_interval @poll_interval end |
#port ⇒ Object
configurable params
27 28 29 |
# File 'lib/activemessaging/adapters/asqs.rb', line 27 def port @port end |
#reconnect_delay ⇒ Object
configurable params
27 28 29 |
# File 'lib/activemessaging/adapters/asqs.rb', line 27 def reconnect_delay @reconnect_delay end |
#secret_access_key ⇒ Object
configurable params
27 28 29 |
# File 'lib/activemessaging/adapters/asqs.rb', line 27 def secret_access_key @secret_access_key end |
Instance Method Details
#disconnect ⇒ Object
60 61 62 63 |
# File 'lib/activemessaging/adapters/asqs.rb', line 60 def disconnect #it's an http request - there is no disconnect - ha! return true end |
#receive(options = {}) ⇒ Object
new receive respects priorities
115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 |
# File 'lib/activemessaging/adapters/asqs.rb', line 115 def receive(options={}) message = nil only_priorities = options[:priorities] # loop through the priorities @queues_by_priority.keys.sort.each do |priority| # skip this priority if there is a list, and it is not in the list next if only_priorities && !only_priorities.include?(priority.to_i) # puts " - priority: #{priority}" # loop through queues for the priority in random order each time @queues_by_priority[priority].shuffle.each do |queue_name| # puts " - queue_name: #{queue_name}" queue = queues[queue_name] subscription = @subscriptions[queue_name] next if queue.nil? || subscription.nil? messages = retrieve_messsages(queue, 1, subscription.headers[:visibility_timeout]) if (messages && !messages.empty?) message = messages[0] end break if message end break if message end # puts " - message: #{message}" message end |
#received(message, headers = {}) ⇒ Object
# receive a single message from any of the subscribed queues # check each queue once, then sleep for poll_interval def receive
raise "No subscriptions to receive messages from." if (@subscriptions.nil? || @subscriptions.empty?)
start = @current_subscription
while true
# puts "calling receive..."
@current_subscription = ((@current_subscription < @subscriptions.length-1) ? @current_subscription + 1 : 0)
sleep poll_interval if (@current_subscription == start)
queue_name = @subscriptions.keys.sort[@current_subscription]
queue = queues[queue_name]
subscription = @subscriptions[queue_name]
unless queue.nil?
messages = retrieve_messsages queue, 1, subscription.headers[:visibility_timeout]
return messages[0] unless (messages.nil? or messages.empty? or messages[0].nil?)
end
end
end
169 170 171 172 173 174 175 176 |
# File 'lib/activemessaging/adapters/asqs.rb', line 169 def received message, headers={} begin delete_message message rescue Object=>exception logger.error "Exception in ActiveMessaging::Adapters::AmazonSWS::Connection.received() logged and ignored: " logger.error exception end end |
#send(queue_name, message_body, message_headers = {}) ⇒ Object
queue_name string, body string, headers hash send a single message to a queue
95 96 97 98 |
# File 'lib/activemessaging/adapters/asqs.rb', line 95 def send queue_name, message_body, message_headers={} queue = get_or_create_queue queue_name send_messsage queue, message_body end |
#subscribe(queue_name, message_headers = {}) ⇒ Object
queue_name string, headers hash for sqs, make sure queue exists, if not create, then add to list of polled queues
67 68 69 70 71 72 73 74 75 76 77 78 79 |
# File 'lib/activemessaging/adapters/asqs.rb', line 67 def subscribe queue_name, message_headers={} # look at the existing queues, create any that are missing queue = get_or_create_queue queue_name if @subscriptions.has_key? queue.name @subscriptions[queue.name].add else @subscriptions[queue.name] = Subscription.new(queue.name, message_headers) end priority = @subscriptions[queue.name].priority @queues_by_priority[priority] = [] unless @queues_by_priority.has_key?(priority) @queues_by_priority[priority] << queue.name unless @queues_by_priority[priority].include?(queue.name) end |
#unreceive(message, headers = {}) ⇒ Object
do nothing; because the message is not deleted, it will eventually become visible again
179 180 181 |
# File 'lib/activemessaging/adapters/asqs.rb', line 179 def unreceive message, headers={} return true end |
#unsubscribe(queue_name, message_headers = {}) ⇒ Object
queue_name string, headers hash for sqs, attempt delete the queues, won’t work if not empty, that’s ok
83 84 85 86 87 88 89 90 91 |
# File 'lib/activemessaging/adapters/asqs.rb', line 83 def unsubscribe queue_name, message_headers={} if @subscriptions[queue_name] @subscriptions[queue_name].remove if @subscriptions[queue_name].count <= 0 sub = @subscriptions.delete(queue_name) @queues_by_priority[sub.priority].delete(queue_name) end end end |