Module: EventMachine::Protocols::Memcache::Connectable

Includes: Sender
Included in: Connection
Defined in: lib/evented_memcache_client/connectable.rb

Overview

Included by Connection to implement the EventMachine::Connection interface. It handles the opening and closing of the connection and the arrival of data from the remote endpoint, parses incoming messages, and delivers callbacks to higher-level application code.

Constant Summary

HEADER_REGEX =
/([^\r\n]+?)\r\n/m
NO_ARG_MESSAGES =
[
  'error',
  'stored',
  'not_stored',
  'exists',
  'not_found',
  'end',
  'deleted',
  'ok',
]
ARG_MESSAGES =
[
  'client_error',
  'server_error',
  'stat',
  'version',
  'get',
  'gets',
]
BODY_MESSAGES =
[]
ARG_AND_BODY_MESSAGES =
['reserved']
ARGS_AND_BODY_MESSAGES =
[
  'value',
  'set',
  'add',
  'replace',
  'append',
  'prepend',
]
ALL_MESSAGES =
NO_ARG_MESSAGES + ARG_MESSAGES +
BODY_MESSAGES + ARG_AND_BODY_MESSAGES + ARGS_AND_BODY_MESSAGES
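
As a rough illustration of how these constants might be used together, the sketch below scans a buffer with HEADER_REGEX and looks up the first word of each header in shortened copies of two of the lists. The `classify` helper and the abbreviated lists are assumptions made for this example; they are not part of the library.

```ruby
require 'strscan'

# Copied from the constant summary above.
HEADER_REGEX = /([^\r\n]+?)\r\n/m

# Abbreviated copies of two lists, shortened for illustration only.
NO_ARG_MESSAGES        = %w[stored not_found end]
ARGS_AND_BODY_MESSAGES = %w[value]

# Hypothetical helper (not in the library): name the category of a header line.
def classify(header)
  name = header.split(/\s/).first.downcase
  return :no_arg if NO_ARG_MESSAGES.include?(name)
  return :args_and_body if ARGS_AND_BODY_MESSAGES.include?(name)
  :unknown
end

scanner = StringScanner.new("STORED\r\nVALUE foo 0 3\r\nbar\r\nEND\r\n")
first  = scanner.scan(HEADER_REGEX) # one header line, terminator included
second = scanner.scan(HEADER_REGEX) # the next header line

puts classify(first)  # no_arg
puts classify(second) # args_and_body
```

StringScanner#scan only matches at the current scan position, so each call returns exactly the next line up to and including its "\r\n".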

Instance Method Summary

Methods included from Sender

#book_it, #cas, #delete, #get, #send_to_peer, #stats, #value

Instance Method Details

#connection_completed ⇒ Object

Called once the connection has been established. NB: not called for passive (server) connections.



# File 'lib/evented_memcache_client/connectable.rb', line 91

def connection_completed
  @opened_at = Time.now
  handle(:open, self)
end

#handle(*args) ⇒ Object

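Calls the user-supplied callback for the event or message named by args[0]. It looks first in the callbacks Hash, then on the handler object, then for a handle_<name> method on the connection itself. If none of these matches, the event is silently ignored.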



# File 'lib/evented_memcache_client/connectable.rb', line 303

def handle(*args)
  if @callbacks[args[0]]
    @callbacks[args[0]].call(*args[1..-1])
  elsif @handler.respond_to?(args[0])
    @handler.send(*args)
  elsif self.respond_to?("handle_#{args[0]}".to_sym)
    self.send("handle_#{args[0]}".to_sym, *args[1..-1])
  end
end
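
The lookup order in #handle can be tried out in isolation. The sketch below copies the dispatch logic into a stand-in class; `DispatchSketch` and `HandlerObject` are hypothetical names used only for this example, and the real Connection sets @callbacks and @handler through #set_handler rather than a constructor.

```ruby
# Stand-in class that copies the #handle dispatch logic; not the real Connection.
class DispatchSketch
  def initialize(callbacks: {}, handler: nil)
    @callbacks = callbacks
    @handler   = handler
  end

  def handle(*args)
    if @callbacks[args[0]]
      @callbacks[args[0]].call(*args[1..-1])           # 1. Hash of Procs
    elsif @handler.respond_to?(args[0])
      @handler.send(*args)                             # 2. handler object
    elsif respond_to?("handle_#{args[0]}".to_sym)
      send("handle_#{args[0]}".to_sym, *args[1..-1])   # 3. handle_<name> method
    end
  end

  def handle_stored(_conn)
    :from_method
  end
end

# Hypothetical handler object exposing a method named after the message.
class HandlerObject
  def stored(_conn)
    :from_object
  end
end

via_proc   = DispatchSketch.new(callbacks: { stored: ->(_conn) { :from_proc } }).handle(:stored, :conn)
via_object = DispatchSketch.new(handler: HandlerObject.new).handle(:stored, :conn)
via_method = DispatchSketch.new.handle(:stored, :conn)
ignored    = DispatchSketch.new.handle(:deleted, :conn) # no callback anywhere
```

Because an unmatched event falls through every branch, `ignored` is nil and no error is raised.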

#io_per_sec ⇒ Object

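How fast did we go? Returns the average number of messages (received plus sent) per second since the connection opened, as a Float. If the connection has closed, the rate is measured up to the close time.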



# File 'lib/evented_memcache_client/connectable.rb', line 297

def io_per_sec
  total = @msgs_in + @msgs_out
  (total / ((@closed_at || Time.now) - @opened_at)).to_f
end
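
Judging from the source shown on this page, @opened_at stays nil until #connection_completed runs, so calling #io_per_sec before then (for example on a passive connection) would attempt to subtract nil from a Time. The following is a minimal sketch of the same arithmetic with a guard added; `io_per_sec_sketch` and its parameters are hypothetical and stand in for the instance variables.

```ruby
# Hypothetical stand-alone version of the rate calculation, with nil and
# zero-duration guards that the method shown above does not have.
def io_per_sec_sketch(msgs_in, msgs_out, opened_at, closed_at = nil)
  return 0.0 if opened_at.nil?                   # never opened: no rate to report
  elapsed = (closed_at || Time.now) - opened_at  # Time - Time yields a Float (seconds)
  return 0.0 unless elapsed.positive?
  (msgs_in + msgs_out) / elapsed
end

opened = Time.at(1_000)
closed = Time.at(1_010)
rate   = io_per_sec_sketch(30, 20, opened, closed) # 50 messages over 10 seconds
unopened_rate = io_per_sec_sketch(1, 1, nil)
```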

#parse_header(header_data) ⇒ Object

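Splits the header line on whitespace and calls the matching parse_<command> method with the remaining elements. If no such method exists, the :unknown_message callback is invoked and the header is discarded from the buffer.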



# File 'lib/evented_memcache_client/connectable.rb', line 130

def parse_header(header_data)
  elements = header_data.split(/\s/)
  command_name = elements[0].downcase
  if self.respond_to?("parse_#{command_name}")
    self.send("parse_#{command_name}", elements[1..-1])
  else
    handle(:unknown_message, self, command_name, nil, nil)
    @data.slice!(0, @scanner.pos)
  end
end
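
The name-based dispatch used by #parse_header can be sketched without EventMachine. In the example below, `HeaderDispatchSketch`, its `log` array, and the `parse_stat` body are assumptions for illustration; the library's own parse_* methods are not shown on this page.

```ruby
# Stand-in class illustrating dispatch on the first word of a header line.
class HeaderDispatchSketch
  attr_reader :log

  def initialize
    @log = []
  end

  def parse_header(header_data)
    elements     = header_data.split(/\s/)
    command_name = elements[0].downcase
    if respond_to?("parse_#{command_name}")
      send("parse_#{command_name}", elements[1..-1])
    else
      # The real method calls handle(:unknown_message, ...) and trims the buffer.
      @log << [:unknown_message, command_name]
    end
  end

  # Hypothetical parser for "STAT <name> <value>" lines.
  def parse_stat(args)
    @log << [:stat, args]
  end
end

parser = HeaderDispatchSketch.new
parser.parse_header("STAT pid 1234\r\n")
parser.parse_header("BOGUS\r\n")
```

After both calls, `parser.log` holds one :stat entry with the arguments `["pid", "1234"]` and one :unknown_message entry for "bogus".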

#parse_incoming_data ⇒ Object

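Pulls complete header lines out of the incoming data buffer and parses them one at a time. The loop stops when parsing a header removes nothing from the buffer, which means the rest of that message has not arrived yet.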



# File 'lib/evented_memcache_client/connectable.rb', line 115

def parse_incoming_data
  @scanner = StringScanner.new(@data)
  while (header = @scanner.scan(HEADER_REGEX)) do
    len = @data.length
    parse_header(header)
    if len == @data.length
      break # it didn't take anything out, so there's not enough
    end
    @msgs_in += 1
    @scanner = StringScanner.new(@data)
  end
end
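
To see why the loop breaks when the buffer length is unchanged, here is a small sketch that feeds a VALUE reply in two chunks. `BufferSketch` and its simplified `parse_header` are hypothetical: this stand-in only understands VALUE headers, treats every other header as having no body, and assumes ASCII data so that byte and character offsets agree.

```ruby
require 'strscan'

# Stand-in connection that reuses the parse_incoming_data loop shown above.
class BufferSketch
  HEADER_REGEX = /([^\r\n]+?)\r\n/m
  attr_reader :messages

  def initialize
    @data     = String.new
    @messages = []
  end

  def receive_data(chunk)
    @data << chunk
    parse_incoming_data
  end

  def parse_incoming_data
    @scanner = StringScanner.new(@data)
    while (header = @scanner.scan(HEADER_REGEX))
      len = @data.length
      parse_header(header)
      break if len == @data.length # nothing consumed: wait for more bytes
      @scanner = StringScanner.new(@data)
    end
  end

  # Hypothetical parser: "VALUE <key> <flags> <bytes>" is followed by a body.
  def parse_header(header)
    name, *args = header.split(/\s/)
    if name.downcase == 'value'
      bytes  = args[2].to_i
      needed = @scanner.pos + bytes + 2 # header + body + trailing "\r\n"
      return if @data.length < needed   # body incomplete: leave buffer untouched
      @messages << [:value, args[0], @data[@scanner.pos, bytes]]
      @data.slice!(0, needed)
    else
      @data.slice!(0, @scanner.pos)
      @messages << [name.downcase.to_sym]
    end
  end
end

conn = BufferSketch.new
conn.receive_data("VALUE foo 0 3\r\nba") # body is one byte short
after_first_chunk = conn.messages.dup
conn.receive_data("r\r\nEND\r\n")        # remainder arrives
```

Nothing is delivered after the first chunk; once the second chunk arrives, both the VALUE message and the END message are parsed in a single pass.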

#post_init ⇒ Object

Called by EventMachine immediately after the connection object is instantiated. Initializes the receive buffer and the statistics counters.



# File 'lib/evented_memcache_client/connectable.rb', line 75

def post_init
  @data      = ""

  # Stats

  @rcvs      = 0
  @msgs_in   = 0
  @msgs_out  = 0
  @rcv_stats = {}
  @snd_stats = {}
  @opened_at = nil
  @closed_at = nil
end

#receive_data(data) ⇒ Object

Called from EM::Connection when we have data. Appends it to the buffer and parses whatever complete messages the buffer now holds.



# File 'lib/evented_memcache_client/connectable.rb', line 97

def receive_data(data)
  @rcvs += 1
  @data << data
  parse_incoming_data
end

#receive_message(message_name, data, args = nil) ⇒ Object

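Generic “You’ve got mail”: counts the message in the receive statistics, then passes it to the specific receive_<message_name> method. If no such method exists, the :unknown_message callback is invoked instead.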



# File 'lib/evented_memcache_client/connectable.rb', line 247

def receive_message(message_name, data, args=nil)
  @rcv_stats[message_name] ||= 0
  @rcv_stats[message_name] += 1
  if self.respond_to?("receive_#{message_name}")
    self.send("receive_#{message_name}", data, args)
  else
    handle(:unknown_message, self, message_name, data, args)
  end
end
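
One consequence of the ordering above is that a message is counted before the receiver lookup, so unknown messages appear in the statistics too. A minimal sketch, in which `ReceiveSketch`, its `unknown` array, and `receive_stored` are hypothetical stand-ins:

```ruby
# Stand-in class copying the counting and dispatch steps of #receive_message.
class ReceiveSketch
  attr_reader :rcv_stats, :unknown

  def initialize
    @rcv_stats = {}
    @unknown   = []
  end

  def receive_message(message_name, data, args = nil)
    @rcv_stats[message_name] ||= 0
    @rcv_stats[message_name] += 1
    if respond_to?("receive_#{message_name}")
      send("receive_#{message_name}", data, args)
    else
      # The real method calls handle(:unknown_message, self, ...) here.
      @unknown << message_name
    end
  end

  # Hypothetical specific receiver.
  def receive_stored(_data, _args)
    :stored_ok
  end
end

rx = ReceiveSketch.new
result = rx.receive_message('stored', nil)
rx.receive_message('stored', nil)
rx.receive_message('bogus', nil)
```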

#remote_endpoint ⇒ Object

Returns a nice string showing something like “192.168.1.1:32122”, or “?:?” if the peer address is not available. The result of the lookup is cached in @peer.



# File 'lib/evented_memcache_client/connectable.rb', line 314

def remote_endpoint
  if !@peer && (peername = get_peername)
    @peer = Socket.unpack_sockaddr_in(peername)
  end
  @peer ? "#{@peer[1]}:#{@peer[0]}" : '?:?'
end
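
Socket.unpack_sockaddr_in returns the port first and the host second, which is why the method interpolates @peer[1] before @peer[0]. The sketch below shows this with the standard library alone; the packed address built with Socket.pack_sockaddr_in is a stand-in for what get_peername would return on a live connection.

```ruby
require 'socket'

# Stand-in for the packed sockaddr that get_peername returns on a live connection.
peername = Socket.pack_sockaddr_in(32122, '192.168.1.1')

peer     = Socket.unpack_sockaddr_in(peername) # [port, host]
endpoint = peer ? "#{peer[1]}:#{peer[0]}" : '?:?'

puts endpoint # 192.168.1.1:32122
```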

#set_handler(handler) ⇒ Object

Install callbacks.

handler may be a Hash of Procs, a Module, or an object. If it’s a Hash of Procs, each key names a callback such as :open, :close, :unknown_message, or :foo, where foo is a memcache protocol message (e.g., :stored).

If handler is a Module, it is assumed to define methods such as handle_open, handle_close, and handle_foo, where foo is a memcache protocol message (e.g., handle_stored). handler is then included into the connection’s class, making its methods available as callbacks.

If handler is an object, Connectable invokes the correspondingly named methods on it: :open, :close, :unknown_message, and :<memcache_message>, such as :stored, :value, :end, etc.



# File 'lib/evented_memcache_client/connectable.rb', line 60

def set_handler(handler)
  @handler   = nil
  @callbacks = {}
  if handler.respond_to?(:keys)
    @callbacks = handler
  elsif handler.is_a?(Module)
    self.class.class_eval {
      include handler
    }
  else
    @handler = handler
  end
end
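
Because the Module branch calls include on the connection's class rather than on the single instance, the handler methods appear to become available to every instance of that class. The sketch below reproduces this with a stand-in class; `SketchConnection` and `LoggingHandlers` are hypothetical names, and the set_handler body is copied from the source above.

```ruby
# Hypothetical handler module following the handle_<name> convention.
module LoggingHandlers
  def handle_open(_conn)
    :opened
  end
end

# Stand-in for a class that mixes in Connectable.
class SketchConnection
  def set_handler(handler)
    @handler   = nil
    @callbacks = {}
    if handler.respond_to?(:keys)
      @callbacks = handler
    elsif handler.is_a?(Module)
      self.class.class_eval { include handler } # class-wide, not per-instance
    else
      @handler = handler
    end
  end
end

first  = SketchConnection.new
second = SketchConnection.new

before = second.respond_to?(:handle_open) # false: nothing included yet
first.set_handler(LoggingHandlers)
after  = second.respond_to?(:handle_open) # true, though only first called set_handler
```

If per-connection behaviour is needed, the Hash-of-Procs or handler-object forms keep the callbacks scoped to one instance.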

#start_closing ⇒ Object

Begin shutting the connection down. The connection is closed once any pending outbound data has been written.



# File 'lib/evented_memcache_client/connectable.rb', line 292

def start_closing
  close_connection_after_writing
end

#to_s ⇒ Object

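Turns this sucker into a human-readable string with some semi-useful information: the EventMachine signature, the peer address, the time opened, the message counts per message type, and the I/O rate.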



# File 'lib/evented_memcache_client/connectable.rb', line 276

def to_s
  str = ''
  str << "EM signature: #{signature} peer: #{remote_endpoint}\n"
  str << "opened: #{@opened_at} (#{Time.now - @opened_at}s)\n"
  str << "rcvs: #{@rcvs} in: #{@msgs_in} out: #{@msgs_out}\n"
  @rcv_stats.each {|msg_name,count|
    str << "#{msg_name} in: #{count}\n"
  }
  @snd_stats.each {|msg_name,count|
    str << "#{msg_name} out: #{count}\n"
  }
  str << "I+O/sec: #{'%6.4f' % io_per_sec}."
  str
end

#unbind ⇒ Object

Called from EM::Connection when the connection is dead. Records the close time, discards the receive buffer, and invokes the :close callback.



# File 'lib/evented_memcache_client/connectable.rb', line 104

def unbind
  @closed_at = Time.now
  @data = nil
  handle(:close, self)
end