Module: EventMachine::Protocols::Memcache

Includes:
Deferrable
Included in:
TestConnection
Defined in:
lib/em/protocols/memcache.rb

Overview

Implements the Memcache protocol (code.sixapart.com/svn/memcached/trunk/server/doc/protocol.txt). Requires memcached >= 1.2.4 with noreply support.

Usage example

# Usage walkthrough: every call below is issued from inside the EM.run block.
EM.run{
  # Open a connection to memcached on localhost:11211.
  cache = EM::P::Memcache.connect 'localhost', 11211

  # Stores issued without a block: no completion callback is registered.
  cache.set :a, 'hello'
  cache.set :b, 'hi'
  cache.set :c, 'how are you?'
  # Storing an empty string.
  cache.set :d, ''

  # Single-key fetch: the block receives that key's value.
  cache.get(:a){ |v| p v }
  # get_hash yields one Hash keyed by the requested keys.
  cache.get_hash(:a, :b, :c, :d){ |v| p v }
  # Multi-key fetch: the block receives one argument per key, in request order.
  cache.get(:a,:b,:c,:d){ |a,b,c,d| p [a,b,c,d] }

  # Keys that the server returns no value for (here presumably :z and :y) are yielded as nil.
  cache.get(:a,:z,:b,:y,:d){ |a,z,b,y,d| p [a,z,b,y,d] }

  # Look up :missing, store it (block runs once the store is confirmed),
  # look it up again, delete it (block runs once the server answers), and
  # look it up a final time.
  cache.get(:missing){ |m| p [:missing=, m] }
  cache.set(:missing, 'abc'){ p :stored }
  cache.get(:missing){ |m| p [:missing=, m] }
  cache.del(:missing){ p :deleted }
  cache.get(:missing){ |m| p [:missing=, m] }
}

Defined Under Namespace

Classes: ParserError

Constant Summary collapse

# Reply line confirming a store; process_cmd calls the oldest pending set callback with true.
Cstored =
'STORED'.freeze
# Reply line closing a get response; process_cmd hands the collected values to the oldest pending get callback.
Cend =
'END'.freeze
# Reply line confirming a delete; process_cmd calls the oldest pending delete callback with true.
Cdeleted =
'DELETED'.freeze
# Reply line for a delete whose key was not found; process_cmd calls the oldest pending delete callback with false.
Cunknown =
'NOT_FOUND'.freeze
# The literal 'ERROR'. NOTE(review): not referenced by any method visible on this page.
Cerror =
'ERROR'.freeze
# Shared frozen empty string. NOTE(review): not referenced by any method visible on this page.
Cempty =
''.freeze
# Line terminator: receive_data splits the buffer on it and set appends it after the value.
Cdelimiter =
"\r\n".freeze

Constants included from Deferrable

Deferrable::Pool

Class Method Summary collapse

Instance Method Summary collapse

Methods included from Deferrable

#callback, #cancel_callback, #cancel_errback, #cancel_timeout, #errback, #fail, future, #set_deferred_status, #succeed, #timeout

Class Method Details

.connect(host = 'localhost', port = 11211) ⇒ Object

Connect to a memcached server (must support NOREPLY, memcached >= 1.2.4)



114
115
116
# File 'lib/em/protocols/memcache.rb', line 114

# Connect to a memcached server (must support NOREPLY, memcached >= 1.2.4).
#
# host - server address to dial (default 'localhost')
# port - server port to dial (default 11211)
#
# host and port are given to EM.connect twice: first as the address to dial,
# then again after the handler (self) -- presumably so they reach #initialize.
#
# Returns whatever EM.connect returns.
def self.connect(host = 'localhost', port = 11211)
  handler_args = [host, port]
  EM.connect(host, port, self, *handler_args)
end

Instance Method Details

#connection_completed ⇒ Object



139
140
141
142
143
144
145
146
147
148
149
150
151
# File 'lib/em/protocols/memcache.rb', line 139

# Prepares per-connection state and then marks the deferrable as succeeded.
#
# The callback queues and the value accumulator are replaced with fresh empty
# containers, so anything still queued from an earlier connection is dropped.
# All state is in place before +succeed+ is called, because the blocks queued
# through +callback+ by get/set/delete append to these queues.
#
# Returns the result of +succeed+.
def connection_completed
  # The link is up: clear the reconnect flag set by #unbind.
  @connected, @reconnecting = true, false

  # Values gathered from VALUE replies until the matching END arrives.
  @values = {}

  # One FIFO of pending callbacks per command family.
  @get_cbs, @set_cbs, @del_cbs = [], [], []

  succeed
  # set_delimiter "\r\n"
  # set_line_mode
end

#delete(key, expires = 0, &cb) ⇒ Object Also known as: del

Delete the value associated with a key

cache.del :a
cache.del(:b){ puts "deleted the value!" }


105
106
107
108
109
110
# File 'lib/em/protocols/memcache.rb', line 105

# Delete the value associated with a key
#
#   cache.del :a
#   cache.del(:b){ puts "deleted the value!" }
#
# key     - converted with +to_s+; every whitespace character is replaced by
#           an underscore, exactly as #get does, so a key can never split the
#           command line or smuggle in a second command.
# expires - sent verbatim as the second argument of the delete command
#           (default 0).
# cb      - optional block. When given it is queued and later called by
#           process_cmd with true (DELETED) or false (NOT_FOUND). When omitted
#           the command is sent with the noreply flag and nothing is queued.
#
# The command is sent from a deferrable +callback+, i.e. only once the
# connection has succeeded.
def delete key, expires = 0, &cb
  callback{
    # Same normalisation as #get: raw whitespace (space, CR, LF, tab) in a key
    # would otherwise produce a malformed or injected protocol line.
    safe_key = key.to_s.gsub(/\s/, '_')
    send_data "delete #{safe_key} #{expires}#{cb ? '' : ' noreply'}\r\n"
    @del_cbs << cb if cb
  }
end

#get(*keys) ⇒ Object

Get the value associated with one or multiple keys

cache.get(:a){ |v| p v }
cache.get(:a,:b,:c,:d){ |a,b,c,d| p [a,b,c,d] }

Raises:

  • (ArgumentError)


61
62
63
64
65
66
67
68
69
70
71
# File 'lib/em/protocols/memcache.rb', line 61

# Get the value associated with one or multiple keys
#
#   cache.get(:a){ |v| p v }
#   cache.get(:a,:b,:c,:d){ |a,b,c,d| p [a,b,c,d] }
#
# keys - one or more keys. Each is converted with +to_s+ and has its
#        whitespace replaced by underscores before being sent.
#
# The block is mandatory. It is later called with one argument per requested
# key, in request order; a key absent from the reply yields nil.
#
# Raises ArgumentError when no block is given.
def get *keys
  raise ArgumentError unless block_given?

  callback do
    # Names as they go over the wire; replies are looked up under these names.
    wire_keys = keys.map { |name| name.to_s.gsub(/\s/, '_') }
    send_data "get #{wire_keys.join(' ')}\r\n"

    # Invoked by process_cmd with the hash of values collected for this request.
    deliver = proc do |values|
      found = wire_keys.map { |name| values[name] }
      yield(*found)
    end
    @get_cbs << [wire_keys, deliver]
  end
end

#get_hash(*keys) ⇒ Object

Gets multiple values as a hash

cache.get_hash(:a, :b, :c, :d){ |h| puts h[:a] }

Raises:

  • (ArgumentError)


92
93
94
95
96
97
98
# File 'lib/em/protocols/memcache.rb', line 92

# Gets multiple values as a hash
#
#   cache.get_hash(:a, :b, :c, :d){ |h| puts h[:a] }
#
# keys - the keys to fetch; they are passed on to #get unchanged.
#
# The block is mandatory. It is called with a single Hash that maps each key,
# exactly as it was passed in, to the value #get yielded at the same position.
#
# Raises ArgumentError when no block is given.
def get_hash *keys
  raise ArgumentError unless block_given?

  get(*keys) do |*values|
    # Pair keys and values positionally in one pass. The former
    # inject + keys.index lookup rescanned the key list for every key.
    yield Hash[keys.zip(values)]
  end
end

#initialize(host, port = 11211) ⇒ Object



134
135
136
# File 'lib/em/protocols/memcache.rb', line 134

# Remembers the server address.
#
# host - server host
# port - server port (default 11211)
#
# Both are kept in instance variables; #unbind reads them when it schedules a
# reconnect.
def initialize(host, port = 11211)
  @host = host
  @port = port
end

#process_cmd(line) ⇒ Object

– def receive_line line



175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
# File 'lib/em/protocols/memcache.rb', line 175

# Interprets one reply line from the server (receive_data passes each
# delimiter-terminated line here).
#
# line - the raw line, including its trailing delimiter.
#
# Behaviour per reply, after stripping surrounding whitespace:
#   VALUE <key> <flags> <bytes> - moves <bytes> characters plus the trailing
#                                 "\r\n" out of @buffer and stores them under
#                                 <key> in @values.
#   END                         - calls the oldest pending get callback with
#                                 @values, then resets @values.
#   STORED                      - calls the oldest pending set callback with true.
#   DELETED                     - calls the oldest pending delete callback with true.
#   NOT_FOUND                   - calls the oldest pending delete callback with false.
#   anything else               - printed with +p+, tagged :MEMCACHE_UNKNOWN.
#
# Raises ParserError when a VALUE payload is not yet fully buffered; @buffer
# is left untouched in that case and receive_data rescues the error.
def process_cmd line
  reply = line.strip

  if (header = /^VALUE\s+(.+?)\s+(\d+)\s+(\d+)/.match(reply)) # VALUE <key> <flags> <bytes>
    length = Integer(header[3])
    # set_binary_mode bytes+2
    # @cur_key = $1

    # The payload and its terminating "\r\n" must both be buffered already.
    raise ParserError if @buffer.size < length + 2

    @values[header[1]] = @buffer.slice!(0, length)
    @buffer.slice!(0, 2) # \r\n

  elsif reply == Cend # END
    pending = @get_cbs.shift
    if pending
      _names, handler = pending
      handler.call(@values)
    end
    @values = {}

  elsif reply == Cstored # STORED
    handler = @set_cbs.shift
    handler.call(true) if handler

  elsif reply == Cdeleted # DELETED
    handler = @del_cbs.shift
    handler.call(true) if handler

  elsif reply == Cunknown # NOT_FOUND
    handler = @del_cbs.shift
    handler.call(false) if handler

  else
    p [:MEMCACHE_UNKNOWN, line]
  end
end

#receive_data(data) ⇒ Object

– 19Feb09 Switched to a custom parser, LineText2 is recursive and can cause

stack overflows when there is too much data.

include EM::P::LineText2



158
159
160
161
162
163
164
165
166
167
168
169
170
# File 'lib/em/protocols/memcache.rb', line 158

# Appends incoming data to the buffer and feeds every complete,
# delimiter-terminated line to process_cmd.
#
# data - the chunk of data just received.
#
# When process_cmd raises ParserError the line is put back at the front of
# the buffer and processing stops, so the same line is retried on the next
# call once more data has been appended.
#
# Returns nil.
def receive_data data
  @buffer ||= ''
  @buffer << data

  until (boundary = @buffer.index(Cdelimiter)).nil?
    # Cut the line, including its 2-character delimiter, off the front.
    line = @buffer.slice!(0, boundary + 2)

    begin
      process_cmd line
    rescue ParserError
      # Restore the line and wait for the rest of the data.
      @buffer[0...0] = line
      break
    end
  end
end

#set(key, val, exptime = 0, &cb) ⇒ Object

Set the value for a given key

cache.set :a, 'hello'
cache.set(:missing, 'abc'){ puts "stored the value!" }


78
79
80
81
82
83
84
85
86
# File 'lib/em/protocols/memcache.rb', line 78

# Set the value for a given key
#
#   cache.set :a, 'hello'
#   cache.set(:missing, 'abc'){ puts "stored the value!" }
#
# key     - passed to send_cmd as given.
# val     - converted with +to_s+ before being sent.
# exptime - passed to send_cmd as given (default 0).
# cb      - optional block. When given it is queued, and process_cmd calls it
#           with true on a STORED reply. Whether a block was given is also
#           passed, negated, as the last argument of send_cmd.
#
# The work runs from a deferrable +callback+: the command via send_cmd, then
# the value, then the line delimiter.
def set key, val, exptime = 0, &cb
  callback do
    payload = val.to_s

    # Length in bytes where the string can report it, else in characters.
    length =
      if payload.respond_to?(:bytesize)
        payload.bytesize
      else
        payload.size
      end

    send_cmd(:set, key, 0, exptime, length, cb.nil?)
    send_data payload
    send_data Cdelimiter
    @set_cbs << cb if cb
  end
end

#unbind ⇒ Object



221
222
223
224
225
226
227
228
229
230
# File 'lib/em/protocols/memcache.rb', line 221

# Reacts to the connection going away.
#
# If the connection had been established, or a reconnect is already under
# way, a reconnect to @host/@port is scheduled through a 1-second EM timer
# and the state flags are switched to "reconnecting". @deferred_status is
# cleared as well (presumably so that new commands queue until
# connection_completed calls +succeed+ again -- confirm against Deferrable).
#
# Raises RuntimeError ('Unable to connect to memcached server') when the
# connection was never established and no reconnect is in progress.
def unbind
  raise 'Unable to connect to memcached server' unless @connected || @reconnecting

  EM.add_timer(1) { reconnect @host, @port }
  @connected, @reconnecting = false, true
  @deferred_status = nil
end