Class: Starling
Constant Summary
- WAIT_TIME = 0.25
Instance Method Summary
- #_original_delete ⇒ Object
- #_original_get ⇒ Object
- #available_queues(statistics = nil) ⇒ Object
  Returns a list of available (currently allocated) queues.
- #delete(key, expiry = 0) ⇒ Object
  Delete the key (queue) from all Starling servers.
- #fetch(*args) ⇒ Object
  Returns the next item or nil.
- #flush(queue) ⇒ Object
  Iterator to flush queue.
- #get(*args) ⇒ Object
  Fetch an item from a queue.
- #set(queue, value, expiry = 0, raw = false) ⇒ Object
  Insert value into queue.
- #sizeof(queue, statistics = nil) ⇒ Object
  Returns the number of items in queue.
- #with_servers(my_servers = @servers.dup) ⇒ Object
  Provides a way to work with a specific list of servers by forcing all calls to #get_server_for_key to use a specific server, and changing that server each time that the call yields to the block provided.
Instance Method Details
#_original_delete ⇒ Object
    # File 'lib/starling.rb', line 7
    alias_method :_original_delete, :delete
#_original_get ⇒ Object
    # File 'lib/starling.rb', line 6
    alias_method :_original_get, :get
#available_queues(statistics = nil) ⇒ Object
Returns a list of available (currently allocated) queues.
    # File 'lib/starling.rb', line 106
    def available_queues(statistics = nil)
      statistics ||= stats
      statistics.map { |k,v|
        v.keys
      }.flatten.uniq.grep(/^queue_(.*)_items/).map { |v|
        v.gsub(/^queue_/, '').gsub(/_items$/, '')
      }.reject { |v|
        v =~ /_total$/ || v =~ /_expired$/
      }
    end
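To illustrate the filtering above, the following sketch applies the same chain of calls to a hand-written statistics hash. The server addresses and counter values are invented for the example, and `queue_names` is a hypothetical standalone helper, not part of the library.

```ruby
# Hypothetical sample of a per-server statistics hash.
# Addresses and values are invented for illustration.
sample_stats = {
  'localhost:22122' => {
    'uptime'                     => 100,
    'queue_emails_items'         => 2,
    'queue_emails_total_items'   => 10,
    'queue_emails_expired_items' => 0
  },
  'localhost:22123' => {
    'queue_emails_items' => 1,
    'queue_jobs_items'   => 4
  }
}

# Standalone copy of the filtering logic shown in #available_queues.
def queue_names(statistics)
  statistics.map { |_server, counters| counters.keys }
            .flatten
            .uniq
            .grep(/^queue_(.*)_items/)
            .map { |key| key.gsub(/^queue_/, '').gsub(/_items$/, '') }
            .reject { |name| name =~ /_total$/ || name =~ /_expired$/ }
end

names = queue_names(sample_stats)
p names # => ["emails", "jobs"]
```

One consequence of the final `reject` step is that a queue whose own name ends in `_total` or `_expired` would also be filtered out of the result.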
#delete(key, expiry = 0) ⇒ Object
Delete the key (queue) from all Starling servers. This is necessary because the random way a server is chosen in #get_server_for_key implies that the queue could easily be spread across the entire Starling cluster.
    # File 'lib/starling.rb', line 33
    def delete(key, expiry = 0)
      with_servers do
        _original_delete(key, expiry)
      end
    end
#fetch(*args) ⇒ Object
Returns the next item, or nil if none is available. Unlike #get, it does not wait for an item to arrive.
    # File 'lib/starling.rb', line 23
    def fetch(*args)
      _original_get(*args)
    end
#flush(queue) ⇒ Object
Iterator to flush queue. Each element will be passed to the provided block.
    # File 'lib/starling.rb', line 122
    def flush(queue)
      sizeof(queue).times do
        v = get(queue)
        yield v if block_given?
      end
    end
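A minimal sketch of how the iterator might be used. `FakeQueueClient` is a hypothetical in-memory stand-in so the example runs without a Starling server. Only its `flush` method mirrors the listing above. Its `set`, `get`, and `sizeof` are simplified assumptions.

```ruby
# Hypothetical in-memory stand-in for a Starling client, used only
# so that the example runs without a server.
class FakeQueueClient
  def initialize
    @queues = Hash.new { |hash, name| hash[name] = [] }
  end

  def set(queue, value)
    @queues[queue] << value
  end

  # Simplified: returns nil on an empty queue instead of waiting.
  def get(queue)
    @queues[queue].shift
  end

  def sizeof(queue)
    @queues[queue].size
  end

  # Same shape as the #flush listing: read the size once, then
  # drain that many items, yielding each to the block.
  def flush(queue)
    sizeof(queue).times do
      v = get(queue)
      yield v if block_given?
    end
  end
end

client = FakeQueueClient.new
%w[a b c].each { |item| client.set('letters', item) }

drained = []
client.flush('letters') { |item| drained << item }
p drained # => ["a", "b", "c"]
```

Because the item count is read once before draining, items enqueued while the flush is running stay in the queue. With the waiting #get of the real class, a concurrent consumer that empties the queue first would leave the flush waiting for new items.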
#get(*args) ⇒ Object
Fetch an item from a queue. If the queue is empty, the call polls every WAIT_TIME seconds and returns only once an item is available.
    # File 'lib/starling.rb', line 12
    def get(*args)
      loop do
        response = _original_get(*args)
        return response unless response.nil?
        sleep WAIT_TIME
      end
    end
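The polling loop above can be sketched in isolation. Here `non_blocking_get` is a hypothetical stand-in for the underlying non-waiting read. It returns nil twice before producing a value, and the wait time is shortened so the example finishes quickly.

```ruby
# Hypothetical non-waiting source: nil twice, then a value.
responses = [nil, nil, 'job-1']
attempts = 0
non_blocking_get = lambda do
  attempts += 1
  responses.shift
end

# Poll the source until it returns something other than nil,
# sleeping between attempts (same shape as the #get listing).
def blocking_get(source, wait_time = 0.01)
  loop do
    response = source.call
    return response unless response.nil?
    sleep wait_time
  end
end

item = blocking_get(non_blocking_get)
p item     # => "job-1"
p attempts # => 3
```

The loop has no timeout, so a caller that needs to give up after a while would use the non-waiting #fetch and handle nil itself.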
#set(queue, value, expiry = 0, raw = false) ⇒ Object
Insert value into queue. expiry is expressed as a UNIX timestamp. If raw is true, value will not be Marshalled. If raw = :yaml, value will be serialized with YAML instead.
On a MemCache::MemCacheError the call sleeps for WAIT_TIME seconds and retries, up to three times, before re-raising the error.
    # File 'lib/starling.rb', line 68
    def set(queue, value, expiry = 0, raw = false)
      retries = 0
      begin
        if raw == :yaml
          value = YAML.dump(value)
          raw = true
        end
        super(queue, value, expiry, raw)
      rescue MemCache::MemCacheError => e
        retries += 1
        sleep WAIT_TIME
        retry unless retries > 3
        raise e
      end
    end
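The retry behaviour in the listing can be sketched on its own. `TransientError` and `with_retries` are hypothetical names introduced for this example, with `TransientError` standing in for MemCache::MemCacheError, and the wait time is shortened so the example runs quickly.

```ruby
require 'yaml'

# Hypothetical stand-in for MemCache::MemCacheError.
class TransientError < StandardError; end

# Run the block; on TransientError sleep and retry, re-raising once
# more than max_retries failures have occurred.
def with_retries(max_retries = 3, wait_time = 0.01)
  retries = 0
  begin
    yield
  rescue TransientError => e
    retries += 1
    sleep wait_time
    retry unless retries > max_retries
    raise e
  end
end

calls = 0
result = with_retries do
  calls += 1
  # Fail on the first two attempts, then succeed.
  raise TransientError, 'server unavailable' if calls < 3
  YAML.dump({ 'id' => 42 })
end

p calls             # => 3
p YAML.load(result) # => {"id"=>42}
```

With the default of three retries, an operation that always fails is attempted four times in total before the error reaches the caller.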
#sizeof(queue, statistics = nil) ⇒ Object
Returns the number of items in queue. If queue is :all, a hash of all queue sizes will be returned.
    # File 'lib/starling.rb', line 89
    def sizeof(queue, statistics = nil)
      statistics ||= stats

      if queue == :all
        queue_sizes = {}
        available_queues(statistics).each do |queue|
          queue_sizes[queue] = sizeof(queue, statistics)
        end
        return queue_sizes
      end

      statistics.inject(0) { |m,(k,v)| m + v["queue_#{queue}_items"].to_i }
    end
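The per-queue total is a sum of one counter across every server. A small sketch with invented statistics shows the arithmetic. `queue_size` is a hypothetical standalone helper, and the addresses and values are assumptions made for the example.

```ruby
# Hypothetical per-server statistics. Values are invented.
per_server_stats = {
  'localhost:22122' => { 'queue_emails_items' => 2 },
  'localhost:22123' => { 'queue_emails_items' => 1, 'queue_jobs_items' => 4 }
}

# Sum the "queue_<name>_items" counter over all servers. A server
# without the counter contributes 0, because nil.to_i is 0.
def queue_size(statistics, queue)
  statistics.inject(0) do |sum, (_server, counters)|
    sum + counters["queue_#{queue}_items"].to_i
  end
end

emails  = queue_size(per_server_stats, 'emails')
jobs    = queue_size(per_server_stats, 'jobs')
missing = queue_size(per_server_stats, 'no_such_queue')

p emails  # => 3
p jobs    # => 4
p missing # => 0
```

An unknown queue name therefore reports a size of 0 rather than raising an error.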
#with_servers(my_servers = @servers.dup) ⇒ Object
Provides a way to work with a specific list of servers by forcing all calls to #get_server_for_key to use a specific server, and changing that server each time that the call yields to the block provided. This helps work around the normally random nature of the #get_server_for_key method.
Acquires the mutex for the entire duration of the call, since unrelated calls to #get_server_for_key might be adversely affected by the non-random result.
    # File 'lib/starling.rb', line 49
    def with_servers(my_servers = @servers.dup)
      return unless block_given?
      with_lock do
        my_servers.each do |server|
          @force_server = server
          yield
        end
        @force_server = nil
      end
    end
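The server-pinning pattern described above can be sketched with a simplified, hypothetical class. `PinnedClient` is not the library's class. It assumes a random `get_server_for_key`, uses a plain `Mutex` in place of `with_lock`, and, unlike the listing, resets the pinned server in an `ensure` clause.

```ruby
# Hypothetical, simplified client used only to illustrate the pattern.
class PinnedClient
  def initialize(servers)
    @servers = servers
    @lock = Mutex.new
    @force_server = nil
  end

  # Random by default (an assumption of this sketch); returns the
  # pinned server while one is being forced.
  def get_server_for_key(_key)
    @force_server || @servers.sample
  end

  # Yield once per server, pinning each server in turn while the
  # lock is held.
  def with_servers(my_servers = @servers.dup)
    return unless block_given?
    @lock.synchronize do
      begin
        my_servers.each do |server|
          @force_server = server
          yield
        end
      ensure
        # Deviation from the listing: clear the pin even if the
        # block raises.
        @force_server = nil
      end
    end
  end

  # Record which server each pass would touch.
  def servers_visited_for(key)
    visited = []
    with_servers { visited << get_server_for_key(key) }
    visited
  end
end

client = PinnedClient.new(%w[host-a:22122 host-b:22122 host-c:22122])
visited = client.servers_visited_for('emails')
p visited # => ["host-a:22122", "host-b:22122", "host-c:22122"]
```

This is how an operation such as #delete can reach every server exactly once even though each individual lookup would otherwise pick a server at random.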