Module: EventMachine::Protocols::Memcache

Includes:
Deferrable
Included in:
TestConnection
Defined in:
lib/protocols/memcache.rb

Overview

Implements the Memcache protocol (code.sixapart.com/svn/memcached/trunk/server/doc/protocol.txt). Requires memcached >= 1.2.4 with noreply support.

Usage example

# Everything below runs inside the EventMachine reactor loop.
EM.run{
  # The host and port shown here are also the defaults of .connect.
  cache = EM::P::Memcache.connect 'localhost', 11211

  # Store a few values. With no block given, no completion callback is
  # registered for the set.
  cache.set :a, 'hello'
  cache.set :b, 'hi'
  cache.set :c, 'how are you?'
  cache.set :d, ''

  # Single key: the block receives that key's value.
  cache.get(:a){ |v| p v }
  # get_hash yields one Hash mapping each requested key to its value.
  cache.get_hash(:a, :b, :c, :d){ |v| p v }
  # Several keys: the block receives one value per key, in request order.
  cache.get(:a,:b,:c,:d){ |a,b,c,d| p [a,b,c,d] }

  # Keys absent from the response (presumably :z and :y here) are yielded as nil.
  cache.get(:a,:z,:b,:y,:d){ |a,z,b,y,d| p [a,z,b,y,d] }

  # Round trip on one key: look up, store, look up, delete, look up again.
  # Passing a block to set/del registers a callback for the server's reply.
  cache.get(:missing){ |m| p [:missing=, m] }
  cache.set(:missing, 'abc'){ p :stored }
  cache.get(:missing){ |m| p [:missing=, m] }
  cache.del(:missing){ p :deleted }
  cache.get(:missing){ |m| p [:missing=, m] }
}

Defined Under Namespace

Classes: ParserError

Constant Summary collapse

# Reply line that makes process_cmd fire the next pending set callback with true.
Cstored =
'STORED'.freeze
# Reply line that ends a get response; process_cmd hands the collected values
# to the next pending get callback.
Cend =
'END'.freeze
# Reply line that makes process_cmd fire the next pending delete callback with true.
Cdeleted =
'DELETED'.freeze
# Reply line that makes process_cmd fire the next pending delete callback with false.
Cunknown =
'NOT_FOUND'.freeze
# NOTE(review): not referenced by any method visible on this page.
Cerror =
'ERROR'.freeze
# NOTE(review): not referenced by any method visible on this page.
Cempty =
''.freeze
# Line terminator: receive_data splits incoming data on it and set appends it
# after the value payload.
Cdelimiter =
"\r\n".freeze

Class Method Summary collapse

Instance Method Summary collapse

Methods included from Deferrable

#callback, #cancel_timeout, #errback, #fail, future, #set_deferred_failure, #set_deferred_status, #set_deferred_success, #succeed, #timeout

Class Method Details

.connect(host = 'localhost', port = 11211) ⇒ Object

em hooks



100
101
102
# File 'lib/protocols/memcache.rb', line 100

# Opens a connection to a memcached server through EventMachine.
#
# The host and port are handed to EM.connect twice: first as the address to
# connect to, then again after the handler (self) as extra arguments.
# NOTE(review): the trailing host/port presumably end up in #initialize — confirm.
#
# @param host [String] server host name (defaults to 'localhost')
# @param port [Integer] server port (defaults to 11211)
# @return [Object] whatever EM.connect returns
def self.connect(host = 'localhost', port = 11211)
  EM.connect(host, port, self, host, port)
end

Instance Method Details

#connection_completed ⇒ Object



108
109
110
111
112
113
114
115
116
117
118
119
120
# File 'lib/protocols/memcache.rb', line 108

# Connection hook: resets all per-connection state and marks the deferrable
# as succeeded so that queued commands (registered through `callback`) run.
#
# Every (re)connect starts from a clean slate: pending get/set/delete
# callbacks are discarded, partially collected values are dropped, and the
# receive buffer is emptied. Clearing the buffer matters after a reconnect;
# otherwise bytes left over from the dead connection would be parsed as the
# beginning of the new connection's response stream.
#
# @return [Object] whatever `succeed` returns
def connection_completed
  @get_cbs = []
  @set_cbs = []
  @del_cbs = []

  @values = {}
  # A fresh, mutable buffer; receive_data appends to it in place.
  @buffer = String.new

  @reconnecting = false
  @connected = true
  # Legacy LineText2 setup, kept for reference:
  # set_delimiter "\r\n"
  # set_line_mode
  succeed
end

#delete(key, expires = 0, &cb) ⇒ Object Also known as: del



79
80
81
82
83
84
# File 'lib/protocols/memcache.rb', line 79

# Queues a delete command for +key+.
#
# The key is converted to a string and every whitespace character is replaced
# with an underscore, exactly as #get does. Without this, a key containing a
# space or CRLF would split the command line and could smuggle extra commands
# to the server, and a key written as "a b" could never match the "a_b" that
# #get asks for.
#
# @param key [#to_s] key to delete
# @param expires [Integer] value sent as the second argument of the delete command
# @yield [Boolean] optional; process_cmd calls it with true on DELETED and
#   false on NOT_FOUND. When no block is given the command is sent with
#   `noreply` and no callback is queued.
# @return [Object] whatever `callback` returns
def delete(key, expires = 0, &cb)
  callback do
    safe_key = key.to_s.gsub(/\s/, '_')
    send_data "delete #{safe_key} #{expires}#{cb ? '' : ' noreply'}\r\n"
    @del_cbs << cb if cb
  end
end

#get(*keys) ⇒ Object

commands

Raises:

  • (ArgumentError)


49
50
51
52
53
54
55
56
57
58
59
# File 'lib/protocols/memcache.rb', line 49

# Queues a get command for one or more keys.
#
# Each key is converted to a string with whitespace replaced by underscores
# before it is sent. Once the response has been parsed, the block is called
# with one value per requested key, in request order; a key that is missing
# from the parsed values yields nil.
#
# @param keys [Array<#to_s>] keys to fetch
# @yield [*values] one value (or nil) per requested key
# @raise [ArgumentError] when no block is given
# @return [Object] whatever `callback` returns
def get(*keys)
  raise ArgumentError unless block_given?

  callback do
    wire_keys = keys.map { |key| key.to_s.gsub(/\s/, '_') }
    send_data "get #{wire_keys.join(' ')}\r\n"

    # Invoked by process_cmd with the hash of values collected for this request.
    deliver = proc do |values|
      yield(*wire_keys.map { |key| values[key] })
    end
    @get_cbs << [wire_keys, deliver]
  end
end

#get_hash(*keys) ⇒ Object

Raises:

  • (ArgumentError)


71
72
73
74
75
76
77
# File 'lib/protocols/memcache.rb', line 71

# Like #get, but yields a single Hash that maps each requested key (as the
# caller passed it) to its value.
#
# The values arrive in the same order as +keys+, so the two arrays are
# zipped together directly instead of looking every key up with
# Array#index, which made the original quadratic in the number of keys.
#
# @param keys [Array<#to_s>] keys to fetch
# @yield [Hash] requested key => value (nil for keys without a value)
# @raise [ArgumentError] when no block is given
# @return [Object] whatever #get returns
def get_hash(*keys)
  raise ArgumentError unless block_given?

  get(*keys) do |*values|
    yield Hash[keys.zip(values)]
  end
end

#initialize(host, port = 11211) ⇒ Object



104
105
106
# File 'lib/protocols/memcache.rb', line 104

# Remembers the server address; #unbind uses it when scheduling a reconnect.
#
# @param host [String] server host name
# @param port [Integer] server port (defaults to 11211)
def initialize(host, port = 11211)
  @host = host
  @port = port
end

#process_cmd(line) ⇒ Object

(A commented-out `def receive_line line` precedes this method in the source; it is not part of #process_cmd.)



140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
# File 'lib/protocols/memcache.rb', line 140

# Handles a single reply line and dispatches on its (stripped) content.
#
# * `VALUE <key> <flags> <bytes>`: takes <bytes> characters of payload plus the
#   trailing "\r\n" out of @buffer and stores the payload under <key>.
# * `END`: passes the collected values to the oldest pending get callback
#   (if any) and starts a new, empty collection.
# * `STORED`: calls the oldest pending set callback with true.
# * `DELETED` / `NOT_FOUND`: calls the oldest pending delete callback with
#   true / false respectively.
# * anything else is printed with `p` as [:MEMCACHE_UNKNOWN, line].
#
# @param line [String] one reply line
# @raise [ParserError] when a VALUE header arrives before its whole payload
#   (and terminator) is in @buffer; nothing is consumed in that case
def process_cmd(line)
  reply = line.strip

  if (header = reply.match(/^VALUE\s+(.+?)\s+(\d+)\s+(\d+)/)) # VALUE <key> <flags> <bytes>
    value_key = header[1]
    length = Integer(header[3])
    # The payload and its "\r\n" terminator must both be buffered already.
    raise ParserError if @buffer.size < length + 2

    @values[value_key] = @buffer.slice!(0, length)
    @buffer.slice!(0, 2) # \r\n
  elsif reply == Cend # END
    pending = @get_cbs.shift
    pending[1].call(@values) if pending
    @values = {}
  elsif reply == Cstored # STORED
    on_stored = @set_cbs.shift
    on_stored.call(true) if on_stored
  elsif reply == Cdeleted # DELETED
    on_deleted = @del_cbs.shift
    on_deleted.call(true) if on_deleted
  elsif reply == Cunknown # NOT_FOUND
    on_missing = @del_cbs.shift
    on_missing.call(false) if on_missing
  else
    p [:MEMCACHE_UNKNOWN, line]
  end
end

#receive_data(data) ⇒ Object

19Feb09: Switched to a custom parser. LineText2 is recursive and can cause stack overflows when there is too much data, so `include EM::P::LineText2` is commented out in the source.



125
126
127
128
129
130
131
132
133
134
135
136
137
# File 'lib/protocols/memcache.rb', line 125

# Appends incoming data to the receive buffer and feeds every complete
# "\r\n"-terminated line to process_cmd.
#
# Each line is removed from the buffer before it is processed. If process_cmd
# raises ParserError, the line is put back at the front of the buffer and
# processing stops until more data arrives.
#
# @param data [String] chunk of bytes received from the server
# @return [nil]
def receive_data(data)
  @buffer ||= ''
  @buffer << data

  while (line_end = @buffer.index(Cdelimiter))
    # Take the line together with its two-character terminator.
    line = @buffer.slice!(0, line_end + 2)
    begin
      process_cmd line
    rescue ParserError
      @buffer.insert(0, line)
      break
    end
  end
end

#set(key, val, exptime = 0, &cb) ⇒ Object



61
62
63
64
65
66
67
68
69
# File 'lib/protocols/memcache.rb', line 61

# Queues a set command that stores +val+ (converted with to_s) under +key+.
#
# The key is converted to a string and every whitespace character is replaced
# with an underscore, exactly as #get does. Previously the key was passed on
# untouched, so a value stored under "a b" could not be read back through
# #get (which asks for "a_b"), and whitespace or CRLF in a key could break
# the command line.
# NOTE(review): send_cmd is defined outside this view; it is assumed to place
# the key into the command line as text, so receiving a String instead of a
# Symbol should make no difference — confirm.
#
# @param key [#to_s] key to store under
# @param val [#to_s] value to store
# @param exptime [Integer] passed to send_cmd after the flags argument
# @yield [Boolean] optional; process_cmd calls it with true on STORED. When
#   no block is given, true is passed as send_cmd's last argument and no
#   callback is queued.
# @return [Object] whatever `callback` returns
def set(key, val, exptime = 0, &cb)
  callback do
    safe_key = key.to_s.gsub(/\s/, '_')
    payload = val.to_s
    # The length must be in bytes; fall back to size where bytesize is missing.
    length = payload.respond_to?(:bytesize) ? payload.bytesize : payload.size
    send_cmd :set, safe_key, 0, exptime, length, cb.nil?
    send_data payload
    send_data Cdelimiter
    @set_cbs << cb if cb
  end
end

#unbind ⇒ Object

Commented-out legacy code that precedes this method in the source (it is not part of #unbind):

def receive_binary_data data

  @values[@cur_key] = data[0..-3]

end



184
185
186
187
188
189
190
191
192
193
# File 'lib/protocols/memcache.rb', line 184

# Connection hook for a closed or failed connection.
#
# If the connection had been established, or a reconnect is already under
# way, another reconnect to the stored host and port is scheduled one second
# later and the deferred status is cleared. Otherwise the very first
# connection attempt has failed, and that is reported by raising.
#
# @raise [RuntimeError] when the connection was never established
# @return [nil]
def unbind
  unless @connected || @reconnecting
    raise 'Unable to connect to memcached server'
  end

  EM.add_timer(1) { reconnect @host, @port }
  @connected = false
  @reconnecting = true
  @deferred_status = nil
end