Class: Adocca::MemCache
- Inherits:
-
Object
- Object
- Adocca::MemCache
- Defined in:
- lib/am_memcache.rb
Overview
A Ruby client library for memcached. Ripped from the gem.
Defined Under Namespace
Classes: ClientError, InternalError, MemCacheError, Server, ServerError
Constant Summary collapse
- HOST_HASH =
a semi-unique id for this ruby instance
Array.new(32).collect do |e| (65 + rand(25)).chr end.join
- DEFAULT_OPTIONS =
Default options for the cache object.
{ :namespace => nil, :readonly => false }
- DEFAULT_PORT =
Default memcached port.
11211
- DEFAULT_WEIGHT =
Default memcached server weight.
1
- LIVELINESS_TIMEOUT =
The maximum amount of time we are willing to spend talking to servers before accepting that something horrible has happened.
5
- ADD_NOTSTORED =
the response when the key is present
"NOT_STORED\r\n"
- ADD_STORED =
the response when the key is NOT present
"STORED\r\n"
- TRANS_EXPTIME =
lock expiry time
10
- TRANS_MAX_TRIES =
10
Instance Attribute Summary collapse
-
#request_timeout ⇒ Object
The amount of time to wait for a response from a memcached server.
Class Method Summary collapse
Instance Method Summary collapse
-
#[](key) ⇒ Object
Shortcut to get a value from the cache.
-
#[]=(key, value) ⇒ Object
Shortcut to save a value in the cache.
-
#active? ⇒ Boolean
Returns whether there is at least one active server for the object.
-
#add(key, value, expiry = 0) ⇒ Object
Add an entry to the cache will return true if the entry didnt already exist.
-
#check(key) ⇒ Object
Method to check if a value is set, and NOT unmarshal, only return bool.
-
#dec(key, amount = 1) ⇒ Object
Increment an entry in the cache.
-
#delete(key, expiry = 0) ⇒ Object
Remove an entry from the cache.
-
#flush ⇒ Object
Flush cache.
-
#get(key) ⇒ Object
get an entry from the cache.
-
#get_server_for_key(key) ⇒ Object
Pick a server to handle the request based on a hash of the key.
-
#inc(key, amount = 1) ⇒ Object
Increment an entry in the cache.
-
#initialize(opts = {}) ⇒ MemCache
constructor
Valid options are:.
-
#inspect ⇒ Object
Return a string representation of the cache object.
-
#invalidate_namespace(ns) ⇒ Object
Invalidate a namespace in the cache.
-
#lock(key, options = {}) ⇒ Object
Will create a reentrant lock on
key
, but waiting no more thantimeout
for the lock. -
#make_cache_key(o) ⇒ Object
Create a key for the cache, incorporating the namespace qualifier if requested.
-
#make_namespace_key(ns, appendage = nil) ⇒ Object
Create a key for this namespace and return it along with the string you want to delete or incr to invalidate this specific namespace.
-
#readonly? ⇒ Boolean
Returns whether the cache was created read only.
-
#reset ⇒ Object
Reset the connection to all memcache servers.
-
#send_command(key, command) ⇒ Object
Will send a command to the proper server for this key and return the response.
-
#servers=(servers) ⇒ Object
Set the servers that the requests will be distributed between.
-
#set(key, value, expiry = 0) ⇒ Object
Add an entry to the cache.
-
#sock_send_command(key, command, squash_errors = false) ⇒ Object
Will send a command to the proper server for this key and return the response, socket and server.
-
#synchronize(key, &block) ⇒ Object
memcache-driven locking mechanism.
-
#unlock(key) ⇒ Object
Will release any lock on
key
.
Constructor Details
#initialize(opts = {}) ⇒ MemCache
Valid options are:
:namespace
If specified, all keys will have the given value prepended
before accessing the cache. Defaults to nil.
:readonly
If this is set, any attempt to write to the cache will generate
an exception. Defaults to false.
68 69 70 71 72 73 74 75 76 77 78 79 |
# File 'lib/am_memcache.rb', line 68 def initialize(opts = {}) opts = DEFAULT_OPTIONS.merge(opts) @namespace = opts[:namespace] @readonly = opts[:readonly] if ActionController::Base.allow_concurrency @mutex = Monitor.new else @mutex = NullMutex.new end @servers = [] @buckets = [] end |
Instance Attribute Details
#request_timeout ⇒ Object
The amount of time to wait for a response from a memcached server. If a response is not completed within this time, the connection to the server will be closed and an error will be raised.
56 57 58 |
# File 'lib/am_memcache.rb', line 56 def request_timeout @request_timeout end |
Class Method Details
.log_error(e) ⇒ Object
81 82 83 84 85 86 87 88 89 |
# File 'lib/am_memcache.rb', line 81 def self.log_error(e) if defined?(RAILS_DEFAULT_LOGGER) RAILS_DEFAULT_LOGGER.error(e) RAILS_DEFAULT_LOGGER.error(PP.pp(e.backtrace, "")) else puts e pp e.backtrace end end |
Instance Method Details
#[](key) ⇒ Object
Shortcut to get a value from the cache.
337 338 339 |
# File 'lib/am_memcache.rb', line 337 def [](key) self.get(key) end |
#[]=(key, value) ⇒ Object
Shortcut to save a value in the cache. This method does not set an expiration on the entry. Use set to specify an explicit expiry.
343 344 345 |
# File 'lib/am_memcache.rb', line 343 def []=(key, value) self.set(key, value) end |
#active? ⇒ Boolean
Returns whether there is at least one active server for the object.
98 99 100 |
# File 'lib/am_memcache.rb', line 98 def active? not @servers.empty? end |
#add(key, value, expiry = 0) ⇒ Object
Add an entry to the cache will return true if the entry didnt already exist
203 204 205 206 207 208 209 210 211 212 |
# File 'lib/am_memcache.rb', line 203 def add(key, value, expiry = 0) cache_key = make_cache_key(key) marshaled_value = nil if Integer === value marshaled_value = value.to_s else marshaled_value = Marshal.dump(value) end send_command(cache_key, "add #{cache_key} 0 #{expiry} #{marshaled_value.size}\r\n" + marshaled_value) == ADD_STORED end |
#check(key) ⇒ Object
Method to check if a value is set, and NOT unmarshal, only return bool
243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 |
# File 'lib/am_memcache.rb', line 243 def check(key) cache_key = make_cache_key(key) @mutex.synchronize do begin response, sock, server = sock_send_command(cache_key, "get #{cache_key}", true) return false if response =~ /^END/ v, key, flags, bytes = response.split(/ /) value = sock.read(bytes.to_i) sock.timeout_gets(LIVELINESS_TIMEOUT) sock.timeout_gets(LIVELINESS_TIMEOUT) rescue Exception => err server.mark_dead(err.) self.class.log_error(err) return false end true end end |
#dec(key, amount = 1) ⇒ Object
Increment an entry in the cache
189 190 191 192 193 194 |
# File 'lib/am_memcache.rb', line 189 def dec(key, amount = 1) cache_key = make_cache_key(key) rval = send_command(cache_key, "decr #{cache_key} #{amount}") rval = rval.strip.to_i if rval =~ /^\d+/ rval end |
#delete(key, expiry = 0) ⇒ Object
Remove an entry from the cache. return true if it is deleted
228 229 230 231 |
# File 'lib/am_memcache.rb', line 228 def delete(key, expiry = 0) cache_key = make_cache_key(key) send_command(cache_key, "delete #{cache_key} #{expiry}") end |
#flush ⇒ Object
Flush cache
267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 |
# File 'lib/am_memcache.rb', line 267 def flush @mutex.synchronize do raise MemCacheError, "No active servers" unless self.active? @servers.each do |server| sock = server.socket unless sock.nil? begin sock.timeout_write("flush_all\r\n", LIVELINESS_TIMEOUT) sock.timeout_gets(LIVELINESS_TIMEOUT) rescue Exception => err server.mark_dead(err.) self.class.log_error(err) end end end end end |
#get(key) ⇒ Object
get an entry from the cache
135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 |
# File 'lib/am_memcache.rb', line 135 def get(key) cache_key = make_cache_key(key) rval = nil @mutex.synchronize do begin response, sock, server = sock_send_command(cache_key, "get #{cache_key}", true) return :MemCache_no_such_entry if response =~ /^END/ v, cache_key, flags, bytes = response.split(/ /) rval = sock.read(bytes.to_i) sock.timeout_gets(LIVELINESS_TIMEOUT) sock.timeout_gets(LIVELINESS_TIMEOUT) rescue Exception => err server.mark_dead(err.) self.class.log_error(err) return :MemCache_no_such_entry end end if rval.strip.match(/\A\d+\z/) rval.strip.to_i else # Return the unmarshaled value. begin Marshal.load(rval) rescue ArgumentError, TypeError => err self.class.log_error(err) return :MemCache_no_such_entry end end end |
#get_server_for_key(key) ⇒ Object
Pick a server to handle the request based on a hash of the key.
591 592 593 594 595 596 597 598 599 600 601 602 603 604 605 |
# File 'lib/am_memcache.rb', line 591 def get_server_for_key(key) # Hash the value of the key to select the bucket. hkey = Integer("0x#{key}") # Fetch a server for the given key, retrying if that server is # offline. server = nil 20.times do |tries| server = @buckets[(hkey + tries) % @buckets.nitems] break if server.alive? end raise MemCacheError, "No servers available" unless server server end |
#inc(key, amount = 1) ⇒ Object
Increment an entry in the cache
179 180 181 182 183 184 |
# File 'lib/am_memcache.rb', line 179 def inc(key, amount = 1) cache_key = make_cache_key(key) rval = send_command(cache_key, "incr #{cache_key} #{amount}") rval = rval.strip.to_i if rval =~ /^\d+/ rval end |
#inspect ⇒ Object
Return a string representation of the cache object.
92 93 94 95 |
# File 'lib/am_memcache.rb', line 92 def inspect sprintf("<MemCache: %s servers, %s buckets, ns: %p, ro: %p>", @servers.nitems, @buckets.nitems, @namespace, @readonly) end |
#invalidate_namespace(ns) ⇒ Object
Invalidate a namespace in the cache.
172 173 174 |
# File 'lib/am_memcache.rb', line 172 def invalidate_namespace(ns) inc(make_namespace_key(ns).last) end |
#lock(key, options = {}) ⇒ Object
Will create a reentrant lock on key
, but waiting no more than timeout
for the lock. Will sleep 1 second between each try.
Will return whether the locking was successful.
296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 |
# File 'lib/am_memcache.rb', line 296 def lock(key, = {}) timeout = [:timeout] || 1 << 32 expire = [:expire] || 0 cache_key = make_cache_key(key) start_at = Time.now we_locked_now = false we_locked_earlier = false ok_value = "#{HOST_HASH}:#{Thread.current.object_id}" while (!(we_locked_now = add(cache_key, ok_value, expire)) && !(we_locked_earlier = (get(cache_key) == ok_value)) && Time.now < start_at + timeout) sleep 1 end return we_locked_now || we_locked_earlier end |
#make_cache_key(o) ⇒ Object
Create a key for the cache, incorporating the namespace qualifier if requested.
574 575 576 577 578 579 580 581 582 583 584 585 586 587 588 |
# File 'lib/am_memcache.rb', line 574 def make_cache_key(o) if Array === o o = o.clone key = o.pop namespace_key = "" namespace_key << ":#{make_namespace_key(o).first}" unless o.empty? key = "#{@namespace}#{namespace_key}:#{key}" Digest::SHA1.new(key) else key = "#{@namespace}:#{o}" Digest::SHA1.new(key) end end |
#make_namespace_key(ns, appendage = nil) ⇒ Object
Create a key for this namespace and return it along with the string you want to delete or incr to invalidate this specific namespace
549 550 551 552 553 554 555 556 557 558 559 560 561 562 563 564 565 566 567 568 569 570 |
# File 'lib/am_memcache.rb', line 549 def make_namespace_key(ns, appendage = nil) # ensure that this is an Array ns = ns.to_a # get the first part of the Array next_part = ns.shift # create a unique namespace name of it next_part = [appendage, next_part].join(":") if appendage # create a unique memcache key of the namespace name next_key = "adocca:memcache:namespace:#{next_part}" # make sure it exists in memcache add(next_key, rand(1 << 31)) # get its value in memcache salt = get(next_key) if ns.empty? # if this is the last part of the key return it + its secret salt along with itself (for invalidation purposes) ["#{next_part}:#{salt}", next_key] else # otherwise, fetch the rest along with the last_part = make_namespace_key(ns, next_part) ["#{next_part}:#{salt}:#{last_part.first}", last_part.last] end end |
#readonly? ⇒ Boolean
Returns whether the cache was created read only.
103 104 105 |
# File 'lib/am_memcache.rb', line 103 def readonly? @readonly end |
#reset ⇒ Object
Reset the connection to all memcache servers. This should be called if there is a problem with a cache lookup that might have left the connection in a corrupted state.
236 237 238 239 240 |
# File 'lib/am_memcache.rb', line 236 def reset @mutex.synchronize do @servers.each { |server| server.close } end end |
#send_command(key, command) ⇒ Object
Will send a command to the proper server for this key and return the response
510 511 512 |
# File 'lib/am_memcache.rb', line 510 def send_command(key, command) sock_send_command(key, command)[0] end |
#servers=(servers) ⇒ Object
Set the servers that the requests will be distributed between. Entries can be either strings of the form “hostname:port” or “hostname:port:weight” or MemCache::Server objects.
110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 |
# File 'lib/am_memcache.rb', line 110 def servers=(servers) # Create the server objects. @servers = servers.collect do |server| case server when String host, port, weight = server.split(/:/, 3) port ||= DEFAULT_PORT weight ||= DEFAULT_WEIGHT Server::new(host, port, weight) when Server server else raise TypeError, "Cannot convert %s to MemCache::Server" % svr.class.name end end # Create an array of server buckets for weight selection of servers. @buckets = [] @servers.each do |server| server.weight.times { @buckets.push(server) } end end |
#set(key, value, expiry = 0) ⇒ Object
Add an entry to the cache.
215 216 217 218 219 220 221 222 223 224 |
# File 'lib/am_memcache.rb', line 215 def set(key, value, expiry = 0) cache_key = make_cache_key(key) marshaled_value = nil if Integer === value marshaled_value = value.to_s else marshaled_value = Marshal.dump(value) end send_command(cache_key, "set #{cache_key} 0 #{expiry} #{marshaled_value.size}\r\n" + marshaled_value) end |
#sock_send_command(key, command, squash_errors = false) ⇒ Object
Will send a command to the proper server for this key and return the response, socket and server
518 519 520 521 522 523 524 525 526 527 528 529 530 531 532 533 534 535 536 537 538 539 540 541 |
# File 'lib/am_memcache.rb', line 518 def sock_send_command(key, command, squash_errors = false) @mutex.synchronize do server = nil begin raise MemCacheError, "No active servers" unless self.active? raise MemCacheError, "Update of readonly cache" if @readonly server = get_server_for_key(key) sock = server.socket raise MemCacheError, "No connection to server" if sock.nil? response = nil sock.timeout_write("#{command}\r\n", LIVELINESS_TIMEOUT) [sock.timeout_gets(LIVELINESS_TIMEOUT), sock, server] rescue Exception => err server.mark_dead(err.) if server self.class.log_error(err) raise err unless squash_errors return err. end end end |
#synchronize(key, &block) ⇒ Object
memcache-driven locking mechanism
Uses lock and unlock
324 325 326 327 328 329 330 331 332 333 334 |
# File 'lib/am_memcache.rb', line 324 def synchronize(key, &block) if lock_success = lock(key, :expire => TRANS_EXPTIME, :timeout => TRANS_MAX_TRIES) begin yield ensure unlock(key) end else raise MemCacheError, "Couldn't obtain lock on #{key}" end end |
#unlock(key) ⇒ Object
Will release any lock on key
.
315 316 317 |
# File 'lib/am_memcache.rb', line 315 def unlock(key) self.delete(make_cache_key(key)) end |