Class: ActiveMessaging::Adapters::AmazonSqs::Connection
- Inherits:
-
BaseConnection
- Object
- BaseConnection
- ActiveMessaging::Adapters::AmazonSqs::Connection
- Defined in:
- lib/activemessaging/adapters/asqs.rb
Constant Summary collapse
- QUEUE_NAME_LENGTH =
1..80
- MESSAGE_SIZE =
1..(8 * 1024)
- VISIBILITY_TIMEOUT =
0..(24 * 60 * 60)
- NUMBER_OF_MESSAGES =
1..255
- GET_QUEUE_ATTRIBUTES =
['All', 'ApproximateNumberOfMessages', 'VisibilityTimeout']
- SET_QUEUE_ATTRIBUTES =
['VisibilityTimeout']
Instance Attribute Summary collapse
-
#access_key_id ⇒ Object
configurable params.
-
#aws_version ⇒ Object
configurable params.
-
#cache_queue_list ⇒ Object
configurable params.
-
#content_type ⇒ Object
configurable params.
-
#host ⇒ Object
configurable params.
-
#poll_interval ⇒ Object
configurable params.
-
#port ⇒ Object
configurable params.
-
#reconnectDelay ⇒ Object
configurable params.
-
#secret_access_key ⇒ Object
configurable params.
Attributes inherited from BaseConnection
Instance Method Summary collapse
- #disconnect ⇒ Object
-
#initialize(cfg) ⇒ Connection
constructor
generic init method needed by a13g.
-
#receive ⇒ Object
Receive a single message from any of the subscribed queues: check each queue once, then sleep for poll_interval.
- #received(message, headers = {}) ⇒ Object
-
#send(queue_name, message_body, message_headers = {}) ⇒ Object
queue_name string, body string, headers hash. Send a single message to a queue.
-
#subscribe(queue_name, message_headers = {}) ⇒ Object
queue_name string, headers hash. For SQS, make sure the queue exists (create it if it does not), then add it to the list of polled queues.
-
#unreceive(message, headers = {}) ⇒ Object
Do nothing; because the message is not deleted, it will eventually become visible again.
-
#unsubscribe(queue_name, message_headers = {}) ⇒ Object
queue_name string, headers hash. For SQS, attempt to delete the queues; this won’t work if they are not empty, and that’s OK.
Methods included from ActiveMessaging::Adapter
Constructor Details
#initialize(cfg) ⇒ Connection
generic init method needed by a13g
31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 |
# File 'lib/activemessaging/adapters/asqs.rb', line 31 def initialize cfg raise "Must specify a access_key_id" if (cfg[:access_key_id].nil? || cfg[:access_key_id].empty?) raise "Must specify a secret_access_key" if (cfg[:secret_access_key].nil? || cfg[:secret_access_key].empty?) @access_key_id=cfg[:access_key_id] @secret_access_key=cfg[:secret_access_key] @request_expires = cfg[:requestExpires] || 10 @request_retry_count = cfg[:requestRetryCount] || 5 @aws_version = cfg[:aws_version] || '2008-01-01' @content_type = cfg[:content_type] || 'text/plain' @host = cfg[:host] || 'queue.amazonaws.com' @port = cfg[:port] || 80 @protocol = cfg[:protocol] || 'http' @poll_interval = cfg[:poll_interval] || 1 @reconnect_delay = cfg[:reconnectDelay] || 5 @aws_url="#{@protocol}://#{@host}" @cache_queue_list = cfg[:cache_queue_list].nil? ? true : cfg[:cache_queue_list] @reliable = cfg[:reliable].nil? ? true : cfg[:reliable] #initialize the subscriptions and queues @subscriptions = {} @current_subscription = 0 queues end |
Instance Attribute Details
#access_key_id ⇒ Object
configurable params
28 29 30 |
# File 'lib/activemessaging/adapters/asqs.rb', line 28 def access_key_id @access_key_id end |
#aws_version ⇒ Object
configurable params
28 29 30 |
# File 'lib/activemessaging/adapters/asqs.rb', line 28 def aws_version @aws_version end |
#cache_queue_list ⇒ Object
configurable params
28 29 30 |
# File 'lib/activemessaging/adapters/asqs.rb', line 28 def cache_queue_list @cache_queue_list end |
#content_type ⇒ Object
configurable params
28 29 30 |
# File 'lib/activemessaging/adapters/asqs.rb', line 28 def content_type @content_type end |
#host ⇒ Object
configurable params
28 29 30 |
# File 'lib/activemessaging/adapters/asqs.rb', line 28 def host @host end |
#poll_interval ⇒ Object
configurable params
28 29 30 |
# File 'lib/activemessaging/adapters/asqs.rb', line 28 def poll_interval @poll_interval end |
#port ⇒ Object
configurable params
28 29 30 |
# File 'lib/activemessaging/adapters/asqs.rb', line 28 def port @port end |
#reconnectDelay ⇒ Object
configurable params
28 29 30 |
# File 'lib/activemessaging/adapters/asqs.rb', line 28 def reconnectDelay @reconnectDelay end |
#secret_access_key ⇒ Object
configurable params
28 29 30 |
# File 'lib/activemessaging/adapters/asqs.rb', line 28 def secret_access_key @secret_access_key end |
Instance Method Details
#disconnect ⇒ Object
57 58 59 60 |
# File 'lib/activemessaging/adapters/asqs.rb', line 57 def disconnect #it's an http request - there is no disconnect - ha! return true end |
#receive ⇒ Object
Receive a single message from any of the subscribed queues: check each queue once, then sleep for poll_interval
92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 |
# File 'lib/activemessaging/adapters/asqs.rb', line 92 def receive raise "No subscriptions to receive messages from." if (@subscriptions.nil? || @subscriptions.empty?) start = @current_subscription while true # puts "calling receive..." @current_subscription = ((@current_subscription < @subscriptions.length-1) ? @current_subscription + 1 : 0) sleep poll_interval if (@current_subscription == start) queue_name = @subscriptions.keys.sort[@current_subscription] queue = queues[queue_name] subscription = @subscriptions[queue_name] unless queue.nil? messages = retrieve_messsages queue, 1, subscription.headers[:visibility_timeout] return messages[0] unless (messages.nil? or messages.empty? or messages[0].nil?) end end end |
#received(message, headers = {}) ⇒ Object
109 110 111 112 113 114 115 116 |
# File 'lib/activemessaging/adapters/asqs.rb', line 109 def received message, headers={} begin delete_message message rescue Object=>exception logger.error "Exception in ActiveMessaging::Adapters::AmazonSWS::Connection.received() logged and ignored: " logger.error exception end end |
#send(queue_name, message_body, message_headers = {}) ⇒ Object
queue_name string, body string, headers hash. Send a single message to a queue
85 86 87 88 |
# File 'lib/activemessaging/adapters/asqs.rb', line 85 def send queue_name, message_body, message_headers={} queue = get_or_create_queue queue_name send_messsage queue, message_body end |
#subscribe(queue_name, message_headers = {}) ⇒ Object
queue_name string, headers hash. For SQS, make sure the queue exists (create it if it does not), then add it to the list of polled queues
64 65 66 67 68 69 70 71 72 |
# File 'lib/activemessaging/adapters/asqs.rb', line 64 def subscribe queue_name, message_headers={} # look at the existing queues, create any that are missing queue = get_or_create_queue queue_name if @subscriptions.has_key? queue.name @subscriptions[queue.name].add else @subscriptions[queue.name] = Subscription.new(queue.name, message_headers) end end |
#unreceive(message, headers = {}) ⇒ Object
Do nothing; because the message is not deleted, it will eventually become visible again
119 120 121 |
# File 'lib/activemessaging/adapters/asqs.rb', line 119 def unreceive message, headers={} return true end |
#unsubscribe(queue_name, message_headers = {}) ⇒ Object
queue_name string, headers hash. For SQS, attempt to delete the queues; this won’t work if they are not empty, and that’s OK
76 77 78 79 80 81 |
# File 'lib/activemessaging/adapters/asqs.rb', line 76 def unsubscribe queue_name, message_headers={} if @subscriptions[queue_name] @subscriptions[queue_name].remove @subscriptions.delete(queue_name) if @subscriptions[queue_name].count <= 0 end end |