Class: ActiveMessaging::Adapters::AmazonSqs::Connection

Inherits:
BaseConnection
  • Object
show all
Defined in:
lib/activemessaging/adapters/asqs.rb

Constant Summary collapse

QUEUE_NAME_LENGTH =
1..80
VISIBILITY_TIMEOUT =
0..(24 * 60 * 60)
NUMBER_OF_MESSAGES =
1..255
GET_QUEUE_ATTRIBUTES =
['All', 'ApproximateNumberOfMessages', 'VisibilityTimeout']
SET_QUEUE_ATTRIBUTES =
['VisibilityTimeout']

Instance Attribute Summary collapse

Attributes inherited from BaseConnection

#reliable

Instance Method Summary collapse

Methods included from ActiveMessaging::Adapter

included, #logger

Constructor Details

#initialize(cfg) ⇒ Connection

generic init method needed by a13g



30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
# File 'lib/activemessaging/adapters/asqs.rb', line 30

def initialize cfg
  raise "Must specify a access_key_id" if (cfg[:access_key_id].nil? || cfg[:access_key_id].empty?)
  raise "Must specify a secret_access_key" if (cfg[:secret_access_key].nil? || cfg[:secret_access_key].empty?)

  @access_key_id=cfg[:access_key_id]
  @secret_access_key=cfg[:secret_access_key]
  @request_expires = cfg[:requestExpires]         || 10
  @request_retry_count = cfg[:requestRetryCount]  || 5
  @aws_version = cfg[:aws_version]                || '2008-01-01'
  @content_type = cfg[:content_type]              || 'text/plain'
  @host = cfg[:host]                              || 'queue.amazonaws.com'
  @port = cfg[:port]                              || 80
  @protocol = cfg[:protocol]                      || 'http'
  @poll_interval = cfg[:poll_interval]            || 1
  @reconnect_delay = cfg[:reconnectDelay]         || 5

  @max_message_size = cfg[:max_message_size].to_i > 0 ? cfg[:max_message_size].to_i : 8

  @aws_url="#{@protocol}://#{@host}"

  @cache_queue_list = cfg[:cache_queue_list].nil? ? true : cfg[:cache_queue_list]
  @reliable =         cfg[:reliable].nil?         ? true : cfg[:reliable]

  #initialize the subscriptions and queues
  @subscriptions = {}
  @queues_by_priority = {}
  @current_subscription = 0
  queues
end

Instance Attribute Details

#access_key_idObject

configurable params



27
28
29
# File 'lib/activemessaging/adapters/asqs.rb', line 27

def access_key_id
  @access_key_id
end

#aws_versionObject

configurable params



27
28
29
# File 'lib/activemessaging/adapters/asqs.rb', line 27

def aws_version
  @aws_version
end

#cache_queue_listObject

configurable params



27
28
29
# File 'lib/activemessaging/adapters/asqs.rb', line 27

def cache_queue_list
  @cache_queue_list
end

#content_typeObject

configurable params



27
28
29
# File 'lib/activemessaging/adapters/asqs.rb', line 27

def content_type
  @content_type
end

#hostObject

configurable params



27
28
29
# File 'lib/activemessaging/adapters/asqs.rb', line 27

def host
  @host
end

#max_message_sizeObject

configurable params



27
28
29
# File 'lib/activemessaging/adapters/asqs.rb', line 27

def max_message_size
  @max_message_size
end

#poll_intervalObject

configurable params



27
28
29
# File 'lib/activemessaging/adapters/asqs.rb', line 27

def poll_interval
  @poll_interval
end

#portObject

configurable params



27
28
29
# File 'lib/activemessaging/adapters/asqs.rb', line 27

def port
  @port
end

#reconnect_delayObject

configurable params



27
28
29
# File 'lib/activemessaging/adapters/asqs.rb', line 27

def reconnect_delay
  @reconnect_delay
end

#secret_access_keyObject

configurable params



27
28
29
# File 'lib/activemessaging/adapters/asqs.rb', line 27

def secret_access_key
  @secret_access_key
end

Instance Method Details

#disconnectObject



60
61
62
63
# File 'lib/activemessaging/adapters/asqs.rb', line 60

def disconnect
  #it's an http request - there is no disconnect - ha!
  return true
end

#receive(options = {}) ⇒ Object

new receive respects priorities



115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
# File 'lib/activemessaging/adapters/asqs.rb', line 115

def receive(options={})
  message = nil

  only_priorities = options[:priorities]

  # loop through the priorities
  @queues_by_priority.keys.sort.each do |priority|

    # skip this priority if there is a list, and it is not in the list
    next if only_priorities && !only_priorities.include?(priority.to_i)

    # puts " - priority: #{priority}"
    # loop through queues for the priority in random order each time
    @queues_by_priority[priority].shuffle.each do |queue_name|
      # puts "   - queue_name: #{queue_name}"
      queue = queues[queue_name]
      subscription = @subscriptions[queue_name]

      next if queue.nil? || subscription.nil?
      messages = retrieve_messsages(queue, 1, subscription.headers[:visibility_timeout])
      
      if (messages && !messages.empty?)
        message = messages[0]
      end

      break if message
    end

    break if message
  end

  # puts " - message: #{message}"
  message
end

#received(message, headers = {}) ⇒ Object

# receive a single message from any of the subscribed queues # check each queue once, then sleep for poll_interval def receive

raise "No subscriptions to receive messages from." if (@subscriptions.nil? || @subscriptions.empty?)
start = @current_subscription
while true
  # puts "calling receive..."
  @current_subscription = ((@current_subscription < @subscriptions.length-1) ? @current_subscription + 1 : 0)
  sleep poll_interval if (@current_subscription == start)
  queue_name = @subscriptions.keys.sort[@current_subscription]
  queue = queues[queue_name]
  subscription = @subscriptions[queue_name]
  unless queue.nil?
    messages = retrieve_messsages queue, 1, subscription.headers[:visibility_timeout]
    return messages[0] unless (messages.nil? or messages.empty? or messages[0].nil?)
  end
end

end



169
170
171
172
173
174
175
176
# File 'lib/activemessaging/adapters/asqs.rb', line 169

def received message, headers={}
  begin
    delete_message message
  rescue Object=>exception
    logger.error "Exception in ActiveMessaging::Adapters::AmazonSWS::Connection.received() logged and ignored: "
    logger.error exception
  end
end

#send(queue_name, message_body, message_headers = {}) ⇒ Object

queue_name string, body string, headers hash send a single message to a queue



95
96
97
98
# File 'lib/activemessaging/adapters/asqs.rb', line 95

def send queue_name, message_body, message_headers={}
  queue = get_or_create_queue queue_name
  send_messsage queue, message_body
end

#subscribe(queue_name, message_headers = {}) ⇒ Object

queue_name string, headers hash for sqs, make sure queue exists, if not create, then add to list of polled queues



67
68
69
70
71
72
73
74
75
76
77
78
79
# File 'lib/activemessaging/adapters/asqs.rb', line 67

def subscribe queue_name, message_headers={}
  # look at the existing queues, create any that are missing
  queue = get_or_create_queue queue_name
  if @subscriptions.has_key? queue.name
    @subscriptions[queue.name].add
  else
    @subscriptions[queue.name] = Subscription.new(queue.name, message_headers)
  end
  priority = @subscriptions[queue.name].priority

  @queues_by_priority[priority] = [] unless @queues_by_priority.has_key?(priority)
  @queues_by_priority[priority] << queue.name unless @queues_by_priority[priority].include?(queue.name)
end

#unreceive(message, headers = {}) ⇒ Object

do nothing; by not deleting the message will eventually become visible again



179
180
181
# File 'lib/activemessaging/adapters/asqs.rb', line 179

def unreceive message, headers={}
  return true
end

#unsubscribe(queue_name, message_headers = {}) ⇒ Object

queue_name string, headers hash for sqs, attempt delete the queues, won’t work if not empty, that’s ok



83
84
85
86
87
88
89
90
91
# File 'lib/activemessaging/adapters/asqs.rb', line 83

def unsubscribe queue_name, message_headers={}
  if @subscriptions[queue_name]
    @subscriptions[queue_name].remove
    if @subscriptions[queue_name].count <= 0
      sub = @subscriptions.delete(queue_name)
      @queues_by_priority[sub.priority].delete(queue_name)
    end
  end
end