Module: EventMachine::Protocols::Memcache::Connectable
Overview
Included by Connection to implement the interface to EventMachine::Connection; handles the opening/closing of the connection, and when data is received from the remote endpoint. Parses messages and delivers callbacks to higher level application code.
Constant Summary collapse
- HEADER_REGEX =
/([^\r\n]+?)\r\n/m
- NO_ARG_MESSAGES =
[ 'error', 'stored', 'not_stored', 'exists', 'not_found', 'end', 'deleted', 'ok', ]
- ARG_MESSAGES =
[ 'client_error', 'server_error', 'stat', 'version', 'get', 'gets', ]
- BODY_MESSAGES =
[]
- ARG_AND_BODY_MESSAGES =
['reserved']
- ARGS_AND_BODY_MESSAGES =
[ 'value', 'set', 'add', 'replace', 'append', 'prepend', ]
- ALL_MESSAGES =
NO_ARG_MESSAGES + ARG_MESSAGES + BODY_MESSAGES + ARG_AND_BODY_MESSAGES + ARGS_AND_BODY_MESSAGES
Instance Method Summary collapse
-
#connection_completed ⇒ Object
Called after connection established.
-
#handle(*args) ⇒ Object
Call the user-supplied callback method for the message we got.
-
#io_per_sec ⇒ Object
How fast did we go? Returns a float.
-
#parse_header(header_data) ⇒ Object
Figure out what the header is, and call the appropriate parsing command.
-
#parse_incoming_data ⇒ Object
See if we can pull a header out of the incoming data buffer.
-
#post_init ⇒ Object
Called first(?) reactor spin after instantiated.
-
#receive_data(data) ⇒ Object
Called from EM::Connection when we have data.
-
#receive_message(message_name, data, args = nil) ⇒ Object
Generic “You’ve got mail”…
-
#remote_endpoint ⇒ Object
Returns a nice string showing something like “192.168.1.1:32122”.
-
#set_handler(handler) ⇒ Object
Install callbacks.
-
#start_closing ⇒ Object
Begin shutting the connection down.
-
#to_s ⇒ Object
Turns this sucker into a human-readable string with some semi-useful information.
-
#unbind ⇒ Object
Called from EM::Connection when the connection is dead.
Methods included from Sender
#book_it, #cas, #delete, #get, #send_to_peer, #stats, #value
Instance Method Details
#connection_completed ⇒ Object
Called after connection established. NB: not called for passive (server) connections
91 92 93 94 |
# File 'lib/evented_memcache_client/connectable.rb', line 91 def connection_completed @opened_at = Time.now handle(:open, self) end |
#handle(*args) ⇒ Object
Call the user-supplied callback method for the message we got.
303 304 305 306 307 308 309 310 311 |
# File 'lib/evented_memcache_client/connectable.rb', line 303 def handle(*args) if @callbacks[args[0]] @callbacks[args[0]].call(*args[1..-1]) elsif @handler.respond_to?(args[0]) @handler.send(*args) elsif self.respond_to?("handle_#{args[0]}".to_sym) self.send("handle_#{args[0]}".to_sym, *args[1..-1]) end end |
#io_per_sec ⇒ Object
How fast did we go? Returns a float.
297 298 299 300 |
# File 'lib/evented_memcache_client/connectable.rb', line 297 def io_per_sec total = @msgs_in + @msgs_out (total / ((@closed_at || Time.now) - @opened_at)).to_f end |
#parse_header(header_data) ⇒ Object
Figure out what the header is, and call the appropriate parsing command.
130 131 132 133 134 135 136 137 138 139 |
# File 'lib/evented_memcache_client/connectable.rb', line 130 def parse_header(header_data) elements = header_data.split(/\s/) command_name = elements[0].downcase if self.respond_to?("parse_#{command_name}") self.send("parse_#{command_name}", elements[1..-1]) else handle(:unknown_message, self, command_name, nil, nil) @data.slice!(0, @scanner.pos) end end |
#parse_incoming_data ⇒ Object
See if we can pull a header out of the incoming data buffer
115 116 117 118 119 120 121 122 123 124 125 126 |
# File 'lib/evented_memcache_client/connectable.rb', line 115 def parse_incoming_data @scanner = StringScanner.new(@data) while (header = @scanner.scan(HEADER_REGEX)) do len = @data.length parse_header(header) if len == @data.length break # it didn't take anything out, so there's not enough end @msgs_in += 1 @scanner = StringScanner.new(@data) end end |
#post_init ⇒ Object
Called first(?) reactor spin after instantiated.
75 76 77 78 79 80 81 82 83 84 85 86 87 |
# File 'lib/evented_memcache_client/connectable.rb', line 75 def post_init @data = "" # Stats @rcvs = 0 @msgs_in = 0 @msgs_out = 0 @rcv_stats = {} @snd_stats = {} @opened_at = nil @closed_at = nil end |
#receive_data(data) ⇒ Object
Called from EM::Connection when we have data
97 98 99 100 101 |
# File 'lib/evented_memcache_client/connectable.rb', line 97 def receive_data(data) @rcvs += 1 @data << data parse_incoming_data end |
#receive_message(message_name, data, args = nil) ⇒ Object
Generic “You’ve got mail”… Pass it off to the specific handler method.
247 248 249 250 251 252 253 254 255 |
# File 'lib/evented_memcache_client/connectable.rb', line 247 def (, data, args=nil) @rcv_stats[] ||= 0 @rcv_stats[] += 1 if self.respond_to?("receive_#{}") self.send("receive_#{}", data, args) else handle(:unknown_message, self, , data, args) end end |
#remote_endpoint ⇒ Object
Returns a nice string showing something like “192.168.1.1:32122”
314 315 316 317 318 319 |
# File 'lib/evented_memcache_client/connectable.rb', line 314 def remote_endpoint if !@peer && (peername = get_peername) @peer = Socket.unpack_sockaddr_in(peername) end @peer ? "#{@peer[1]}:#{@peer[0]}" : '?:?' end |
#set_handler(handler) ⇒ Object
Install callbacks.
handler
may be a Hash of Procs, a Module, or an object. If it’s a hash of Procs, each key is a callback like :open, :close, :unknown_message, and :foo, where foo is a memcache protocol message (eg, :stored).
If handler
is a Module, it is assumed that it defines a number of methods like handle_open, handle_close, and handle_foo, where foo is a is a memcache protocol message (eg, handle_stored). handler
is then include
‘d, making its methods available as callbacks.
If handler
is an object, Connectable will invoke methods on it corresponding to the abovementioned. Eg., :open, :closed, :unknown_message, and :<memcache_message>, like :stored, :value, :end, etc.
60 61 62 63 64 65 66 67 68 69 70 71 72 |
# File 'lib/evented_memcache_client/connectable.rb', line 60 def set_handler(handler) @handler = nil @callbacks = {} if handler.respond_to?(:keys) @callbacks = handler elsif handler.is_a?(Module) self.class.class_eval { include handler } else @handler = handler end end |
#start_closing ⇒ Object
Begin shutting the connection down.
292 293 294 |
# File 'lib/evented_memcache_client/connectable.rb', line 292 def start_closing close_connection_after_writing end |
#to_s ⇒ Object
Turns this sucker into a human-readable string with some semi-useful information.
276 277 278 279 280 281 282 283 284 285 286 287 288 289 |
# File 'lib/evented_memcache_client/connectable.rb', line 276 def to_s str = '' str << "EM signature: #{signature} peer: #{remote_endpoint}\n" str << "opened: #{@opened_at} (#{Time.now - @opened_at}s)\n" str << "rcvs: #{@rcvs} in: #{@msgs_in} out: #{@msgs_out}\n" @rcv_stats.each {|msg_name,count| str << "#{msg_name} in: #{count}\n" } @snd_stats.each {|msg_name,count| str << "#{msg_name} out: #{count}\n" } str << "I+O/sec: #{'%6.4f' % io_per_sec}." str end |
#unbind ⇒ Object
Called from EM::Connection when the connection is dead.
104 105 106 107 108 |
# File 'lib/evented_memcache_client/connectable.rb', line 104 def unbind @closed_at = Time.now @data = nil handle(:close, self) end |