Class: ActiveMessaging::Adapters::Kestrel::Connection
- Inherits:
- BaseConnection
- Inheritance chain: ActiveMessaging::Adapters::Kestrel::Connection < BaseConnection < Object
- Includes:
- ActiveMessaging::Adapter
- Defined in:
- lib/active_messaging/adapters/kestrel.rb
Overview
Connection to a Kestrel message queue server.
Constant Summary
- KESTREL_STATS_QUEUE_KEYS =
[:items, :bytes, :total_items, :logsize, :expired_items, :mem_items, :mem_bytes, :age, :discarded, :waiters, :open_transactions]
Instance Attribute Summary
- #logger ⇒ Object
  Logging.
- #retry_policy ⇒ Object
  Reconnect on error.
Instance Method Summary
- #connect ⇒ Object
  Connect to the Kestrel server using a Memcached client.
- #initialize(cfg = {}) ⇒ Connection (constructor)
  Create a new Kestrel adapter using the provided config.
- #queue_stats ⇒ Object
  Returns a nested hash of stats for each active queue on each member of the Kestrel cluster.
- #receive ⇒ Object
  Gets a message from any subscribed destination and returns it as an ActiveMessaging::Adapters::Kestrel::Message object.
- #retrier ⇒ Object
  Creates a retrier object according to the @retry_policy.
- #send(destination_name, body, headers = {}) ⇒ Object
  Send a message to the named destination.
- #subscribe(destination_name, headers = {}) ⇒ Object
  Subscribe to the named destination and begin receiving messages from it.
- #unsubscribe(destination_name, headers = {}) ⇒ Object
  Stop receiving messages from the named destination.
Constructor Details
#initialize(cfg = {}) ⇒ Connection
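Create a new Kestrel adapter using the provided config. The :logger, :retry_policy, and :empty_queues_delay keys are consumed by the adapter; the remaining config is passed to the Memcached client in #connect.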
# File 'lib/active_messaging/adapters/kestrel.rb', line 68
def initialize(cfg = {})
  # Like symbol keys
  cfg = symbolize_keys(cfg)
  # Create a logger. Use framework loggers when available.
  @logger = cfg.delete(:logger) || ActiveMessaging.logger || (defined?(::Rails) && ::Rails.logger ? ::Rails.logger : nil) || default_logger
  # Get the retry policy
  @retry_policy = cfg.delete(:retry_policy) || {:strategy => SimpleRetry, :config => {:tries => 1, :delay => 5}}
  # If the retry policy came from the cfg, make sure we set the :logger
  @retry_policy[:config][:logger] ||= @logger
  # Turn the strategy into a Class if it is a String
  if @retry_policy[:strategy].is_a?(String)
    # Convert strategy from string to class
    @retry_policy[:strategy] = Kestrel.const_get(@retry_policy[:strategy]) rescue Kestrel.to_class(@retry_policy[:strategy])
  end
  @empty_queues_delay = cfg.delete(:empty_queues_delay)
  @config = cfg
  @subscriptions = {}
  retrier
  connect
  nil
end
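The constructor above separates adapter-level options from the config that reaches the Memcached client. The following is a minimal sketch of that split, written outside the adapter so it runs on its own. The `split_config` helper and the empty `SimpleRetry` class are hypothetical stand-ins, `transform_keys` stands in for the adapter's `symbolize_keys`, and `Object.const_get` stands in for the `Kestrel.const_get` lookup.

```ruby
# Hypothetical stand-in for the adapter's SimpleRetry strategy class.
class SimpleRetry; end

# Splits a raw config the way #initialize does: adapter-level keys are
# removed, and whatever remains is what the Memcached client would receive.
def split_config(raw_cfg, fallback_logger = nil)
  cfg = raw_cfg.transform_keys(&:to_sym) # assumption: stands in for symbolize_keys
  logger = cfg.delete(:logger) || fallback_logger
  policy = cfg.delete(:retry_policy) ||
           { :strategy => SimpleRetry, :config => { :tries => 1, :delay => 5 } }
  policy[:config][:logger] ||= logger
  # A strategy given as a String is resolved to a class.
  if policy[:strategy].is_a?(String)
    policy[:strategy] = Object.const_get(policy[:strategy])
  end
  delay = cfg.delete(:empty_queues_delay)
  { :logger => logger, :retry_policy => policy,
    :empty_queues_delay => delay, :client_config => cfg }
end

settings = split_config({
  "servers" => ["localhost:22133"],
  "empty_queues_delay" => 0.1,
  "retry_policy" => { :strategy => "SimpleRetry",
                      :config => { :tries => 3, :delay => 1 } }
})
```

In this sketch only `:servers` is left in `settings[:client_config]`, which mirrors how `@config` ends up holding just the client options.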
Instance Attribute Details
#logger ⇒ Object
Logging
# File 'lib/active_messaging/adapters/kestrel.rb', line 63
def logger
  @logger
end
#retry_policy ⇒ Object
Reconnect on error. The policy is a hash with :strategy and :config keys.
# File 'lib/active_messaging/adapters/kestrel.rb', line 61
def retry_policy
  @retry_policy
end
Instance Method Details
#connect ⇒ Object
Connect to the kestrel server using a Memcached client
# File 'lib/active_messaging/adapters/kestrel.rb', line 122
def connect
  logger.debug("Creating connection to Kestrel using config #{@config.inspect}") if logger && logger.debug?
  @kestrel = MemCache.new(@config)
  @kestrel.servers = @config[:servers]
end
#queue_stats ⇒ Object
Returns a three-level nested hash containing stats for each active queue on each member of the Kestrel cluster. The top-level hash has the following structure:
{ "server1_def" => { "queue1" => { ... }, "queue2" => { ... } },
"server2_def" => { "queue1" => { ... }, "queue2" => { ... } } }
Each "server_def" key is a host:port string. The innermost hashes are keyed by the symbols in KESTREL_STATS_QUEUE_KEYS.
# File 'lib/active_messaging/adapters/kestrel.rb', line 99
def queue_stats
  stats = @kestrel.stats
  queues = stats.values.inject([]) do |queue_names, hash|
    hash.keys.each do |key|
      if md = /queue_(.+)_total_items/.match(key)
        queue_names << md[1]
      end
    end
    queue_names
  end
  stats.inject(Hash.new{|h,k| h[k] = Hash.new(&h.default_proc) }) do |return_hash, (server_def, stats_hash)|
    queues.each do |queue|
      KESTREL_STATS_QUEUE_KEYS.each do |key|
        stats_key = "queue_#{queue}_#{key}"
        denormalized_name = denormalize_name(queue) # denormalize the name ...
        return_hash[server_def][denormalized_name][key] = stats_hash[stats_key]
      end
    end
    return_hash
  end
end
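To make the reshaping in #queue_stats concrete, here is a small sketch that applies the same idea to a hand-written stats hash. The `group_queue_stats` helper, the sample server and queue names, and the shortened `QUEUE_KEYS` list are all assumptions for illustration. The sketch also skips `denormalize_name` (not shown on this page) and de-duplicates queue names, which the original does not do.

```ruby
# Shortened, hypothetical subset of KESTREL_STATS_QUEUE_KEYS.
QUEUE_KEYS = [:items, :bytes, :total_items]

# Turns flat "queue_<name>_<key>" stats into server => queue => key => value.
def group_queue_stats(stats)
  # Discover queue names from the "queue_<name>_total_items" entries.
  queues = stats.values.flat_map do |hash|
    hash.keys.map { |key| key[/queue_(.+)_total_items/, 1] }.compact
  end.uniq

  # Auto-vivifying hash so nested levels are created on first access.
  result = Hash.new { |h, k| h[k] = Hash.new(&h.default_proc) }
  stats.each do |server_def, stats_hash|
    queues.each do |queue|
      QUEUE_KEYS.each do |key|
        result[server_def][queue][key] = stats_hash["queue_#{queue}_#{key}"]
      end
    end
  end
  result
end

sample = {
  "localhost:22133" => {
    "uptime" => 100,
    "queue_orders_items" => 2,
    "queue_orders_bytes" => 128,
    "queue_orders_total_items" => 10
  }
}
grouped = group_queue_stats(sample)
```

Here `grouped["localhost:22133"]["orders"][:items]` holds the value of the flat `"queue_orders_items"` entry, and server-level entries such as `"uptime"` are left out.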
#receive ⇒ Object
Gets a message from any subscribed destination and returns it as an ActiveMessaging::Adapters::Kestrel::Message object
# File 'lib/active_messaging/adapters/kestrel.rb', line 166
def receive
  if @subscriptions.size > 0
    @retrier.do_work(@retry_policy[:config]) do
      queues_to_check = @subscriptions.size > 1 ? @subscriptions.keys.sort_by{rand} : @subscriptions.keys
      queues_to_check.each do |queue|
        if item = @kestrel.get(normalize(queue))
          # TODO: ActiveMessaging ought to provide a way to do messaging
          # without having to wrap the messages in another object
          #logger.debug("Got message from queue #{queue}: #{item}") if logger && logger.debug?
          return Message.new({'destination' => queue}, item, queue)
        end
      end
    end
  end
  # Sleep a little to avoid a spin loop (ActiveMessaging Gateway ought to do this)
  sleep(@empty_queues_delay) if @empty_queues_delay && @empty_queues_delay > 0
  return nil
end
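The polling step in #receive checks each subscribed queue once, in random order when there is more than one, and stops at the first message it finds. A rough sketch of just that step follows. `FakeQueueClient` and `poll_once` are hypothetical names, `shuffle` is used in place of `sort_by{rand}`, and the retry wrapper, the `normalize` call, and the `Message` wrapping are left out.

```ruby
# Hypothetical in-memory stand-in for the Memcached client; #get pops the
# oldest message from a named queue, or returns nil when the queue is empty.
class FakeQueueClient
  def initialize(queues)
    @queues = queues
  end

  def get(name)
    (@queues[name] || []).shift
  end
end

# Polls every subscribed queue once and returns [queue, item] for the
# first message found, or nil when all queues are empty.
def poll_once(subscriptions, client)
  names = subscriptions.keys
  names = names.shuffle if names.size > 1 # avoid always favoring one queue
  names.each do |queue|
    item = client.get(queue)
    return [queue, item] if item
  end
  nil
end

client = FakeQueueClient.new("orders" => [], "emails" => ["welcome"])
subscriptions = { "orders" => {}, "emails" => {} }
first = poll_once(subscriptions, client)
second = poll_once(subscriptions, client)
```

Randomizing the order keeps a busy queue from starving the others, at the cost of not guaranteeing any ordering across destinations.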
#retrier ⇒ Object
Creates a retrier object according to the @retry_policy
# File 'lib/active_messaging/adapters/kestrel.rb', line 129
def retrier
  @retrier ||= begin
    @retry_policy[:strategy].new
  end
end
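From the code on this page, a strategy class is instantiated with no arguments and later called as `do_work(config) { ... }`. The sketch below shows one class that would fit that shape. `FixedDelayRetry` is a hypothetical name, and its reading of `:tries` and `:delay` is an assumption; the behavior of the adapter's own `SimpleRetry` is not shown here and may differ.

```ruby
# Hypothetical strategy: runs the block, and on StandardError retries it
# until :tries attempts have been made, sleeping :delay seconds in between.
class FixedDelayRetry
  def do_work(options = {})
    tries = options.fetch(:tries, 1)
    delay = options.fetch(:delay, 0)
    attempts = 0
    begin
      attempts += 1
      yield
    rescue StandardError
      raise if attempts >= tries # out of attempts: re-raise the last error
      sleep(delay) if delay > 0
      retry
    end
  end
end

calls = 0
result = FixedDelayRetry.new.do_work(:tries => 3, :delay => 0) do
  calls += 1
  raise "connection lost" if calls < 3 # fail the first two attempts
  :delivered
end
```

A class like this could be referenced from the `:strategy` key of the retry policy, either as the class itself or as a String that the constructor resolves.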
#send(destination_name, body, headers = {}) ⇒ Object
Send a message to the named destination. The headers hash can include the following key:
:ttl => Time to live of the message, in seconds. When it is missing, zero, or negative, the message is sent without an expiry.
# File 'lib/active_messaging/adapters/kestrel.rb', line 155
def send(destination_name, body, headers = {})
  ttl = (headers[:ttl] || 0).to_i
  if ttl <= 0
    @kestrel.set(normalize(destination_name), body)
  else
    @kestrel.set(normalize(destination_name), body, ttl)
  end
end
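The :ttl handling in #send can be sketched with a client that only records its calls. `RecordingClient` and `send_with_ttl` are hypothetical names, and the `normalize` step is omitted. The sketch shows that the value is coerced with `to_i`, so a numeric string works, and that only the Symbol key `:ttl` is consulted.

```ruby
# Hypothetical client that records the arguments of every #set call.
class RecordingClient
  attr_reader :calls

  def initialize
    @calls = []
  end

  def set(*args)
    @calls << args
  end
end

# Same branching as #send: pass an expiry only when the TTL is positive.
def send_with_ttl(client, queue, body, headers = {})
  ttl = (headers[:ttl] || 0).to_i
  if ttl <= 0
    client.set(queue, body)
  else
    client.set(queue, body, ttl)
  end
end

client = RecordingClient.new
send_with_ttl(client, "orders", "no ttl")
send_with_ttl(client, "orders", "string ttl", { :ttl => "30" })
send_with_ttl(client, "orders", "negative ttl", { :ttl => -5 })
send_with_ttl(client, "orders", "string key", { "ttl" => 30 })
```

Only the second call reaches the client with three arguments; the last call is sent without an expiry because the header uses a String key.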
#subscribe(destination_name, headers = {}) ⇒ Object
Subscribe to the named destination and begin receiving messages from it
# File 'lib/active_messaging/adapters/kestrel.rb', line 137
def subscribe(destination_name, headers = {})
  headers[:destination] = destination_name
  if @subscriptions[destination_name]
    # TODO: Should you get an exception or no?
  else
    @subscriptions[destination_name] = headers
  end
  nil
end
#unsubscribe(destination_name, headers = {}) ⇒ Object
Stop receiving messages from the named destination
# File 'lib/active_messaging/adapters/kestrel.rb', line 148
def unsubscribe(destination_name, headers = {})
  @subscriptions.delete(destination_name)
end
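The subscription bookkeeping in #subscribe and #unsubscribe is a plain hash keyed by destination name. The sketch below isolates it to show two consequences of the code above: a second subscribe to the same destination is silently ignored, and the caller's headers hash gains a :destination entry. `SubscriptionTable` is a hypothetical stand-in, and the `:ack` header is made up for illustration.

```ruby
# Hypothetical stand-in that keeps only the subscription bookkeeping.
class SubscriptionTable
  attr_reader :subscriptions

  def initialize
    @subscriptions = {}
  end

  def subscribe(destination_name, headers = {})
    headers[:destination] = destination_name
    # Keep the headers from the first subscribe; later ones are ignored.
    @subscriptions[destination_name] ||= headers
    nil
  end

  def unsubscribe(destination_name, headers = {})
    # Returns the stored headers, or nil when there was no subscription.
    @subscriptions.delete(destination_name)
  end
end

table = SubscriptionTable.new
table.subscribe("orders", { :ack => "client" })
table.subscribe("orders", { :ack => "auto" }) # ignored: already subscribed
removed = table.unsubscribe("orders")
```

After these calls `removed` holds the headers from the first subscribe, and the table is empty again.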