Module: EventMachine::Protocols::Memcache
Overview
Implements the Memcache protocol (code.sixapart.com/svn/memcached/trunk/server/doc/protocol.txt). Requires memcached >= 1.2.4 w/ noreply support
Usage example
EM.run{
cache = EM::P::Memcache.connect 'localhost', 11211
cache.set :a, 'hello'
cache.set :b, 'hi'
cache.set :c, 'how are you?'
cache.set :d, ''
cache.get(:a){ |v| p v }
cache.get_hash(:a, :b, :c, :d){ |v| p v }
cache.get(:a,:b,:c,:d){ |a,b,c,d| p [a,b,c,d] }
cache.get(:a,:z,:b,:y,:d){ |a,z,b,y,d| p [a,z,b,y,d] }
cache.get(:missing){ |m| p [:missing=, m] }
cache.set(:missing, 'abc'){ p :stored }
cache.get(:missing){ |m| p [:missing=, m] }
cache.del(:missing){ p :deleted }
cache.get(:missing){ |m| p [:missing=, m] }
}
Defined Under Namespace
Classes: ParserError
Constant Summary collapse
- Cstored =
'STORED'.freeze
- Cend =
'END'.freeze
- Cdeleted =
'DELETED'.freeze
- Cunknown =
'NOT_FOUND'.freeze
- Cerror =
'ERROR'.freeze
- Cempty =
''.freeze
- Cdelimiter =
"\r\n".freeze
Constants included from Deferrable
Class Method Summary collapse
-
.connect(host = 'localhost', port = 11211) ⇒ Object
Connect to a memcached server (must support NOREPLY, memcached >= 1.2.4).
Instance Method Summary collapse
- #connection_completed ⇒ Object
-
#delete(key, expires = 0, &cb) ⇒ Object
(also: #del)
Delete the value associated with a key.
-
#get(*keys) ⇒ Object
Get the value associated with one or multiple keys.
-
#get_hash(*keys) ⇒ Object
Gets multiple values as a hash.
- #initialize(host, port = 11211) ⇒ Object
-
#process_cmd(line) ⇒ Object
– def receive_line line.
-
#receive_data(data) ⇒ Object
– 19Feb09 Switched to a custom parser, LineText2 is recursive and can cause stack overflows when there is too much data.
-
#set(key, val, exptime = 0, &cb) ⇒ Object
Set the value for a given key.
- #unbind ⇒ Object
Methods included from Deferrable
#callback, #cancel_callback, #cancel_errback, #cancel_timeout, #errback, #fail, future, #set_deferred_status, #succeed, #timeout
Class Method Details
.connect(host = 'localhost', port = 11211) ⇒ Object
Connect to a memcached server (must support NOREPLY, memcached >= 1.2.4)
114 115 116 |
# File 'lib/em/protocols/memcache.rb', line 114 def self.connect host = 'localhost', port = 11211 EM.connect host, port, self, host, port end |
Instance Method Details
#connection_completed ⇒ Object
139 140 141 142 143 144 145 146 147 148 149 150 151 |
# File 'lib/em/protocols/memcache.rb', line 139 def connection_completed @get_cbs = [] @set_cbs = [] @del_cbs = [] @values = {} @reconnecting = false @connected = true succeed # set_delimiter "\r\n" # set_line_mode end |
#delete(key, expires = 0, &cb) ⇒ Object Also known as: del
Delete the value associated with a key
cache.del :a
cache.del(:b){ puts "deleted the value!" }
105 106 107 108 109 110 |
# File 'lib/em/protocols/memcache.rb', line 105 def delete key, expires = 0, &cb callback{ send_data "delete #{key} #{expires}#{cb ? '' : ' noreply'}\r\n" @del_cbs << cb if cb } end |
#get(*keys) ⇒ Object
Get the value associated with one or multiple keys
cache.get(:a){ |v| p v }
cache.get(:a,:b,:c,:d){ |a,b,c,d| p [a,b,c,d] }
61 62 63 64 65 66 67 68 69 70 71 |
# File 'lib/em/protocols/memcache.rb', line 61 def get *keys raise ArgumentError unless block_given? callback{ keys = keys.map{|k| k.to_s.gsub(/\s/,'_') } send_data "get #{keys.join(' ')}\r\n" @get_cbs << [keys, proc{ |values| yield *keys.map{ |k| values[k] } }] } end |
#get_hash(*keys) ⇒ Object
Gets multiple values as a hash
cache.get_hash(:a, :b, :c, :d){ |h| puts h[:a] }
92 93 94 95 96 97 98 |
# File 'lib/em/protocols/memcache.rb', line 92 def get_hash *keys raise ArgumentError unless block_given? get *keys do |*values| yield keys.inject({}){ |hash, k| hash.update k => values[keys.index(k)] } end end |
#initialize(host, port = 11211) ⇒ Object
134 135 136 |
# File 'lib/em/protocols/memcache.rb', line 134 def initialize host, port = 11211 @host, @port = host, port end |
#process_cmd(line) ⇒ Object
– def receive_line line
175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 |
# File 'lib/em/protocols/memcache.rb', line 175 def process_cmd line case line.strip when /^VALUE\s+(.+?)\s+(\d+)\s+(\d+)/ # VALUE <key> <flags> <bytes> bytes = Integer($3) # set_binary_mode bytes+2 # @cur_key = $1 if @buffer.size >= bytes + 2 @values[$1] = @buffer.slice!(0,bytes) @buffer.slice!(0,2) # \r\n else raise ParserError end when Cend # END if entry = @get_cbs.shift keys, cb = entry cb.call(@values) end @values = {} when Cstored # STORED if cb = @set_cbs.shift cb.call(true) end when Cdeleted # DELETED if cb = @del_cbs.shift cb.call(true) end when Cunknown # NOT_FOUND if cb = @del_cbs.shift cb.call(false) end else p [:MEMCACHE_UNKNOWN, line] end end |
#receive_data(data) ⇒ Object
– 19Feb09 Switched to a custom parser, LineText2 is recursive and can cause
stack overflows when there is too much data.
include EM::P::LineText2
158 159 160 161 162 163 164 165 166 167 168 169 170 |
# File 'lib/em/protocols/memcache.rb', line 158 def receive_data data (@buffer||='') << data while index = @buffer.index(Cdelimiter) begin line = @buffer.slice!(0,index+2) process_cmd line rescue ParserError @buffer[0...0] = line break end end end |
#set(key, val, exptime = 0, &cb) ⇒ Object
Set the value for a given key
cache.set :a, 'hello'
cache.set(:missing, 'abc'){ puts "stored the value!" }
78 79 80 81 82 83 84 85 86 |
# File 'lib/em/protocols/memcache.rb', line 78 def set key, val, exptime = 0, &cb callback{ val = val.to_s send_cmd :set, key, 0, exptime, val.respond_to?(:bytesize) ? val.bytesize : val.size, !block_given? send_data val send_data Cdelimiter @set_cbs << cb if cb } end |
#unbind ⇒ Object
221 222 223 224 225 226 227 228 229 230 |
# File 'lib/em/protocols/memcache.rb', line 221 def unbind if @connected or @reconnecting EM.add_timer(1){ reconnect @host, @port } @connected = false @reconnecting = true @deferred_status = nil else raise 'Unable to connect to memcached server' end end |