Class: ActiveMessaging::Adapters::AmazonSqs::Connection

Inherits:
BaseConnection
  • Object
show all
Defined in:
lib/activemessaging/adapters/asqs.rb

Constant Summary collapse

QUEUE_NAME_LENGTH =
1..80
MESSAGE_SIZE =
1..(8 * 1024)
VISIBILITY_TIMEOUT =
0..(24 * 60 * 60)
NUMBER_OF_MESSAGES =
1..255
GET_QUEUE_ATTRIBUTES =
['All', 'ApproximateNumberOfMessages', 'VisibilityTimeout']
SET_QUEUE_ATTRIBUTES =
['VisibilityTimeout']

Instance Attribute Summary collapse

Attributes inherited from BaseConnection

#reliable

Instance Method Summary collapse

Methods included from ActiveMessaging::Adapter

included, #logger

Constructor Details

#initialize(cfg) ⇒ Connection

Generic init method needed by a13g (ActiveMessaging).



31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
# File 'lib/activemessaging/adapters/asqs.rb', line 31

# Builds a connection from a configuration hash, filling in a default for
# every optional setting that is missing.
#
# @param cfg [Hash] settings keyed by symbol. :access_key_id and
#   :secret_access_key are required; :requestExpires, :requestRetryCount,
#   :aws_version, :content_type, :host, :port, :protocol, :poll_interval,
#   :reconnectDelay, :cache_queue_list and :reliable are optional.
# @raise [RuntimeError] if either credential is nil or empty
def initialize(cfg)
  raise "Must specify a access_key_id" if cfg[:access_key_id].nil? || cfg[:access_key_id].empty?
  raise "Must specify a secret_access_key" if cfg[:secret_access_key].nil? || cfg[:secret_access_key].empty?

  @access_key_id       = cfg[:access_key_id]
  @secret_access_key   = cfg[:secret_access_key]
  @request_expires     = cfg[:requestExpires]    || 10
  @request_retry_count = cfg[:requestRetryCount] || 5
  @aws_version         = cfg[:aws_version]       || '2008-01-01'
  @content_type        = cfg[:content_type]      || 'text/plain'
  @host                = cfg[:host]              || 'queue.amazonaws.com'
  @port                = cfg[:port]              || 80
  @protocol            = cfg[:protocol]          || 'http'
  @poll_interval       = cfg[:poll_interval]     || 1
  @reconnect_delay     = cfg[:reconnectDelay]    || 5
  # The reconnectDelay reader returns @reconnectDelay, so mirror the value
  # there as well; storing it only in @reconnect_delay left the reader nil.
  @reconnectDelay      = @reconnect_delay
  # Note: the base URL is built from protocol and host only; @port is not included.
  @aws_url = "#{@protocol}://#{@host}"

  # These two flags default to true, but an explicit false must be kept,
  # which is why `||` is not used here.
  @cache_queue_list = cfg[:cache_queue_list].nil? ? true : cfg[:cache_queue_list]
  @reliable         = cfg[:reliable].nil?         ? true : cfg[:reliable]

  # initialize the subscriptions and queues
  @subscriptions = {}
  @queues_by_priority = {}
  @current_subscription = 0
  # `queues` is defined elsewhere in the class; presumably this primes the
  # queue list -- confirm against its definition.
  queues
end

Instance Attribute Details

#access_key_id ⇒ Object

configurable params



28
29
30
# File 'lib/activemessaging/adapters/asqs.rb', line 28

# Returns @access_key_id, which the constructor copies from cfg[:access_key_id].
def access_key_id; @access_key_id; end

#aws_version ⇒ Object

configurable params



28
29
30
# File 'lib/activemessaging/adapters/asqs.rb', line 28

# Returns @aws_version, which the constructor takes from cfg[:aws_version]
# (default '2008-01-01').
def aws_version; @aws_version; end

#cache_queue_list ⇒ Object

configurable params



28
29
30
# File 'lib/activemessaging/adapters/asqs.rb', line 28

# Returns @cache_queue_list, which the constructor takes from
# cfg[:cache_queue_list] (true when that key is nil).
def cache_queue_list; @cache_queue_list; end

#content_type ⇒ Object

configurable params



28
29
30
# File 'lib/activemessaging/adapters/asqs.rb', line 28

# Returns @content_type, which the constructor takes from cfg[:content_type]
# (default 'text/plain').
def content_type; @content_type; end

#host ⇒ Object

configurable params



28
29
30
# File 'lib/activemessaging/adapters/asqs.rb', line 28

# Returns @host, which the constructor takes from cfg[:host]
# (default 'queue.amazonaws.com').
def host; @host; end

#poll_interval ⇒ Object

configurable params



28
29
30
# File 'lib/activemessaging/adapters/asqs.rb', line 28

# Returns @poll_interval, which the constructor takes from cfg[:poll_interval]
# (default 1).
def poll_interval; @poll_interval; end

#port ⇒ Object

configurable params



28
29
30
# File 'lib/activemessaging/adapters/asqs.rb', line 28

# Returns @port, which the constructor takes from cfg[:port] (default 80).
def port; @port; end

#reconnectDelay ⇒ Object

configurable params



28
29
30
# File 'lib/activemessaging/adapters/asqs.rb', line 28

# Returns the reconnect delay under its camelCase attribute name.
#
# The constructor stores cfg[:reconnectDelay] (default 5) in @reconnect_delay,
# not in @reconnectDelay, so reading only @reconnectDelay always gave nil.
# An explicitly assigned @reconnectDelay still wins; otherwise the value the
# constructor stored is returned.
#
# @return [Object, nil] @reconnectDelay when it is non-nil, else @reconnect_delay
def reconnectDelay
  @reconnectDelay.nil? ? @reconnect_delay : @reconnectDelay
end

#secret_access_key ⇒ Object

configurable params



28
29
30
# File 'lib/activemessaging/adapters/asqs.rb', line 28

# Returns @secret_access_key, which the constructor copies from
# cfg[:secret_access_key].
def secret_access_key; @secret_access_key; end

Instance Method Details

#disconnect ⇒ Object



58
59
60
61
# File 'lib/activemessaging/adapters/asqs.rb', line 58

# No-op: requests are plain HTTP calls, so there is nothing to tear down.
#
# @return [true] always
def disconnect
  true
end

#receive(options = {}) ⇒ Object

new receive respects priorities



113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
# File 'lib/activemessaging/adapters/asqs.rb', line 113

# Polls the subscribed queues in ascending order of their priority key and
# hands back the first message found.
#
# @param options [Hash] may carry :priorities, a list of integers; when it is
#   given, only priorities whose #to_i value appears in the list are polled
# @return [Object, nil] the first message retrieved, or nil when no polled
#   queue produced one
def receive(options={})
  allowed = options[:priorities]

  # Work out which priority levels to visit before touching any queue.
  levels = @queues_by_priority.keys.sort
  levels = levels.select { |level| allowed.include?(level.to_i) } if allowed

  found = nil
  levels.each do |level|
    # Queues sharing a priority are visited in a fresh random order each call.
    @queues_by_priority[level].shuffle.each do |name|
      queue = queues[name]
      subscription = @subscriptions[name]
      next if queue.nil? || subscription.nil?

      batch = retrieve_messsages(queue, 1, subscription.headers[:visibility_timeout])
      next if !batch || batch.empty?

      found = batch[0]
      # Stop at the first real message; this exits both loops at once.
      return found if found
    end
  end

  found
end

#received(message, headers = {}) ⇒ Object

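Deletes the given message once it has been received. The text that follows appears to be the previous round-robin implementation of receive, left commented out in the source: # receive a single message from any of the subscribed queues # check each queue once, then sleep for poll_interval def receive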

raise "No subscriptions to receive messages from." if (@subscriptions.nil? || @subscriptions.empty?)
start = @current_subscription
while true
  # puts "calling receive..."
  @current_subscription = ((@current_subscription < @subscriptions.length-1) ? @current_subscription + 1 : 0)
  sleep poll_interval if (@current_subscription == start)
  queue_name = @subscriptions.keys.sort[@current_subscription]
  queue = queues[queue_name]
  subscription = @subscriptions[queue_name]
  unless queue.nil?
    messages = retrieve_messsages queue, 1, subscription.headers[:visibility_timeout]
    return messages[0] unless (messages.nil? or messages.empty? or messages[0].nil?)
  end
end

end



167
168
169
170
171
172
173
174
# File 'lib/activemessaging/adapters/asqs.rb', line 167

# Acknowledges a message by deleting it via delete_message.
#
# Deletion is best-effort: ordinary errors are logged and swallowed so that a
# failed delete does not take the caller down. Only StandardError is rescued,
# so Interrupt, SystemExit and other non-standard exceptions still propagate.
#
# @param message [Object] the message to delete
# @param headers [Hash] accepted but not used by this method
# @return [Object] whatever delete_message returns, or the result of the last
#   logger call when an error was swallowed
def received(message, headers = {})
  delete_message(message)
rescue StandardError => e
  logger.error "Exception in ActiveMessaging::Adapters::AmazonSqs::Connection.received() logged and ignored: "
  logger.error e
end

#send(queue_name, message_body, message_headers = {}) ⇒ Object

queue_name string, body string, headers hash. Sends a single message to a queue.



93
94
95
96
# File 'lib/activemessaging/adapters/asqs.rb', line 93

# Delivers one message body to the named queue, looking the queue up (or
# creating it) through get_or_create_queue first.
#
# @param queue_name [String] name of the target queue
# @param message_body [String] payload passed on to send_messsage
# @param message_headers [Hash] accepted but not used by this method
# @return [Object] whatever send_messsage returns
def send(queue_name, message_body, message_headers = {})
  send_messsage(get_or_create_queue(queue_name), message_body)
end

#subscribe(queue_name, message_headers = {}) ⇒ Object

queue_name string, headers hash. For SQS, make sure the queue exists (creating it if not), then add it to the list of polled queues.



65
66
67
68
69
70
71
72
73
74
75
76
77
# File 'lib/activemessaging/adapters/asqs.rb', line 65

# Registers interest in a queue and files the queue under its priority so
# that receive will poll it.
#
# @param queue_name [String] queue to subscribe to; resolved through
#   get_or_create_queue
# @param message_headers [Hash] passed to Subscription.new when this is the
#   first subscription for the queue
# @return [Array, nil] the priority bucket after the queue name was appended,
#   or nil when the name was already in the bucket
def subscribe(queue_name, message_headers = {})
  name = get_or_create_queue(queue_name).name

  if @subscriptions.key?(name)
    # Already subscribed: let the existing subscription record the extra one.
    @subscriptions[name].add
  else
    @subscriptions[name] = Subscription.new(name, message_headers)
  end

  level = @subscriptions[name].priority
  @queues_by_priority[level] = [] unless @queues_by_priority.key?(level)

  bucket = @queues_by_priority[level]
  bucket << name unless bucket.include?(name)
end

#unreceive(message, headers = {}) ⇒ Object

do nothing; by not deleting the message will eventually become visible again



177
178
179
# File 'lib/activemessaging/adapters/asqs.rb', line 177

# Deliberately does nothing: a message that is not deleted becomes visible
# on its queue again by itself.
#
# @param message [Object] ignored
# @param headers [Hash] ignored
# @return [true] always
def unreceive(message, headers = {})
  true
end

#unsubscribe(queue_name, message_headers = {}) ⇒ Object

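queue_name string, headers hash. For SQS, attempt to delete the queues; this won't work if they are not empty, and that's OK. (The method body shown below only removes the local subscription bookkeeping.)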



81
82
83
84
85
86
87
88
89
# File 'lib/activemessaging/adapters/asqs.rb', line 81

# Drops one subscription to a queue; once its count reaches zero or below,
# the queue is forgotten and removed from its priority bucket.
#
# @param queue_name [String] key of the subscription to release
# @param message_headers [Hash] accepted but not used by this method
# @return [Object, nil] the queue name removed from the priority bucket, or
#   nil when nothing was removed
def unsubscribe(queue_name, message_headers = {})
  subscription = @subscriptions[queue_name]
  return unless subscription

  subscription.remove
  # Other subscribers remain, so keep polling this queue.
  return if subscription.count > 0

  released = @subscriptions.delete(queue_name)
  @queues_by_priority[released.priority].delete(queue_name)
end