Module: MemcacheCluster

Extended by:
MemcacheCluster
Included in:
MemcacheCluster
Defined in:
lib/droid/heroku/memcache_cluster.rb

Overview

Manages a pool of memcache servers. This module should only be used from within the reactor: it does not synchronize access to the server list, so concurrent (asynchronous) access is not safe.

Defined Under Namespace

Classes: Proxy

Constant Summary collapse

HEROKU_NAMESPACE =

Heroku’s internal memcache namespace

'0Xfa15837Z'

Instance Method Summary collapse

Instance Method Details

#add(ip, port) ⇒ Object



102
103
104
105
106
107
108
109
110
111
112
# File 'lib/droid/heroku/memcache_cluster.rb', line 102

# Registers the server "ip:port" in the pool unless it is already listed.
#
# On a new entry the server list is kept sorted, every memoized client is
# dropped, the list is written to disk, and @last_read is set to the
# current time after the write.
#
# @param ip [#to_s] address of the memcache server
# @param port [#to_s] port of the memcache server
# @return [Integer, nil] the new @last_read timestamp, or nil when the
#   server was already present
def add(ip, port)
  endpoint = [ip, port].join(':')

  unless servers.include?(endpoint)
    log { "action=added server=#{endpoint}" }
    (@servers << endpoint).sort!
    @caches = {}
    write_to_file
    @last_read = Time.now.to_i
  end
end

#attach(droid, file = 'memcached.yml') ⇒ Object

Create listeners for standard memcache cluster related topics.



72
73
74
75
76
77
78
# File 'lib/droid/heroku/memcache_cluster.rb', line 72

# Create listeners for standard memcache cluster related topics.
#
# Loads the server list from +file+, then subscribes to 'memcache.up'
# (adds the announced server) and 'instance.down' (removes the instance's
# local IP when its slot is 'memcache'). Each queue name is suffixed with
# this instance's name and the process id. Finally schedules a
# 'memcache.needed' publish through EM.add_timer(1).
#
# @param droid [#listen4, #publish] messaging handle
# @param file [String] path of the YAML server list
def attach(droid, file = 'memcached.yml')
  load_from_file(file)

  up_queue = "memcache.up.#{LocalStats.this_instance_name}.#$$"
  droid.listen4('memcache.up', :queue => up_queue) do |msg|
    add(msg['address'], msg['port'])
  end

  down_queue = "instance.down.#{LocalStats.this_instance_name}.#$$"
  droid.listen4('instance.down', :queue => down_queue) do |msg|
    remove(msg['local_ip']) if msg['slot'] == 'memcache'
  end

  EM.add_timer(1) { droid.publish('memcache.needed', {}) }
end

#cache(prefix, options = {}) ⇒ Object Also known as: []



86
87
88
# File 'lib/droid/heroku/memcache_cluster.rb', line 86

# Builds a Proxy for the given prefix.
#
# @param prefix [Object] passed through to Proxy.new as its first argument
# @param options [Hash] passed through to Proxy.new unchanged
# @return [Proxy] a new proxy on every call
def cache(prefix, options = {})
  proxy = Proxy.new(prefix, options)
  proxy
end

#cache_internal(prefix, options = {}) ⇒ Object

A MemCache object configured with the given prefix.



81
82
83
84
# File 'lib/droid/heroku/memcache_cluster.rb', line 81

# A MemCache object configured with the given prefix.
#
# Clients are memoized per prefix in the hash returned by #caches. The
# +options+ only take effect when the client for a prefix is first built;
# later calls return the memoized client regardless of the options passed.
#
# @param prefix [String] namespace for the client (also the memo key)
# @param options [Hash] extra MemCache.new options; :namespace is
#   always overridden with +prefix+
# @return [MemCache]
def cache_internal(prefix, options = {})
  pool = caches
  existing = pool[prefix]
  return existing if existing

  pool[prefix] = MemCache.new(servers, options.merge(:namespace => prefix))
end

#cache_retry(prefix, opts = {}) ⇒ Object



37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
# File 'lib/droid/heroku/memcache_cluster.rb', line 37

# Yields the MemCache client for +prefix+, retrying the block when it
# raises MemCache::MemCacheError.
#
# Before each retry the error is logged, the method sleeps for the
# configured delay and all memoized clients are discarded so that a fresh
# client is built on the next attempt.
#
# @param prefix [String] namespace of the client to use
# @param opts [Hash] retry settings; the hash is not modified
# @option opts [Integer] :retries (5) maximum number of retries after the
#   first attempt
# @option opts [Numeric] :delay (0.5) value passed to sleep between attempts
# @yieldparam c [MemCache] the client for +prefix+
# @return [Object, nil] the block's value, or nil when no block is given
# @raise [MemCache::MemCacheError] re-raised once the retries are used up
#
# NOTE(review): sleep blocks the calling thread; if this is invoked on the
# reactor thread the reactor stalls for the delay - confirm that is intended.
def cache_retry(prefix, opts = {})
  # Read the settings into locals instead of writing defaults back into the
  # caller's hash.
  retries = opts[:retries] || 5
  delay   = opts[:delay] || 0.5

  retried = 0
  begin
    c = cache_internal(prefix)
    yield c if block_given?
  rescue MemCache::MemCacheError => e
    Log.error "#{e.class} -> #{e.message}", :exception => e
    # >= (not >) so that :retries => n means exactly n retries.
    raise if retried >= retries
    retried += 1
    sleep delay
    @caches = {}
    retry
  end
end

#caches ⇒ Object



92
93
94
95
# File 'lib/droid/heroku/memcache_cluster.rb', line 92

# The per-prefix memo of MemCache clients.
#
# Calls reload_if_stale first, then lazily initializes the memo to an
# empty hash when it is not set.
#
# @return [Hash]
def caches
  reload_if_stale
  @caches = {} unless @caches
  @caches
end

#get(prefix, *args) ⇒ Object



63
64
65
66
67
68
69
# File 'lib/droid/heroku/memcache_cluster.rb', line 63

# Calls +get+ on the MemCache client for +prefix+, going through
# cache_retry.
#
# @param prefix [String] namespace of the client to use
# @param args [Array] forwarded unchanged to the client's +get+
# @return [Object, nil] whatever the client's +get+ returned; nil if the
#   block was never run
def get(prefix, *args)
  value = nil
  cache_retry(prefix) { |client| value = client.get(*args) }
  value
end

#heroku ⇒ Object

A cache Proxy (as returned by #cache) configured with Heroku’s internal memcache namespace.



33
34
35
# File 'lib/droid/heroku/memcache_cluster.rb', line 33

# Shortcut for #cache with HEROKU_NAMESPACE as the prefix.
#
# @return [Object] whatever #cache returns for that namespace
def heroku
  namespace = HEROKU_NAMESPACE
  cache(namespace)
end

#load_from_file(file) ⇒ Object



132
133
134
135
136
137
138
# File 'lib/droid/heroku/memcache_cluster.rb', line 132

# Replaces the in-memory server list with the contents of the YAML file
# and drops every memoized client.
#
# Loading is best-effort: a missing, unreadable or unparsable file, and a
# file whose top-level YAML value is not a list (an empty file parses to
# false), all result in an empty server list instead of an error.
#
# @param file [String] path of the YAML server list; remembered in @file
#   for later reloads and writes
# @return [Object] the result of the log call
def load_from_file(file)
  @file = file
  @last_read = Time.now.to_i

  # NOTE(review): YAML.load is kept for compatibility; the file is expected
  # to be written only by write_to_file. If it can come from an untrusted
  # source, switch to YAML.safe_load.
  loaded = begin
    YAML.load(File.read(file))
  rescue StandardError
    nil
  end

  # Guard against false / Hash / String results so that @servers is always
  # an Array and the join below cannot raise.
  @servers = loaded.is_a?(Array) ? loaded : []
  @caches = {}
  log { "action=load file=#{@file} servers=#{@servers.join(',')}" }
end

#log(type = :notice) ⇒ Object



149
150
151
# File 'lib/droid/heroku/memcache_cluster.rb', line 149

# Sends a message prefixed with "memcache_cluster " to Log.
#
# The message text is produced by the required block.
#
# @param type [Symbol] name of the Log method to invoke
# @yieldreturn [#to_s] the message body
# @return [Object] whatever the Log method returns
def log(type = :notice)
  message = "memcache_cluster #{yield}"
  Log.send(type, message)
end

#reload_if_stale ⇒ Object



122
123
124
125
126
127
128
129
130
# File 'lib/droid/heroku/memcache_cluster.rb', line 122

# Re-reads the server file when it has changed on disk.
#
# A reload happens only when all of the following hold: a previous read
# time is recorded, more than 20 seconds have passed since then, and the
# file's mtime (in whole seconds) is later than that read time.
# Any StandardError raised along the way is logged rather than raised.
#
# @return [Object, nil] nil when nothing was reloaded
def reload_if_stale
  return unless @last_read
  return unless (Time.now.to_i - @last_read) > 20
  return unless File.mtime(@file).to_i > @last_read

  load_from_file(@file)
rescue => e
  log { "action=error file=#{@file} error_class='#{e.class}' error_message='#{e.message}'" }
end

#remove(host) ⇒ Object



114
115
116
117
118
119
120
# File 'lib/droid/heroku/memcache_cluster.rb', line 114

# Removes every server entry that belongs to +host+ from the pool.
#
# An entry matches when it equals +host+ exactly (a full "ip:port" string)
# or when it starts with "host:" (any port on that address). Matching is a
# literal string comparison, so "10.0.0.1" does not match
# "10.0.0.12:11211" and '.' is not treated as a wildcard. A nil or empty
# +host+ removes nothing.
#
# When at least one entry is removed, the memoized clients are cleared and
# the list is written to disk.
#
# @param host [String, nil] address (or full "ip:port") to remove
# @return [Object, nil] result of write_to_file, or nil when nothing matched
def remove(host)
  target = host.to_s
  return if target.empty?

  with_port = "#{target}:"
  # reject! returns nil when no element was removed.
  removed = servers.reject! { |s| s == target || s.start_with?(with_port) }
  return unless removed

  log { "action=remove server=#{host}" }
  caches.clear
  write_to_file
end

#servers ⇒ Object



97
98
99
100
# File 'lib/droid/heroku/memcache_cluster.rb', line 97

# The current list of "ip:port" server strings.
#
# Calls reload_if_stale first, then lazily initializes the list to an
# empty array when it is not set.
#
# @return [Array]
def servers
  reload_if_stale
  @servers = [] unless @servers
  @servers
end

#set(prefix, *args) ⇒ Object



55
56
57
58
59
60
61
# File 'lib/droid/heroku/memcache_cluster.rb', line 55

# Calls +set+ on the MemCache client for +prefix+, going through
# cache_retry.
#
# @param prefix [String] namespace of the client to use
# @param args [Array] forwarded unchanged to the client's +set+
# @return [Object, nil] whatever the client's +set+ returned; nil if the
#   block was never run
def set(prefix, *args)
  outcome = nil
  cache_retry(prefix) { |client| outcome = client.set(*args) }
  outcome
end

#write_to_file ⇒ Object



140
141
142
143
144
145
146
147
# File 'lib/droid/heroku/memcache_cluster.rb', line 140

# Writes the current server list to @file as YAML under an exclusive
# flock.
#
# The file is opened without truncation and is emptied only after the
# lock has been acquired, so a holder of the lock never sees the file cut
# short underneath it. The data is flushed before the lock is released so
# the next lock holder sees the complete content.
#
# @return [Integer] result of the final flock call
def write_to_file
  log { "action=write file=#{@file} servers=#{@servers.join(',')}" }
  # RDWR|CREAT instead of 'w': 'w' would truncate before the lock is held.
  File.open(@file, File::RDWR | File::CREAT, 0666) do |f|
    f.flock(File::LOCK_EX)
    f.rewind
    f.truncate(0)
    f.write YAML.dump(@servers)
    # Push buffered bytes to the file while the lock is still held.
    f.flush
    f.flock(File::LOCK_UN)
  end
end