Module: EventMachine::Protocols::Memcache
Overview
Implements the Memcache protocol (code.sixapart.com/svn/memcached/trunk/server/doc/protocol.txt). Requires memcached >= 1.2.4 with noreply support.
Usage example
# Usage example: needs a memcached server listening on localhost:11211.
# Every call below is issued from inside the EM.run block.
EM.run{
cache = EM::P::Memcache.connect 'localhost', 11211
# Stores without a block: no completion callback is registered for them.
cache.set :a, 'hello'
cache.set :b, 'hi'
cache.set :c, 'how are you?'
# An empty string is stored like any other value.
cache.set :d, ''
# Single-key fetch: the block receives the value.
cache.get(:a){ |v| p v }
# get_hash yields one Hash mapping each requested key to its value.
cache.get_hash(:a, :b, :c, :d){ |v| p v }
# Multi-key fetch: values are yielded in the same order as the keys.
cache.get(:a,:b,:c,:d){ |a,b,c,d| p [a,b,c,d] }
# Keys the server did not return (:z and :y here) are yielded as nil.
cache.get(:a,:z,:b,:y,:d){ |a,z,b,y,d| p [a,z,b,y,d] }
cache.get(:missing){ |m| p [:missing=, m] }
# With a block, set invokes it once the server answers STORED.
cache.set(:missing, 'abc'){ p :stored }
cache.get(:missing){ |m| p [:missing=, m] }
# With a block, del invokes it with true (DELETED) or false (NOT_FOUND);
# this block ignores the argument.
cache.del(:missing){ p :deleted }
cache.get(:missing){ |m| p [:missing=, m] }
}
Defined Under Namespace
Classes: ParserError
Constant Summary collapse
- Cstored =
'STORED'.freeze
- Cend =
'END'.freeze
- Cdeleted =
'DELETED'.freeze
- Cunknown =
'NOT_FOUND'.freeze
- Cerror =
'ERROR'.freeze
- Cempty =
''.freeze
- Cdelimiter =
"\r\n".freeze
Class Method Summary collapse
Instance Method Summary collapse
- #connection_completed ⇒ Object
- #delete(key, expires = 0, &cb) ⇒ Object (also: #del)
-
#get(*keys) ⇒ Object
commands.
- #get_hash(*keys) ⇒ Object
- #initialize(host, port = 11211) ⇒ Object
-
#process_cmd(line) ⇒ Object
def receive_line line.
-
#receive_data(data) ⇒ Object
19Feb09: Switched to a custom parser; LineText2 is recursive and can cause stack overflows when there is too much data.
- #set(key, val, exptime = 0, &cb) ⇒ Object
- #unbind ⇒ Object
Methods included from Deferrable
#callback, #cancel_timeout, #errback, #fail, future, #set_deferred_failure, #set_deferred_status, #set_deferred_success, #succeed, #timeout
Class Method Details
.connect(host = 'localhost', port = 11211) ⇒ Object
em hooks
100 101 102 |
# File 'lib/protocols/memcache.rb', line 100
#
# Opens a connection to a memcached server using this module as the
# EventMachine connection handler.
#
# host - hostname of the memcached server (default 'localhost')
# port - TCP port of the memcached server (default 11211)
#
# Returns whatever EM.connect returns.
def self.connect(host = 'localhost', port = 11211)
  # The address is given twice: once as the connection target and once as
  # extra handler arguments (presumably forwarded to #initialize, which
  # takes host and port).
  handler_args = [host, port]
  EM.connect(host, port, self, *handler_args)
end
Instance Method Details
#connection_completed ⇒ Object
108 109 110 111 112 113 114 115 116 117 118 119 120 |
# File 'lib/protocols/memcache.rb', line 108
#
# Resets all per-connection state once the socket is established and then
# marks the deferrable as succeeded.
#
# Returns the result of #succeed.
def connection_completed
  # Connection flags consulted by #unbind.
  @connected    = true
  @reconnecting = false

  # Pending callbacks, one FIFO queue per command type.
  @get_cbs, @set_cbs, @del_cbs = [], [], []

  # Accumulates VALUE replies until the terminating END line arrives.
  @values = {}

  # Leftovers from the former LineText2-based implementation:
  # set_delimiter "\r\n"
  # set_line_mode

  succeed
end
#delete(key, expires = 0, &cb) ⇒ Object Also known as: del
79 80 81 82 83 84 |
# File 'lib/protocols/memcache.rb', line 79
#
# Deletes a key from the cache.
#
# key     - key to delete; converted to a String and any whitespace is
#           replaced with '_' (same normalization #get applies), since
#           whitespace would otherwise split the command into extra fields
# expires - value sent as the second field of the delete command (default 0)
# cb      - optional block, queued so that #process_cmd can call it with
#           true on DELETED or false on NOT_FOUND; without a block the
#           command is sent with 'noreply'
def delete(key, expires = 0, &cb)
  callback do
    # Normalize the key exactly like #get so a key stored/fetched as "a_b"
    # can also be deleted when the caller passes "a b".
    safe_key = key.to_s.gsub(/\s/, '_')
    send_data "delete #{safe_key} #{expires}#{cb ? '' : ' noreply'}\r\n"
    @del_cbs << cb if cb
  end
end
#get(*keys) ⇒ Object
commands
49 50 51 52 53 54 55 56 57 58 59 |
# File 'lib/protocols/memcache.rb', line 49
#
# Fetches one or more keys and yields their values, in the order the keys
# were given. Keys absent from the server's reply are yielded as nil.
#
# keys - keys to fetch; each is converted to a String and whitespace is
#        replaced with '_'
#
# Raises ArgumentError when no block is given.
def get(*keys)
  raise ArgumentError unless block_given?

  callback do
    sanitized = keys.map { |key| key.to_s.gsub(/\s/, '_') }
    send_data "get #{sanitized.join(' ')}\r\n"

    # Queued until #process_cmd sees END and hands over the collected values.
    deliver = proc do |values|
      yield(*sanitized.map { |key| values[key] })
    end
    @get_cbs << [sanitized, deliver]
  end
end
#get_hash(*keys) ⇒ Object
71 72 73 74 75 76 77 |
# File 'lib/protocols/memcache.rb', line 71
#
# Fetches one or more keys and yields a single Hash that maps each key, as
# the caller passed it, to its value (nil for keys that were not returned).
#
# keys - keys to fetch, forwarded unchanged to #get
#
# Raises ArgumentError when no block is given.
def get_hash(*keys)
  raise ArgumentError unless block_given?

  get(*keys) do |*values|
    # #get yields values positionally, so pair them with the keys by index
    # in one pass instead of searching the key list for every entry.
    result = {}
    keys.each_with_index { |key, position| result[key] = values[position] }
    yield result
  end
end
#initialize(host, port = 11211) ⇒ Object
104 105 106 |
# File 'lib/protocols/memcache.rb', line 104
#
# Remembers the server address; #unbind uses it when reconnecting.
#
# host - hostname of the memcached server
# port - TCP port of the memcached server (default 11211)
def initialize(host, port = 11211)
  @host = host
  @port = port
end
#process_cmd(line) ⇒ Object
def receive_line line
140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 |
# File 'lib/protocols/memcache.rb', line 140
#
# Handles one reply line from the server (formerly `def receive_line line`).
#
# line - a single line taken from the receive buffer, including its
#        trailing "\r\n"
#
# Raises ParserError when a VALUE header arrives before its full payload is
# buffered; #receive_data rescues this and retries once more data arrives.
def process_cmd(line)
  case line.strip
  when /^VALUE\s+(.+?)\s+(\d+)\s+(\d+)/ # VALUE <key> <flags> <bytes>
    key   = $1
    bytes = Integer($3)
    # set_binary_mode bytes+2
    # @cur_key = $1

    # The payload and its trailing \r\n must already be buffered.
    raise ParserError if @buffer.size < bytes + 2

    @values[key] = @buffer.slice!(0, bytes)
    @buffer.slice!(0, 2) # \r\n

  when Cend # END
    # End of a get reply: hand all collected values to the oldest waiter.
    entry = @get_cbs.shift
    if entry
      _keys, handler = entry
      handler.call(@values)
    end
    @values = {}

  when Cstored # STORED
    handler = @set_cbs.shift
    handler.call(true) if handler

  when Cdeleted # DELETED
    handler = @del_cbs.shift
    handler.call(true) if handler

  when Cunknown # NOT_FOUND
    # Only delete callbacks are notified of NOT_FOUND.
    handler = @del_cbs.shift
    handler.call(false) if handler

  else
    p [:MEMCACHE_UNKNOWN, line]
  end
end
#receive_data(data) ⇒ Object
19Feb09: Switched to a custom parser; LineText2 is recursive and can cause
stack overflows when there is too much data.
include EM::P::LineText2
125 126 127 128 129 130 131 132 133 134 135 136 137 |
# File 'lib/protocols/memcache.rb', line 125
#
# Appends incoming bytes to the receive buffer and processes every complete
# "\r\n"-terminated line it contains.
#
# data - chunk of bytes received from the server
#
# Returns nil.
def receive_data(data)
  @buffer ||= ''
  @buffer << data

  until (index = @buffer.index(Cdelimiter)).nil?
    # Take the line together with its two-byte delimiter.
    line = @buffer.slice!(0, index + 2)
    begin
      process_cmd line
    rescue ParserError
      # Incomplete VALUE payload: put the header line back at the front of
      # the buffer and wait for more data.
      @buffer.insert(0, line)
      break
    end
  end
end
#set(key, val, exptime = 0, &cb) ⇒ Object
61 62 63 64 65 66 67 68 69 |
# File 'lib/protocols/memcache.rb', line 61
#
# Stores a value under a key.
#
# key     - key to store under (passed to send_cmd as given)
# val     - value to store; converted with to_s
# exptime - expiry value passed through to the set command (default 0)
# cb      - optional block, queued so that #process_cmd can call it with
#           true when the server answers STORED
def set(key, val, exptime = 0, &cb)
  callback do
    payload = val.to_s
    # Prefer the byte length; fall back to size where bytesize is missing.
    length  = payload.respond_to?(:bytesize) ? payload.bytesize : payload.size
    noreply = cb.nil?

    send_cmd :set, key, 0, exptime, length, noreply
    send_data payload
    send_data Cdelimiter
    @set_cbs << cb if cb
  end
end
#unbind ⇒ Object
def receive_binary_data data
@values[@cur_key] = data[0..-3]
end
184 185 186 187 188 189 190 191 192 193 |
# File 'lib/protocols/memcache.rb', line 184
#
# Called when the connection is closed. If the connection had been
# established, or a reconnect is already in progress, schedules another
# reconnect attempt in one second; otherwise the initial connect failed.
#
# Raises RuntimeError when the very first connection attempt fails.
def unbind
  unless @connected || @reconnecting
    raise 'Unable to connect to memcached server'
  end

  EM.add_timer(1) { reconnect @host, @port }
  @connected    = false
  @reconnecting = true
  # Clear the deferrable status (set via #succeed in #connection_completed).
  @deferred_status = nil
end