Class: Adocca::MemCache

Inherits:
Object
  • Object
show all
Defined in:
lib/am_memcache.rb

Overview

A Ruby client library for memcached. Ripped from the gem.

Defined Under Namespace

Classes: ClientError, InternalError, MemCacheError, Server, ServerError

Constant Summary collapse

HOST_HASH =

a semi-unique id for this ruby instance

Array.new(32).collect do |e| (65 + rand(25)).chr end.join
DEFAULT_OPTIONS =

Default options for the cache object.

{
  :namespace => nil,
  :readonly  => false
}
DEFAULT_PORT =

Default memcached port.

11211
DEFAULT_WEIGHT =

Default memcached server weight.

1
LIVELINESS_TIMEOUT =

The maximum amount of time we are willing to spend talking to servers before accepting that something horrible has happened.

5
ADD_NOTSTORED =

the response when the key is present

"NOT_STORED\r\n"
ADD_STORED =

the response when the key is NOT present

"STORED\r\n"
TRANS_EXPTIME =

lock expiry time

10
TRANS_MAX_TRIES =
10

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(opts = {}) ⇒ MemCache

Valid options are:

:namespace
    If specified, all keys will have the given value prepended
    before accessing the cache.  Defaults to nil.

:readonly
    If this is set, any attempt to write to the cache will generate
    an exception.  Defaults to false.


68
69
70
71
72
73
74
75
76
77
78
79
# File 'lib/am_memcache.rb', line 68

# Build a cache client that has no servers configured yet.
#
# opts is merged over DEFAULT_OPTIONS and the :namespace and :readonly
# entries of the result are kept. The lock object is a Monitor when
# ActionController::Base.allow_concurrency is truthy and a NullMutex
# otherwise.
def initialize(opts = {})
  settings   = DEFAULT_OPTIONS.merge(opts)
  @namespace = settings[:namespace]
  @readonly  = settings[:readonly]
  @mutex     = ActionController::Base.allow_concurrency ? Monitor.new : NullMutex.new
  @servers   = []
  @buckets   = []
end

Instance Attribute Details

#request_timeout ⇒ Object

The amount of time to wait for a response from a memcached server. If a response is not completed within this time, the connection to the server will be closed and an error will be raised.



56
57
58
# File 'lib/am_memcache.rb', line 56

# Reader for @request_timeout. Nothing in this method assigns the value, so
# it is nil until the instance variable has been set elsewhere.
def request_timeout
  @request_timeout
end

Class Method Details

.log_error(e) ⇒ Object



81
82
83
84
85
86
87
88
89
# File 'lib/am_memcache.rb', line 81

# Report an exception together with its backtrace.
#
# When RAILS_DEFAULT_LOGGER is defined, both the exception and a
# pretty-printed backtrace are sent to its #error method; otherwise they
# are printed to standard output.
def self.log_error(e)
  trace = e.backtrace
  if defined?(RAILS_DEFAULT_LOGGER)
    logger = RAILS_DEFAULT_LOGGER
    logger.error(e)
    # PP.pp with a String target returns the formatted text instead of printing it.
    logger.error(PP.pp(trace, ""))
  else
    puts e
    pp trace
  end
end

Instance Method Details

#[](key) ⇒ Object

Shortcut to get a value from the cache.



337
338
339
# File 'lib/am_memcache.rb', line 337

# Hash-style read: delegates to #get and returns whatever it returns,
# including the :MemCache_no_such_entry marker that #get uses for a miss.
def [](key)
  self.get(key)
end

#[]=(key, value) ⇒ Object

Shortcut to save a value in the cache. This method does not set an expiration on the entry. Use set to specify an explicit expiry.



343
344
345
# File 'lib/am_memcache.rb', line 343

# Hash-style write: delegates to #set, leaving its expiry at the default 0.
def []=(key, value)
  self.set(key, value)
end

#active? ⇒ Boolean

Returns whether there is at least one active server for the object.

Returns:

  • (Boolean)


98
99
100
# File 'lib/am_memcache.rb', line 98

# True when at least one server has been configured on this client.
# Only checks that @servers is non-empty; no server is contacted.
def active?
  !@servers.empty?
end

#add(key, value, expiry = 0) ⇒ Object

Add an entry to the cache. Returns true if the entry didn't already exist.



203
204
205
206
207
208
209
210
211
212
# File 'lib/am_memcache.rb', line 203

# Store value under key only if the server does not already hold the key.
#
# Integers are sent as their decimal string; every other value is
# serialized with Marshal. Returns true when the server's reply equals
# ADD_STORED and false for any other reply.
def add(key, value, expiry = 0)
  cache_key = make_cache_key(key)
  payload = Integer === value ? value.to_s : Marshal.dump(value)
  header = "add #{cache_key} 0 #{expiry} #{payload.size}\r\n"
  send_command(cache_key, header + payload) == ADD_STORED
end

#check(key) ⇒ Object

Check whether a value is set for the key without unmarshaling it; returns only a boolean.



243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
# File 'lib/am_memcache.rb', line 243

# Report whether the cache holds an entry for key, without unmarshaling it.
#
# Returns true when the server answers with a value, and false on a miss
# (an END reply) or on any failure. A failure while reading the reply marks
# the server dead and is logged through log_error.
def check(key)
  cache_key = make_cache_key(key)
  @mutex.synchronize do
    server = nil
    begin
      response, sock, server = sock_send_command(cache_key, "get #{cache_key}", true)

      # With errors squashed, sock_send_command returns just the error
      # message, which leaves sock and server nil here.
      return false if sock.nil?
      return false if response =~ /^END/

      _v, _key, _flags, bytes = response.split(/ /)
      # Consume the payload and the two lines after it so the socket is
      # left ready for the next command.
      sock.read(bytes.to_i)
      sock.timeout_gets(LIVELINESS_TIMEOUT)
      sock.timeout_gets(LIVELINESS_TIMEOUT)
    rescue Exception => err
      # server may still be nil if the failure happened before it was assigned.
      server.mark_dead(err.message) if server
      self.class.log_error(err)
      return false
    end

    true
  end
end

#dec(key, amount = 1) ⇒ Object

Decrement an entry in the cache



189
190
191
192
193
194
# File 'lib/am_memcache.rb', line 189

# Send a decr command for key, lowering it by amount.
#
# When the reply starts with digits it is returned as an Integer;
# any other reply is returned unchanged.
def dec(key, amount = 1)
  hashed = make_cache_key(key)
  reply = send_command(hashed, "decr #{hashed} #{amount}")
  return reply.strip.to_i if reply =~ /^\d+/
  reply
end

#delete(key, expiry = 0) ⇒ Object

Remove an entry from the cache. Returns the server's response line for the delete command.



228
229
230
231
# File 'lib/am_memcache.rb', line 228

# Send a delete command for key, passing expiry as the command's time
# argument. Returns the reply line from send_command.
def delete(key, expiry = 0)
  hashed = make_cache_key(key)
  command = "delete #{hashed} #{expiry}"
  send_command(hashed, command)
end

#flush ⇒ Object

Flush cache



267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
# File 'lib/am_memcache.rb', line 267

# Send flush_all to every configured server that has a socket.
#
# Raises MemCacheError when no servers are configured. A server whose
# write or read fails is marked dead and the error is logged; the
# remaining servers are still flushed.
def flush
  @mutex.synchronize do
    raise MemCacheError, "No active servers" unless self.active?
    @servers.each do |server|
      sock = server.socket
      # Servers without a socket are skipped.
      next if sock.nil?

      begin
        sock.timeout_write("flush_all\r\n", LIVELINESS_TIMEOUT)
        sock.timeout_gets(LIVELINESS_TIMEOUT)
      rescue Exception => err
        server.mark_dead(err.message)
        self.class.log_error(err)
      end
    end
  end
end

#get(key) ⇒ Object

get an entry from the cache



135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
# File 'lib/am_memcache.rb', line 135

# Fetch the entry stored under key.
#
# Returns the stored Integer when the payload is a decimal number
# (optionally negative), otherwise the object produced by Marshal.load.
# Returns :MemCache_no_such_entry on a miss (an END reply), on any
# communication failure, or when the payload cannot be unmarshaled.
# A failure while reading the reply marks the server dead and is logged.
def get(key)
  cache_key = make_cache_key(key)
  rval = nil
  @mutex.synchronize do
    server = nil
    begin
      response, sock, server = sock_send_command(cache_key, "get #{cache_key}", true)

      # With errors squashed, sock_send_command returns just the error
      # message, which leaves sock and server nil here.
      return :MemCache_no_such_entry if sock.nil?
      return :MemCache_no_such_entry if response =~ /^END/

      _v, _key, _flags, bytes = response.split(/ /)
      rval = sock.read(bytes.to_i)

      # Consume the two lines after the payload so the socket is left ready
      # for the next command.
      sock.timeout_gets(LIVELINESS_TIMEOUT)
      sock.timeout_gets(LIVELINESS_TIMEOUT)
    rescue Exception => err
      # server may still be nil if the failure happened before it was assigned.
      server.mark_dead(err.message) if server
      self.class.log_error(err)
      return :MemCache_no_such_entry
    end
  end

  # read can hand back nil when the connection ends before the payload.
  return :MemCache_no_such_entry if rval.nil?

  # set and add store Integers as plain decimal text, including a leading
  # minus sign for negative values.
  stripped = rval.strip
  if stripped =~ /\A-?\d+\z/
    stripped.to_i
  else
    # NOTE(review): Marshal.load trusts whatever the cache returns; this is
    # only safe if nothing untrusted can write to the memcached servers.
    begin
      Marshal.load(rval)
    rescue ArgumentError, TypeError => err
      self.class.log_error(err)
      :MemCache_no_such_entry
    end
  end
end

#get_server_for_key(key) ⇒ Object

Pick a server to handle the request based on a hash of the key.

Raises:

  • (MemCacheError)



591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
# File 'lib/am_memcache.rb', line 591

# Pick the server responsible for key.
#
# key is read as a hexadecimal number (its string form is prefixed with
# "0x") and used as an index into @buckets. Up to 20 consecutive buckets are
# tried, and the first server that reports alive? is returned.
#
# Raises MemCacheError when there are no buckets or when none of the tried
# servers is alive.
def get_server_for_key(key)
  raise MemCacheError, "No servers available" if @buckets.empty?

  # Hash the value of the key to select the bucket.
  hkey = Integer("0x#{key}")
  bucket_count = @buckets.size

  # Walk forward from the selected bucket, skipping servers that are offline.
  20.times do |tries|
    server = @buckets[(hkey + tries) % bucket_count]
    return server if server.alive?
  end

  raise MemCacheError, "No servers available"
end

#inc(key, amount = 1) ⇒ Object

Increment an entry in the cache



179
180
181
182
183
184
# File 'lib/am_memcache.rb', line 179

# Send an incr command for key, raising it by amount.
#
# When the reply starts with digits it is returned as an Integer;
# any other reply is returned unchanged.
def inc(key, amount = 1)
  hashed = make_cache_key(key)
  reply = send_command(hashed, "incr #{hashed} #{amount}")
  return reply.strip.to_i if reply =~ /^\d+/
  reply
end

#inspect ⇒ Object

Return a string representation of the cache object.



92
93
94
95
# File 'lib/am_memcache.rb', line 92

# Return a one-line summary of the cache object: the number of servers and
# buckets, the namespace and the read-only flag.
def inspect
  # compact.size counts the non-nil entries, which is what the removed
  # Array#nitems used to return.
  sprintf("<MemCache: %s servers, %s buckets, ns: %p, ro: %p>",
          @servers.compact.size, @buckets.compact.size, @namespace, @readonly)
end

#invalidate_namespace(ns) ⇒ Object

Invalidate a namespace in the cache.



172
173
174
# File 'lib/am_memcache.rb', line 172

# Invalidate namespace ns by incrementing the counter stored under the
# namespace's invalidation key (the second element returned by
# make_namespace_key). Returns the result of inc.
def invalidate_namespace(ns)
  _qualified_name, invalidation_key = make_namespace_key(ns)
  inc(invalidation_key)
end

#lock(key, options = {}) ⇒ Object

Will create a reentrant lock on key, but waiting no more than timeout for the lock. Will sleep 1 second between each try.

Will return whether the locking was successful.



296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
# File 'lib/am_memcache.rb', line 296

# Try to take a cache-backed lock on key.
#
# options[:timeout] bounds how long to keep trying, in seconds (default
# 1 << 32). options[:expire] is the expiry passed to add for the lock
# entry (default 0).
#
# The lock entry holds HOST_HASH plus the current thread's object_id, so a
# thread that already owns the lock gets it again. Each round tries add
# first, then checks whether the stored value is our own, and sleeps one
# second before the next round. Returns a truthy value when the lock is
# held and false once the timeout has passed.
def lock(key, options = {})
  timeout = options[:timeout] || 1 << 32
  expire = options[:expire] || 0
  cache_key = make_cache_key(key)
  started = Time.now
  owner_tag = "#{HOST_HASH}:#{Thread.current.object_id}"

  loop do
    fresh = add(cache_key, owner_tag, expire)
    return fresh if fresh

    held = (get(cache_key) == owner_tag)
    return held if held

    return(fresh || held) unless Time.now < started + timeout
    sleep 1
  end
end

#make_cache_key(o) ⇒ Object

Create a key for the cache, incorporating the namespace qualifier if requested.



574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
# File 'lib/am_memcache.rb', line 574

# Build the key actually sent to the server for o.
#
# o is either a single key or an Array whose last element is the key and
# whose leading elements name a namespace path. The instance namespace, the
# salted namespace path from make_namespace_key (if any) and the key are
# joined with ":" and hashed with SHA1.
#
# Returns a Digest::SHA1 object; its string form is the hex digest.
def make_cache_key(o)
  if Array === o
    # Work on a copy so the caller's Array keeps its key element.
    parts = o.clone
    key = parts.pop
    namespace_key = parts.empty? ? "" : ":#{make_namespace_key(parts).first}"
    raw = "#{@namespace}#{namespace_key}:#{key}"
  else
    raw = "#{@namespace}:#{o}"
  end
  # Digest::SHA1.new takes no arguments on current Rubies; feeding the data
  # through update gives the same digest object.
  Digest::SHA1.new.update(raw)
end

#make_namespace_key(ns, appendage = nil) ⇒ Object

Create a key for this namespace and return it along with the string you want to delete or incr to invalidate this specific namespace



549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
# File 'lib/am_memcache.rb', line 549

# Build the salted name for a namespace path.
#
# ns is a single namespace name or an Array of nested names; appendage is
# the already-qualified parent name used while recursing.
#
# Each level gets a random salt stored in the cache under
# "adocca:memcache:namespace:<qualified name>". Returns a two-element
# Array: the salted path to embed in cache keys, and the cache key of the
# innermost level's salt (the one to incr or delete to invalidate it).
def make_namespace_key(ns, appendage = nil)
  # Array() accepts a bare name, an Array or nil. Destructuring instead of
  # shift leaves the caller's Array untouched.
  next_part, *rest = Array(ns)
  # Qualify this level with its parent so nested names stay unique.
  next_part = [appendage, next_part].join(":") if appendage
  next_key = "adocca:memcache:namespace:#{next_part}"
  # Make sure a salt exists (add is a no-op when it already does), then
  # read back whichever salt is stored.
  add(next_key, rand(1 << 31))
  salt = get(next_key)
  if rest.empty?
    # Innermost level: return it with its salt and its own salt key.
    ["#{next_part}:#{salt}", next_key]
  else
    # Otherwise prepend this level to the salted remainder of the path and
    # pass the innermost salt key through.
    last_part = make_namespace_key(rest, next_part)
    ["#{next_part}:#{salt}:#{last_part.first}", last_part.last]
  end
end

#readonly? ⇒ Boolean

Returns whether the cache was created read only.

Returns:

  • (Boolean)


103
104
105
# File 'lib/am_memcache.rb', line 103

# Returns @readonly, which initialize fills from the :readonly option.
def readonly?
  @readonly
end

#reset ⇒ Object

Reset the connection to all memcache servers. This should be called if there is a problem with a cache lookup that might have left the connection in a corrupted state.



236
237
238
239
240
# File 'lib/am_memcache.rb', line 236

# Close the connection of every configured server while holding the
# client's lock.
def reset
  @mutex.synchronize { @servers.each(&:close) }
end

#send_command(key, command) ⇒ Object

Will send a command to the proper server for this key and return the response



510
511
512
# File 'lib/am_memcache.rb', line 510

# Convenience wrapper around sock_send_command that keeps only the first
# element of its result, the server's response line. squash_errors is left
# at its default (false), so failures surface as exceptions.
def send_command(key, command)
  sock_send_command(key, command)[0]
end

#servers=(servers) ⇒ Object

Set the servers that the requests will be distributed between. Entries can be either strings of the form “hostname:port” or “hostname:port:weight” or MemCache::Server objects.



110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
# File 'lib/am_memcache.rb', line 110

# Replace the set of servers that requests are distributed between.
#
# Each entry is either a Server object or a String of the form "host",
# "host:port" or "host:port:weight"; missing parts fall back to
# DEFAULT_PORT and DEFAULT_WEIGHT. The bucket list is rebuilt with each
# server repeated weight times.
#
# Raises TypeError for an entry of any other class.
def servers=(servers)
  # Create the server objects.
  @servers = servers.collect do |server|
    case server
    when String
      # NOTE(review): port and weight parsed from the string are passed on
      # as Strings; presumably Server converts them -- confirm.
      host, port, weight = server.split(/:/, 3)
      port ||= DEFAULT_PORT
      weight ||= DEFAULT_WEIGHT
      Server.new(host, port, weight)
    when Server
      server
    else
      raise TypeError, "Cannot convert %s to MemCache::Server" % server.class.name
    end
  end

  # Create an array of server buckets for weight selection of servers.
  @buckets = []
  @servers.each do |server|
    server.weight.times { @buckets.push(server) }
  end
end

#set(key, value, expiry = 0) ⇒ Object

Store an entry in the cache using the memcached set command.



215
216
217
218
219
220
221
222
223
224
# File 'lib/am_memcache.rb', line 215

# Store value under key with a set command.
#
# Integers are sent as their decimal string; every other value is
# serialized with Marshal. Returns the reply line from send_command.
def set(key, value, expiry = 0)
  cache_key = make_cache_key(key)
  payload = Integer === value ? value.to_s : Marshal.dump(value)
  header = "set #{cache_key} 0 #{expiry} #{payload.size}\r\n"
  send_command(cache_key, header + payload)
end

#sock_send_command(key, command, squash_errors = false) ⇒ Object

Will send a command to the proper server for this key and return the response, socket and server



518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
# File 'lib/am_memcache.rb', line 518

# Send command to the server responsible for key.
#
# Returns [response_line, socket, server] on success.
#
# On failure the server (if one was selected) is marked dead and the error
# is logged. The error is then re-raised, unless squash_errors is true, in
# which case only the error message String is returned.
#
# Raises MemCacheError when no servers are configured, when a command other
# than get/gets is sent through a read-only cache, or when the selected
# server has no socket.
def sock_send_command(key, command, squash_errors = false)
  @mutex.synchronize do
    server = nil
    begin
      raise MemCacheError, "No active servers" unless self.active?
      # Read-only mode blocks writes only; get/gets must still go through.
      if @readonly && command !~ /\Agets?\s/
        raise MemCacheError, "Update of readonly cache"
      end
      server = get_server_for_key(key)

      sock = server.socket
      raise MemCacheError, "No connection to server" if sock.nil?

      sock.timeout_write("#{command}\r\n", LIVELINESS_TIMEOUT)
      [sock.timeout_gets(LIVELINESS_TIMEOUT), sock, server]
    rescue Exception => err
      # server is still nil when the failure happened before selection.
      server.mark_dead(err.message) if server
      self.class.log_error(err)
      raise err unless squash_errors
      return err.message
    end
  end
end

#synchronize(key, &block) ⇒ Object

memcache-driven locking mechanism

Uses lock and unlock



324
325
326
327
328
329
330
331
332
333
334
# File 'lib/am_memcache.rb', line 324

# Run the given block while holding the cache-backed lock on key.
#
# The lock is requested with an expiry of TRANS_EXPTIME and a timeout of
# TRANS_MAX_TRIES, and released with unlock once the block finishes, even
# when it raises. Returns the block's value.
#
# Raises MemCacheError when the lock cannot be obtained.
def synchronize(key, &block)
  unless lock(key, :expire => TRANS_EXPTIME, :timeout => TRANS_MAX_TRIES)
    raise MemCacheError, "Couldn't obtain lock on #{key}"
  end

  begin
    yield
  ensure
    unlock(key)
  end
end

#unlock(key) ⇒ Object

Will release any lock on key.



315
316
317
# File 'lib/am_memcache.rb', line 315

# Release the lock on key by deleting its cache entry, whoever holds it.
#
# delete runs make_cache_key on its argument again, so the key is hashed
# twice. lock does the same (it passes an already-made cache key to add and
# get, which hash it again), so both address the same entry.
def unlock(key)
  self.delete(make_cache_key(key))
end