Module: EventMachine::Protocols::Redis
- Includes:
- EM::Deferrable
- Defined in:
- lib/em-redis/redis_protocol.rb
Defined Under Namespace
Classes: ParserError, ProtocolError, RedisError
Constant Summary collapse
- OK =
constants
"OK".freeze
- MINUS =
"-".freeze
- PLUS =
"+".freeze
- COLON =
":".freeze
- DOLLAR =
"$".freeze
- ASTERISK =
"*".freeze
- DELIM =
"\r\n".freeze
- BOOLEAN_PROCESSOR =
lambda{|r| r == 1 }
- TYPE_PROCESSOR =
lambda { |t| t == "none" ? nil : t }
- REPLY_PROCESSOR =
{ "exists" => BOOLEAN_PROCESSOR, "sismember" => BOOLEAN_PROCESSOR, "sadd" => BOOLEAN_PROCESSOR, "srem" => BOOLEAN_PROCESSOR, "smove" => BOOLEAN_PROCESSOR, "zadd" => BOOLEAN_PROCESSOR, "zrem" => BOOLEAN_PROCESSOR, "move" => BOOLEAN_PROCESSOR, "setnx" => BOOLEAN_PROCESSOR, "del" => BOOLEAN_PROCESSOR, "renamenx" => BOOLEAN_PROCESSOR, "expire" => BOOLEAN_PROCESSOR, "select" => BOOLEAN_PROCESSOR, # not in redis gem "hset" => BOOLEAN_PROCESSOR, "hdel" => BOOLEAN_PROCESSOR, "hexists" => BOOLEAN_PROCESSOR, "type" => TYPE_PROCESSOR, "keys" => lambda {|r| if r.is_a?(Array) r else r.split(" ") end }, "info" => lambda{|r| info = {} r.each_line {|kv| k,v = kv.split(":",2).map{|x| x.chomp} info[k.to_sym] = v } info }, "hgetall" => lambda{|r| begin Hash[*r] rescue ArgumentError # Happens when the key is not set, redis returns none. {} end } }
- ALIASES =
{ "flush_db" => "flushdb", "flush_all" => "flushall", "last_save" => "lastsave", "key?" => "exists", "delete" => "del", "randkey" => "randomkey", "list_length" => "llen", "push_tail" => "rpush", "push_head" => "lpush", "pop_tail" => "rpop", "pop_head" => "lpop", "list_set" => "lset", "list_range" => "lrange", "list_trim" => "ltrim", "list_index" => "lindex", "list_rm" => "lrem", "set_add" => "sadd", "set_delete" => "srem", "set_count" => "scard", "set_member?" => "sismember", "set_members" => "smembers", "set_intersect" => "sinter", "set_intersect_store" => "sinterstore", "set_inter_store" => "sinterstore", "set_union" => "sunion", "set_union_store" => "sunionstore", "set_diff" => "sdiff", "set_diff_store" => "sdiffstore", "set_move" => "smove", "set_unless_exists" => "setnx", "rename_unless_exists" => "renamenx", "type?" => "type", "zset_add" => "zadd", "zset_count" => "zcard", "zset_range_by_score" => "zrangebyscore", "zset_reverse_range" => "zrevrange", "zset_range" => "zrange", "zset_delete" => "zrem", "zset_score" => "zscore", "zset_incr_by" => "zincrby", "zset_increment_by" => "zincrby", # these aliases aren't in redis gem "background_save" => 'bgsave', "async_save" => 'bgsave', "members" => 'smembers', "decrement_by" => "decrby", "decrement" => "decr", "increment_by" => "incrby", "increment" => "incr", "set_if_nil" => "setnx", "multi_get" => "mget", "random_key" => "randomkey", "random" => "randomkey", "rename_if_nil" => "renamenx", "tail_pop" => "rpop", "pop" => "rpop", "head_pop" => "lpop", "shift" => "lpop", "list_remove" => "lrem", "index" => "lindex", "trim" => "ltrim", "list_range" => "lrange", "range" => "lrange", "list_len" => "llen", "len" => "llen", "head_push" => "lpush", "unshift" => "lpush", "tail_push" => "rpush", "push" => "rpush", "add" => "sadd", "set_remove" => "srem", "set_size" => "scard", "member?" => "sismember", "intersect" => "sinter", "intersect_and_store" => "sinterstore", "members" => "smembers", "exists?" => "exists" }
- DISABLED_COMMANDS =
{ "monitor" => true, "sync" => true }
Class Method Summary collapse
-
.connect(*args) ⇒ Object
em hooks.
Instance Method Summary collapse
- #[]=(key, value) ⇒ Object
- #auth(password, &blk) ⇒ Object
- #call_command(argv, &blk) ⇒ Object
- #call_commands(pipeline, pipelined = true, &blk) ⇒ Object
- #connection_completed ⇒ Object
- #decr(key, decrement = nil, &blk) ⇒ Object
- #dispatch_response(value) ⇒ Object
- #errback(&blk) ⇒ Object (also: #on_error)
- #exec(&blk) ⇒ Object
- #incr(key, increment = nil, &blk) ⇒ Object
- #initialize(options = {}) ⇒ Object
-
#mapped_mget(*keys) ⇒ Object
Similar to memcache.rb’s #get_multi, returns a hash mapping keys to values.
- #maybe_lock(&blk) ⇒ Object
- #method_missing(*argv, &blk) ⇒ Object
- #mset(*args, &blk) ⇒ Object
- #msetnx(*args, &blk) ⇒ Object
-
#multi ⇒ Object
I’m not sure autocommit is a good idea.
- #process_cmd(line) ⇒ Object
- #quit(&blk) ⇒ Object
-
#receive_data(data) ⇒ Object
19Feb09 Switched to a custom parser, LineText2 is recursive and can cause stack overflows when there is too much data.
- #select(db, &blk) ⇒ Object
- #set(key, value, expiry = nil) ⇒ Object
- #sort(key, options = {}, &blk) ⇒ Object
-
#type(key, &blk) ⇒ Object
Ruby defines a now deprecated type method so we need to override it here since it will never hit method_missing.
- #unbind ⇒ Object
Dynamic Method Handling
This class handles dynamic methods through the method_missing method
#method_missing(*argv, &blk) ⇒ Object
258 259 260 |
# File 'lib/em-redis/redis_protocol.rb', line 258 def method_missing(*argv, &blk) call_command(argv, &blk) end |
Class Method Details
.connect(*args) ⇒ Object
em hooks
318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 |
# File 'lib/em-redis/redis_protocol.rb', line 318 def self.connect(*args) case args.length when 0 = {} when 1 arg = args.shift case arg when Hash then = arg when String then = {:host => arg} else raise ArgumentError, 'first argument must be Hash or String' end when 2 = {:host => args[0], :port => args[1]} else raise ArgumentError, "wrong number of arguments (#{args.length} for 1)" end [:host] ||= '127.0.0.1' [:port] = ([:port] || 6379).to_i EM.connect [:host], [:port], self, end |
Instance Method Details
#[]=(key, value) ⇒ Object
152 153 154 |
# File 'lib/em-redis/redis_protocol.rb', line 152 def []=(key,value) set(key,value) end |
#auth(password, &blk) ⇒ Object
186 187 188 189 |
# File 'lib/em-redis/redis_protocol.rb', line 186 def auth(password, &blk) @password = password call_command(['auth', password], &blk) end |
#call_command(argv, &blk) ⇒ Object
298 299 300 |
# File 'lib/em-redis/redis_protocol.rb', line 298 def call_command(argv, &blk) call_commands([argv], false, &blk) end |
#call_commands(pipeline, pipelined = true, &blk) ⇒ Object
272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 |
# File 'lib/em-redis/redis_protocol.rb', line 272 def call_commands(pipeline, pipelined = true, &blk) command = "" comms = [] pipeline.each do |argv| argv[0] = argv[0].to_s.downcase argv[0] = ALIASES[argv[0]] if ALIASES[argv[0]] raise "#{argv[0]} command is disabled" if DISABLED_COMMANDS[argv[0]] comms.push(argv) command << "*#{argv.size}\r\n" argv.each do |a| a = a.to_s command << "$#{get_size(a)}\r\n" command << a command << "\r\n" end end @logger.debug { "*** sending: #{command}" } if @logger maybe_lock do @redis_callbacks << [comms, pipelined, blk] send_data command end end |
#connection_completed ⇒ Object
362 363 364 365 366 367 368 369 370 371 372 |
# File 'lib/em-redis/redis_protocol.rb', line 362 def connection_completed @logger.debug { "Connected to #{@host}:#{@port}" } if @logger @redis_callbacks = [] @previous_multibulks = [] @multibulk_n = false @reconnecting = false @connected = true succeed end |
#decr(key, decrement = nil, &blk) ⇒ Object
177 178 179 |
# File 'lib/em-redis/redis_protocol.rb', line 177 def decr(key, decrement = nil, &blk) call_command(decrement ? ["decrby",key,decrement] : ["decr",key], &blk) end |
#dispatch_response(value) ⇒ Object
439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 |
# File 'lib/em-redis/redis_protocol.rb', line 439 def dispatch_response(value) if @multibulk_n @multibulk_values << value @multibulk_n -= 1 if @multibulk_n == 0 value = @multibulk_values @multibulk_n,@multibulk_values = @previous_multibulks.pop if @multibulk_n dispatch_response(value) return end else return end end callback = @redis_callbacks.shift commands, pipelined, blk = callback @values ||= [] command = commands[@values.size] processor = REPLY_PROCESSOR[command[0]] value = processor.call(value) if processor @values.push(value) if @values.size == commands.size if !pipelined # This was not a PIPELINE! blk.call(@values[0]) if blk else blk.call(@values) if blk end @values = [] else # We need to wait for the other commands to succeed as well @redis_callbacks.unshift([commands, pipelined, blk]) end end |
#errback(&blk) ⇒ Object Also known as: on_error
253 254 255 |
# File 'lib/em-redis/redis_protocol.rb', line 253 def errback(&blk) @error_callback = blk end |
#exec(&blk) ⇒ Object
214 215 216 |
# File 'lib/em-redis/redis_protocol.rb', line 214 def exec(&blk) call_command(['exec'], &blk) end |
#incr(key, increment = nil, &blk) ⇒ Object
173 174 175 |
# File 'lib/em-redis/redis_protocol.rb', line 173 def incr(key, increment = nil, &blk) call_command(increment ? ["incrby",key,increment] : ["incr",key], &blk) end |
#initialize(options = {}) ⇒ Object
339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 |
# File 'lib/em-redis/redis_protocol.rb', line 339 def initialize( = {}) @host = [:host] @port = [:port] @db = ([:db] || 0).to_i @password = [:password] @logger = [:logger] @error_callback = lambda do |code| err = RedisError.new err.code = code raise err, "Redis server returned error code: #{code}" end @values = [] # These commands should be first auth_and_select_db end |
#mapped_mget(*keys) ⇒ Object
Similar to memcache.rb’s #get_multi, returns a hash mapping keys to values.
193 194 195 196 197 198 199 200 201 202 |
# File 'lib/em-redis/redis_protocol.rb', line 193 def mapped_mget(*keys) mget(*keys) do |response| result = {} response.each do |value| key = keys.shift result.merge!(key => value) unless value.nil? end yield result if block_given? end end |
#maybe_lock(&blk) ⇒ Object
262 263 264 265 266 267 268 269 270 |
# File 'lib/em-redis/redis_protocol.rb', line 262 def maybe_lock(&blk) if !EM.reactor_thread? EM.schedule { maybe_lock(&blk) } elsif @connected yield else callback { yield } end end |
#mset(*args, &blk) ⇒ Object
235 236 237 238 239 240 241 242 |
# File 'lib/em-redis/redis_protocol.rb', line 235 def mset(*args, &blk) hsh = args.pop if Hash === args.last if hsh call_command(hsh.to_a.flatten.unshift(:mset), &blk) else call_command(args.unshift(:mset), &blk) end end |
#msetnx(*args, &blk) ⇒ Object
244 245 246 247 248 249 250 251 |
# File 'lib/em-redis/redis_protocol.rb', line 244 def msetnx(*args, &blk) hsh = args.pop if Hash === args.last if hsh call_command(hsh.to_a.flatten.unshift(:msetnx), &blk) else call_command(args.unshift(:msetnx), &blk) end end |
#multi ⇒ Object
I’m not sure autocommit is a good idea. For example:
r.multi { r.set('a', 'b') { raise "kaboom" } }
will commit “a” and will stop EM
222 223 224 225 226 227 228 229 230 231 232 233 |
# File 'lib/em-redis/redis_protocol.rb', line 222 def multi call_command(['multi']) if block_given? begin yield self exec rescue Exception => e discard raise e end end end |
#process_cmd(line) ⇒ Object
390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 |
# File 'lib/em-redis/redis_protocol.rb', line 390 def process_cmd(line) @logger.debug { "*** processing #{line}" } if @logger # first character of buffer will always be the response type reply_type = line[0, 1] reply_args = line.slice(1..-3) # remove type character and \r\n case reply_type #e.g. -MISSING when MINUS # Missing, dispatch empty response dispatch_response(nil) # e.g. +OK when PLUS dispatch_response(reply_args) # e.g. $3\r\nabc\r\n # 'bulk' is more complex because it could be part of multi-bulk when DOLLAR data_len = Integer(reply_args) if data_len == -1 # expect no data; return nil dispatch_response(nil) elsif @buffer.size >= data_len + 2 # buffer is full of expected data dispatch_response(@buffer.slice!(0, data_len)) @buffer.slice!(0,2) # tossing \r\n else # buffer isn't full or nil # TODO: don't control execution with exceptions raise ParserError end #e.g. :8 when COLON dispatch_response(Integer(reply_args)) #e.g. *2\r\n$1\r\na\r\n$1\r\nb\r\n when ASTERISK multibulk_count = Integer(reply_args) if multibulk_count == -1 || multibulk_count == 0 dispatch_response([]) else if @multibulk_n @previous_multibulks << [@multibulk_n, @multibulk_values] end @multibulk_n = multibulk_count @multibulk_values = [] end # Whu? else # TODO: get rid of this exception raise ProtocolError, "reply type not recognized: #{line.strip}" end end |
#quit(&blk) ⇒ Object
210 211 212 |
# File 'lib/em-redis/redis_protocol.rb', line 210 def quit(&blk) call_command(['quit'], &blk) end |
#receive_data(data) ⇒ Object
19Feb09 Switched to a custom parser, LineText2 is recursive and can cause
stack overflows when there is too much data.
include EM::P::LineText2
377 378 379 380 381 382 383 384 385 386 387 388 |
# File 'lib/em-redis/redis_protocol.rb', line 377 def receive_data(data) (@buffer ||= '') << data while index = @buffer.index(DELIM) begin line = @buffer.slice!(0, index+2) process_cmd line rescue ParserError @buffer[0...0] = line break end end end |
#select(db, &blk) ⇒ Object
181 182 183 184 |
# File 'lib/em-redis/redis_protocol.rb', line 181 def select(db, &blk) @db = db.to_i call_command(['select', @db], &blk) end |
#set(key, value, expiry = nil) ⇒ Object
156 157 158 159 160 161 |
# File 'lib/em-redis/redis_protocol.rb', line 156 def set(key, value, expiry=nil) call_command([:set, key, value]) do |s| yield s if block_given? end expire(key, expiry) if expiry end |
#sort(key, options = {}, &blk) ⇒ Object
163 164 165 166 167 168 169 170 171 |
# File 'lib/em-redis/redis_protocol.rb', line 163 def sort(key, ={}, &blk) cmd = ["SORT"] cmd << key cmd << ["BY", [:by]] if [:by] cmd << [[:get]].flatten.map { |key| ["GET", key] } if [:get] cmd << [:order].split(/\s+/) if [:order] cmd << ["LIMIT", [:limit]] if [:limit] call_command(cmd.flatten, &blk) end |
#type(key, &blk) ⇒ Object
Ruby defines a now deprecated type method so we need to override it here since it will never hit method_missing
206 207 208 |
# File 'lib/em-redis/redis_protocol.rb', line 206 def type(key, &blk) call_command(['type', key], &blk) end |
#unbind ⇒ Object
478 479 480 481 482 483 484 485 486 487 488 489 490 491 492 493 |
# File 'lib/em-redis/redis_protocol.rb', line 478 def unbind @logger.debug { "Disconnected" } if @logger if @connected || @reconnecting EM.add_timer(1) do @logger.debug { "Reconnecting to #{@host}:#{@port}" } if @logger reconnect @host, @port auth_and_select_db end @connected = false @reconnecting = true @deferred_status = nil else # TODO: get rid of this exception raise 'Unable to connect to redis server' end end |