Module: EventMachine::Protocols::Memcache
Overview
Implements the Memcache protocol (code.sixapart.com/svn/memcached/trunk/server/doc/protocol.txt). Requires memcached >= 1.2.4 w/ noreply support
Usage example
EM.run{
cache = EM::P::Memcache.connect 'localhost', 11211
cache.set :a, 'hello'
cache.set :b, 'hi'
cache.set :c, 'how are you?'
cache.set :d, ''
cache.get(:a){ |v| p v }
cache.get_hash(:a, :b, :c, :d){ |v| p v }
cache.get(:a,:b,:c,:d){ |a,b,c,d| p [a,b,c,d] }
cache.get(:a,:z,:b,:y,:d){ |a,z,b,y,d| p [a,z,b,y,d] }
cache.get(:missing){ |m| p [:missing=, m] }
cache.set(:missing, 'abc'){ p :stored }
cache.get(:missing){ |m| p [:missing=, m] }
cache.del(:missing){ p :deleted }
cache.get(:missing){ |m| p [:missing=, m] }
}
Defined Under Namespace
Classes: ParserError
Constant Summary collapse
- Cstored =
'STORED'.freeze
- Cend =
'END'.freeze
- Cdeleted =
'DELETED'.freeze
- Cunknown =
'NOT_FOUND'.freeze
- Cerror =
'ERROR'.freeze
- Cempty =
''.freeze
- Cdelimiter =
"\r\n".freeze
Class Method Summary collapse
-
.connect(host = 'localhost', port = 11211) ⇒ Object
Connect to a memcached server (must support NOREPLY, memcached >= 1.2.4).
Instance Method Summary collapse
- #connection_completed ⇒ Object
-
#delete(key, expires = 0, &cb) ⇒ Object
(also: #del)
Delete the value associated with a key.
-
#get(*keys) ⇒ Object
Get the value associated with one or multiple keys.
-
#get_hash(*keys) ⇒ Object
Gets multiple values as a hash.
-
#initialize(host, port = 11211) ⇒ Object
em hooks.
-
#process_cmd(line) ⇒ Object
– def receive_line line.
-
#receive_data(data) ⇒ Object
– 19Feb09 Switched to a custom parser, LineText2 is recursive and can cause stack overflows when there is too much data.
-
#set(key, val, exptime = 0, &cb) ⇒ Object
Set the value for a given key.
- #unbind ⇒ Object
Methods included from Deferrable
#callback, #cancel_timeout, #errback, #fail, future, #set_deferred_status, #succeed, #timeout
Class Method Details
.connect(host = 'localhost', port = 11211) ⇒ Object
Connect to a memcached server (must support NOREPLY, memcached >= 1.2.4)
109 110 111 |
# File 'lib/em/protocols/memcache.rb', line 109 def self.connect host = 'localhost', port = 11211 EM.connect host, port, self, host, port end |
Instance Method Details
#connection_completed ⇒ Object
133 134 135 136 137 138 139 140 141 142 143 144 145 |
# File 'lib/em/protocols/memcache.rb', line 133 def connection_completed @get_cbs = [] @set_cbs = [] @del_cbs = [] @values = {} @reconnecting = false @connected = true succeed # set_delimiter "\r\n" # set_line_mode end |
#delete(key, expires = 0, &cb) ⇒ Object Also known as: del
Delete the value associated with a key
cache.del :a
cache.del(:b){ puts "deleted the value!" }
100 101 102 103 104 105 |
# File 'lib/em/protocols/memcache.rb', line 100 def delete key, expires = 0, &cb callback{ send_data "delete #{key} #{expires}#{cb ? '' : ' noreply'}\r\n" @del_cbs << cb if cb } end |
#get(*keys) ⇒ Object
Get the value associated with one or multiple keys
cache.get(:a){ |v| p v }
cache.get(:a,:b,:c,:d){ |a,b,c,d| p [a,b,c,d] }
56 57 58 59 60 61 62 63 64 65 66 |
# File 'lib/em/protocols/memcache.rb', line 56 def get *keys raise ArgumentError unless block_given? callback{ keys = keys.map{|k| k.to_s.gsub(/\s/,'_') } send_data "get #{keys.join(' ')}\r\n" @get_cbs << [keys, proc{ |values| yield *keys.map{ |k| values[k] } }] } end |
#get_hash(*keys) ⇒ Object
Gets multiple values as a hash
cache.get_hash(:a, :b, :c, :d){ |h| puts h[:a] }
87 88 89 90 91 92 93 |
# File 'lib/em/protocols/memcache.rb', line 87 def get_hash *keys raise ArgumentError unless block_given? get *keys do |*values| yield keys.inject({}){ |hash, k| hash.update k => values[keys.index(k)] } end end |
#initialize(host, port = 11211) ⇒ Object
em hooks
129 130 131 |
# File 'lib/em/protocols/memcache.rb', line 129 def initialize host, port = 11211 @host, @port = host, port end |
#process_cmd(line) ⇒ Object
– def receive_line line
167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 |
# File 'lib/em/protocols/memcache.rb', line 167 def process_cmd line case line.strip when /^VALUE\s+(.+?)\s+(\d+)\s+(\d+)/ # VALUE <key> <flags> <bytes> bytes = Integer($3) # set_binary_mode bytes+2 # @cur_key = $1 if @buffer.size >= bytes + 2 @values[$1] = @buffer.slice!(0,bytes) @buffer.slice!(0,2) # \r\n else raise ParserError end when Cend # END if entry = @get_cbs.shift keys, cb = entry cb.call(@values) end @values = {} when Cstored # STORED if cb = @set_cbs.shift cb.call(true) end when Cdeleted # DELETED if cb = @del_cbs.shift cb.call(true) end when Cunknown # NOT_FOUND if cb = @del_cbs.shift cb.call(false) end else p [:MEMCACHE_UNKNOWN, line] end end |
#receive_data(data) ⇒ Object
– 19Feb09 Switched to a custom parser, LineText2 is recursive and can cause
stack overflows when there is too much data.
include EM::P::LineText2
151 152 153 154 155 156 157 158 159 160 161 162 163 |
# File 'lib/em/protocols/memcache.rb', line 151 def receive_data data (@buffer||='') << data while index = @buffer.index(Cdelimiter) begin line = @buffer.slice!(0,index+2) process_cmd line rescue ParserError @buffer[0...0] = line break end end end |
#set(key, val, exptime = 0, &cb) ⇒ Object
Set the value for a given key
cache.set :a, 'hello'
cache.set(:missing, 'abc'){ puts "stored the value!" }
73 74 75 76 77 78 79 80 81 |
# File 'lib/em/protocols/memcache.rb', line 73 def set key, val, exptime = 0, &cb callback{ val = val.to_s send_cmd :set, key, 0, exptime, val.respond_to?(:bytesize) ? val.bytesize : val.size, !block_given? send_data val send_data Cdelimiter @set_cbs << cb if cb } end |
#unbind ⇒ Object
– def receive_binary_data data
@values[@cur_key] = data[0..-3]
end
212 213 214 215 216 217 218 219 220 221 |
# File 'lib/em/protocols/memcache.rb', line 212 def unbind if @connected or @reconnecting EM.add_timer(1){ reconnect @host, @port } @connected = false @reconnecting = true @deferred_status = nil else raise 'Unable to connect to memcached server' end end |