Class: ActiveMessaging::Adapters::AmazonSqs::Connection
- Inherits:
-
BaseConnection
- Object
- BaseConnection
- ActiveMessaging::Adapters::AmazonSqs::Connection
- Defined in:
- lib/activemessaging/adapters/asqs.rb
Constant Summary collapse
- QUEUE_NAME_LENGTH =
1..80
- MESSAGE_SIZE =
1..(8 * 1024)
- VISIBILITY_TIMEOUT =
0..(24 * 60 * 60)
- NUMBER_OF_MESSAGES =
1..255
- GET_QUEUE_ATTRIBUTES =
['All', 'ApproximateNumberOfMessages', 'VisibilityTimeout']
- SET_QUEUE_ATTRIBUTES =
['VisibilityTimeout']
Instance Attribute Summary collapse
-
#access_key_id ⇒ Object
configurable params.
-
#aws_version ⇒ Object
configurable params.
-
#cache_queue_list ⇒ Object
configurable params.
-
#content_type ⇒ Object
configurable params.
-
#host ⇒ Object
configurable params.
-
#poll_interval ⇒ Object
configurable params.
-
#port ⇒ Object
configurable params.
-
#reconnectDelay ⇒ Object
configurable params.
-
#secret_access_key ⇒ Object
configurable params.
Attributes inherited from BaseConnection
Instance Method Summary collapse
- #disconnect ⇒ Object
-
#initialize(cfg) ⇒ Connection
constructor
generic init method needed by a13g.
-
#receive(options = {}) ⇒ Object
new receive respects priorities.
-
#received(message, headers = {}) ⇒ Object
# receive a single message from any of the subscribed queues # check each queue once, then sleep for poll_interval def receive raise "No subscriptions to receive messages from." if (@subscriptions.nil? || @subscriptions.empty?) start = @current_subscription while true # puts "calling receive..." @current_subscription = ((@current_subscription < @subscriptions.length-1) ? @current_subscription + 1 : 0) sleep poll_interval if (@current_subscription == start) queue_name = @subscriptions.keys.sort[@current_subscription] queue = queues[queue_name] subscription = @subscriptions[queue_name] unless queue.nil? messages = retrieve_messsages queue, 1, subscription.headers[:visibility_timeout] return messages[0] unless (messages.nil? or messages.empty? or messages[0].nil?) end end end.
-
#send(queue_name, message_body, message_headers = {}) ⇒ Object
queue_name: string, body: string, headers: hash. Sends a single message to a queue.
-
#subscribe(queue_name, message_headers = {}) ⇒ Object
queue_name: string, headers: hash. For SQS, makes sure the queue exists (creating it if it does not), then adds it to the list of polled queues.
-
#unreceive(message, headers = {}) ⇒ Object
Does nothing; because the message is not deleted, it will eventually become visible again.
-
#unsubscribe(queue_name, message_headers = {}) ⇒ Object
queue_name: string, headers: hash. For SQS, attempt to delete the queue; this won’t work if the queue is not empty, and that’s OK.
Methods included from ActiveMessaging::Adapter
Constructor Details
#initialize(cfg) ⇒ Connection
generic init method needed by a13g
31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 |
# File 'lib/activemessaging/adapters/asqs.rb', line 31 def initialize cfg raise "Must specify a access_key_id" if (cfg[:access_key_id].nil? || cfg[:access_key_id].empty?) raise "Must specify a secret_access_key" if (cfg[:secret_access_key].nil? || cfg[:secret_access_key].empty?) @access_key_id=cfg[:access_key_id] @secret_access_key=cfg[:secret_access_key] @request_expires = cfg[:requestExpires] || 10 @request_retry_count = cfg[:requestRetryCount] || 5 @aws_version = cfg[:aws_version] || '2008-01-01' @content_type = cfg[:content_type] || 'text/plain' @host = cfg[:host] || 'queue.amazonaws.com' @port = cfg[:port] || 80 @protocol = cfg[:protocol] || 'http' @poll_interval = cfg[:poll_interval] || 1 @reconnect_delay = cfg[:reconnectDelay] || 5 @aws_url="#{@protocol}://#{@host}" @cache_queue_list = cfg[:cache_queue_list].nil? ? true : cfg[:cache_queue_list] @reliable = cfg[:reliable].nil? ? true : cfg[:reliable] #initialize the subscriptions and queues @subscriptions = {} @queues_by_priority = {} @current_subscription = 0 queues end |
Instance Attribute Details
#access_key_id ⇒ Object
configurable params
28 29 30 |
# File 'lib/activemessaging/adapters/asqs.rb', line 28 def access_key_id @access_key_id end |
#aws_version ⇒ Object
configurable params
28 29 30 |
# File 'lib/activemessaging/adapters/asqs.rb', line 28 def aws_version @aws_version end |
#cache_queue_list ⇒ Object
configurable params
28 29 30 |
# File 'lib/activemessaging/adapters/asqs.rb', line 28 def cache_queue_list @cache_queue_list end |
#content_type ⇒ Object
configurable params
28 29 30 |
# File 'lib/activemessaging/adapters/asqs.rb', line 28 def content_type @content_type end |
#host ⇒ Object
configurable params
28 29 30 |
# File 'lib/activemessaging/adapters/asqs.rb', line 28 def host @host end |
#poll_interval ⇒ Object
configurable params
28 29 30 |
# File 'lib/activemessaging/adapters/asqs.rb', line 28 def poll_interval @poll_interval end |
#port ⇒ Object
configurable params
28 29 30 |
# File 'lib/activemessaging/adapters/asqs.rb', line 28 def port @port end |
#reconnectDelay ⇒ Object
configurable params
28 29 30 |
# File 'lib/activemessaging/adapters/asqs.rb', line 28 def reconnectDelay @reconnectDelay end |
#secret_access_key ⇒ Object
configurable params
28 29 30 |
# File 'lib/activemessaging/adapters/asqs.rb', line 28 def secret_access_key @secret_access_key end |
Instance Method Details
#disconnect ⇒ Object
58 59 60 61 |
# File 'lib/activemessaging/adapters/asqs.rb', line 58 def disconnect #it's an http request - there is no disconnect - ha! return true end |
#receive(options = {}) ⇒ Object
new receive respects priorities
113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 |
# File 'lib/activemessaging/adapters/asqs.rb', line 113 def receive(options={}) message = nil only_priorities = options[:priorities] # loop through the priorities @queues_by_priority.keys.sort.each do |priority| # skip this priority if there is a list, and it is not in the list next if only_priorities && !only_priorities.include?(priority.to_i) # puts " - priority: #{priority}" # loop through queues for the priority in random order each time @queues_by_priority[priority].shuffle.each do |queue_name| # puts " - queue_name: #{queue_name}" queue = queues[queue_name] subscription = @subscriptions[queue_name] next if queue.nil? || subscription.nil? messages = retrieve_messsages(queue, 1, subscription.headers[:visibility_timeout]) if (messages && !messages.empty?) message = messages[0] end break if message end break if message end # puts " - message: #{message}" message end |
#received(message, headers = {}) ⇒ Object
# receive a single message from any of the subscribed queues # check each queue once, then sleep for poll_interval def receive
raise "No subscriptions to receive messages from." if (@subscriptions.nil? || @subscriptions.empty?)
start = @current_subscription
while true
# puts "calling receive..."
@current_subscription = ((@current_subscription < @subscriptions.length-1) ? @current_subscription + 1 : 0)
sleep poll_interval if (@current_subscription == start)
queue_name = @subscriptions.keys.sort[@current_subscription]
queue = queues[queue_name]
subscription = @subscriptions[queue_name]
unless queue.nil?
messages = retrieve_messsages queue, 1, subscription.headers[:visibility_timeout]
return messages[0] unless (messages.nil? or messages.empty? or messages[0].nil?)
end
end
end
167 168 169 170 171 172 173 174 |
# File 'lib/activemessaging/adapters/asqs.rb', line 167 def received message, headers={} begin rescue Object=>exception logger.error "Exception in ActiveMessaging::Adapters::AmazonSWS::Connection.received() logged and ignored: " logger.error exception end end |
#send(queue_name, message_body, message_headers = {}) ⇒ Object
queue_name string, body string, headers hash send a single message to a queue
93 94 95 96 |
# File 'lib/activemessaging/adapters/asqs.rb', line 93 def send queue_name, message_body, message_headers={} queue = get_or_create_queue queue_name send_messsage queue, message_body end |
#subscribe(queue_name, message_headers = {}) ⇒ Object
queue_name string, headers hash for sqs, make sure queue exists, if not create, then add to list of polled queues
65 66 67 68 69 70 71 72 73 74 75 76 77 |
# File 'lib/activemessaging/adapters/asqs.rb', line 65 def subscribe queue_name, message_headers={} # look at the existing queues, create any that are missing queue = get_or_create_queue queue_name if @subscriptions.has_key? queue.name @subscriptions[queue.name].add else @subscriptions[queue.name] = Subscription.new(queue.name, message_headers) end priority = @subscriptions[queue.name].priority @queues_by_priority[priority] = [] unless @queues_by_priority.has_key?(priority) @queues_by_priority[priority] << queue.name unless @queues_by_priority[priority].include?(queue.name) end |
#unreceive(message, headers = {}) ⇒ Object
do nothing; by not deleting the message will eventually become visible again
177 178 179 |
# File 'lib/activemessaging/adapters/asqs.rb', line 177 def unreceive message, headers={} return true end |
#unsubscribe(queue_name, message_headers = {}) ⇒ Object
queue_name string, headers hash for sqs, attempt delete the queues, won’t work if not empty, that’s ok
81 82 83 84 85 86 87 88 89 |
# File 'lib/activemessaging/adapters/asqs.rb', line 81 def unsubscribe queue_name, message_headers={} if @subscriptions[queue_name] @subscriptions[queue_name].remove if @subscriptions[queue_name].count <= 0 sub = @subscriptions.delete(queue_name) @queues_by_priority[sub.priority].delete(queue_name) end end end |