Class: StarlingServer::QueueCollection
- Inherits: Object
  - Object
    - StarlingServer::QueueCollection
- Defined in: lib/starling/queue_collection.rb
Overview
QueueCollection is a proxy to a collection of PersistentQueue instances.
Instance Method Summary
- #close ⇒ Object: Safely close all queues.
- #delete(key) ⇒ Object
- #initialize(path) ⇒ QueueCollection (constructor): Create a new QueueCollection at path.
- #put(key, data) ⇒ Object: Puts data onto the queue named key.
- #queues(key = nil) ⇒ Object: Returns all active queues, or the queue named key when key is given.
- #stats(stat_name) ⇒ Object: Returns statistic stat_name for the QueueCollection.
- #take(key) ⇒ Object: Retrieves data from the queue named key.
Constructor Details
#initialize(path) ⇒ QueueCollection
Create a new QueueCollection at path.

    # File 'lib/starling/queue_collection.rb', line 16
    def initialize(path)
      unless File.directory?(path) && File.writable?(path)
        raise InaccessibleQueuePath.new("'#{path}' must exist and be read-writable by #{Etc.getpwuid(Process.uid).name}.")
      end

      @shutdown_mutex = Mutex.new

      @path = path
      @logger = StarlingServer::Base.logger

      @queues = {}
      @queue_init_mutexes = {}

      @stats = Hash.new(0)
    end
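The guard at the top of the constructor can be tried in isolation. The following is a minimal sketch, not the library's code: `InaccessiblePathError` and `check_queue_path` are hypothetical names standing in for `InaccessibleQueuePath` and the inline check, and the temporary directory exists only for the demonstration.

```ruby
require 'tmpdir'

# Hypothetical stand-in for the InaccessibleQueuePath error raised by the constructor.
class InaccessiblePathError < StandardError; end

# Mirrors the constructor's guard: the path must be an existing, writable directory.
def check_queue_path(path)
  unless File.directory?(path) && File.writable?(path)
    raise InaccessiblePathError, "'#{path}' must exist and be writable"
  end
  path
end

Dir.mktmpdir do |dir|
  check_queue_path(dir) # passes: the directory exists and is writable

  begin
    check_queue_path(File.join(dir, 'missing'))
  rescue InaccessiblePathError => e
    puts e.message # the path does not exist, so the guard raises
  end
end
```

Checking the path up front means a misconfigured directory fails at construction time instead of on the first write to a queue.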
Instance Method Details
#close ⇒ Object
Safely close all queues.

    # File 'lib/starling/queue_collection.rb', line 133
    def close
      @shutdown_mutex.lock
      @queues.each_pair do |name,queue|
        queue.close
        @queues.delete(name)
      end
    end
#delete(key) ⇒ Object

    # File 'lib/starling/queue_collection.rb', line 63
    def delete(key)
      queue = @queues.delete(key)
      return if queue.nil?
      queue.purge
    end
#put(key, data) ⇒ Object
Puts data onto the queue named key.

    # File 'lib/starling/queue_collection.rb', line 35
    def put(key, data)
      queue = queues(key)
      return nil unless queue

      @stats[:current_bytes] += data.size
      @stats[:total_items] += 1

      queue.push(data)

      return true
    end
#queues(key = nil) ⇒ Object
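Returns all active queues when key is nil. When key is given, returns the queue named key, loading it on first use. Returns nil if the collection is shutting down or if the queue is still being loaded by another caller.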

    # File 'lib/starling/queue_collection.rb', line 72
    def queues(key=nil)
      return nil if @shutdown_mutex.locked?

      return @queues if key.nil?

      # First try to return the queue named 'key' if it's available.
      return @queues[key] if @queues[key]

      # If the queue wasn't available, create or get the mutex that will
      # wrap creation of the Queue.
      @queue_init_mutexes[key] ||= Mutex.new

      # Otherwise, check to see if another process is already loading
      # the queue named 'key'.
      if @queue_init_mutexes[key].locked?
        # return an empty/false result if we're waiting for the queue
        # to be loaded and we're not the first process to request the queue
        return nil
      else
        begin
          @queue_init_mutexes[key].lock
          # we've locked the mutex, but only go ahead if the queue hasn't
          # been loaded. There's a race condition otherwise, and we could
          # end up loading the queue multiple times.
          if @queues[key].nil?
            @queues[key] = PersistentQueue.new(@path, key)
            @stats[:current_bytes] += @queues[key].initial_bytes
          end
        rescue Object => exc
          puts "ZOMG There was an exception reading back the queue. That totally sucks."
          puts "The exception was: #{exc}. Backtrace: #{exc.backtrace.join("\n")}"
        ensure
          @queue_init_mutexes[key].unlock
        end
      end

      return @queues[key]
    end
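The method above checks `locked?` and then calls `lock` as two separate steps. As an illustration of the same "load once, never block the caller" idea, here is a sketch that uses `Mutex#try_lock` to combine the check and the acquisition. It is a variation for comparison, not the library's implementation: `LazyRegistry`, `fetch`, and the loader block are hypothetical names.

```ruby
# Hypothetical registry illustrating a lazy, non-blocking load per key.
class LazyRegistry
  def initialize(&loader)
    @loader = loader             # called once per key to build the item
    @items = {}
    @init_mutexes = {}
    @registry_mutex = Mutex.new  # guards creation of the per-key mutexes
  end

  # Returns the item for key, loading it on first use.
  # Returns nil if the same key is already being loaded.
  def fetch(key)
    return @items[key] if @items[key]

    init_mutex = @registry_mutex.synchronize { @init_mutexes[key] ||= Mutex.new }

    # try_lock returns false immediately instead of waiting for the lock.
    return nil unless init_mutex.try_lock

    begin
      # Re-check after acquiring the lock so the loader runs at most once per key.
      @items[key] ||= @loader.call(key)
    ensure
      init_mutex.unlock
    end
    @items[key]
  end
end

registry = LazyRegistry.new { |key| "queue:#{key}" }
registry.fetch('jobs') # first call runs the loader
registry.fetch('jobs') # later calls return the cached item
```

A caller that receives nil is expected to retry later, which matches how `put` and `take` treat a nil result from `queues`.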
#stats(stat_name) ⇒ Object
Returns statistic stat_name for the QueueCollection.
Valid statistics are:
[:get_misses] Total number of get requests with empty responses
[:get_hits] Total number of get requests that returned data
[:current_bytes] Current size in bytes of items in the queues
[:current_size] Current number of items across all queues
[:total_items] Total number of items stored in queues.

    # File 'lib/starling/queue_collection.rb', line 122
    def stats(stat_name)
      case stat_name
      when nil; @stats
      when :current_size; current_size
      else; @stats[stat_name]
      end
    end
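The dispatch above relies on `Hash.new(0)`, so a counter that has never been incremented reads as 0 rather than nil. The sketch below shows that behavior in a self-contained class. `StatsSketch` and `bump` are hypothetical names, and `:current_size` is assumed here to be the sum of queue lengths, since the `current_size` helper is not shown on this page.

```ruby
# Hypothetical counter holder mirroring the case dispatch in #stats.
class StatsSketch
  def initialize(queue_lengths)
    @stats = Hash.new(0)            # unknown counters read as 0
    @queue_lengths = queue_lengths  # stand-in for per-queue item counts
  end

  # Increments a named counter.
  def bump(name, by = 1)
    @stats[name] += by
  end

  def stats(stat_name)
    case stat_name
    when nil           then @stats                          # whole counter hash
    when :current_size then @queue_lengths.inject(0) { |sum, n| sum + n } # assumed definition
    else                    @stats[stat_name]               # single counter
    end
  end
end

sketch = StatsSketch.new([2, 3])
sketch.bump(:get_hits)
puts sketch.stats(:get_hits)
puts sketch.stats(:get_misses)
puts sketch.stats(:current_size)
```

Reading a missing key from a hash built with `Hash.new(0)` does not store the key, so passing nil returns only the counters that have actually been incremented.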
#take(key) ⇒ Object
Retrieves data from the queue named key.

    # File 'lib/starling/queue_collection.rb', line 50
    def take(key)
      queue = queues(key)
      if queue.nil? || queue.length == 0
        @stats[:get_misses] += 1
        return nil
      else
        @stats[:get_hits] += 1
      end
      result = queue.pop
      @stats[:current_bytes] -= result.size
      result
    end
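To see how put and take keep the counters in step, the following sketch replays the same bookkeeping against plain in-memory arrays. It is an illustration under stated assumptions, not the library itself: `MemoryQueueCollection` is a hypothetical class, Arrays stand in for PersistentQueue, and items are assumed to come out oldest first.

```ruby
# Hypothetical in-memory stand-in for QueueCollection; Arrays replace PersistentQueue.
class MemoryQueueCollection
  def initialize
    @queues = Hash.new { |hash, key| hash[key] = [] } # create queues on demand
    @stats = Hash.new(0)
  end

  # Appends data to the queue named key and updates the counters.
  def put(key, data)
    @stats[:current_bytes] += data.size
    @stats[:total_items] += 1
    @queues[key].push(data)
    true
  end

  # Removes and returns the oldest item, or nil (counted as a miss) if the queue is empty.
  def take(key)
    queue = @queues[key]
    if queue.empty?
      @stats[:get_misses] += 1
      return nil
    end
    @stats[:get_hits] += 1
    result = queue.shift                     # assumed FIFO order
    @stats[:current_bytes] -= result.size    # undo the bytes added by put
    result
  end

  def stats(name)
    @stats[name]
  end
end

collection = MemoryQueueCollection.new
collection.put('jobs', 'hello')
collection.take('jobs')  # hit
collection.take('jobs')  # miss: the queue is now empty
```

Both the source and this sketch measure items with `size`. On Ruby 1.9 and later, `String#size` counts characters, so `bytesize` would be the closer match for a byte counter when items contain multibyte text.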