Class: Redis::Client

Inherits:
Object
  • Object
show all
Defined in:
lib/redis/client.rb

Direct Known Subclasses

Pipeline

Defined Under Namespace

Classes: ProtocolError

Constant Summary collapse

OK =
"OK".freeze
MINUS =
"-".freeze
PLUS =
"+".freeze
COLON =
":".freeze
DOLLAR =
"$".freeze
ASTERISK =
"*".freeze
BULK_COMMANDS =
{
  "set"       => true,
  "setnx"     => true,
  "rpush"     => true,
  "lpush"     => true,
  "lset"      => true,
  "lrem"      => true,
  "sadd"      => true,
  "srem"      => true,
  "sismember" => true,
  "echo"      => true,
  "getset"    => true,
  "smove"     => true,
  "zadd"      => true,
  "zincrby"   => true,
  "zrem"      => true,
  "zscore"    => true,
  "zrank"     => true,
  "zrevrank"  => true,
  "hget"      => true,
  "hdel"      => true,
  "hexists"   => true,
  "publish"   => true
}
MULTI_BULK_COMMANDS =
{
  "mset"      => true,
  "msetnx"    => true,
  "hset"      => true,
  "hmset"     => true
}
BOOLEAN_PROCESSOR =
lambda{|r| r == 1 }
REPLY_PROCESSOR =
{
  "exists"    => BOOLEAN_PROCESSOR,
  "sismember" => BOOLEAN_PROCESSOR,
  "sadd"      => BOOLEAN_PROCESSOR,
  "srem"      => BOOLEAN_PROCESSOR,
  "smove"     => BOOLEAN_PROCESSOR,
  "zadd"      => BOOLEAN_PROCESSOR,
  "zrem"      => BOOLEAN_PROCESSOR,
  "move"      => BOOLEAN_PROCESSOR,
  "setnx"     => BOOLEAN_PROCESSOR,
  "del"       => BOOLEAN_PROCESSOR,
  "renamenx"  => BOOLEAN_PROCESSOR,
  "expire"    => BOOLEAN_PROCESSOR,
  "hset"      => BOOLEAN_PROCESSOR,
  "hexists"   => BOOLEAN_PROCESSOR,
  "info"      => lambda{|r|
    info = Hash.new do |hash, key|
      if hash.include?(key.to_s)
        Redis.deprecate "Redis#info will return a hash of string keys, not symbols", caller[2]
        hash[key.to_s]
      end
    end

    r.each_line {|kv|
      k,v = kv.split(":",2).map{|x| x.chomp}
      info[k] = v
    }
    info
  },
  "keys"      => lambda{|r|
    if r.is_a?(Array)
        r
    else
        r.split(" ")
    end
  },
  "hgetall"   => lambda{|r|
    Hash[*r]
  }
}
ALIASES =
{
  "flush_db"             => "flushdb",
  "flush_all"            => "flushall",
  "last_save"            => "lastsave",
  "key?"                 => "exists",
  "delete"               => "del",
  "randkey"              => "randomkey",
  "list_length"          => "llen",
  "push_tail"            => "rpush",
  "push_head"            => "lpush",
  "pop_tail"             => "rpop",
  "pop_head"             => "lpop",
  "list_set"             => "lset",
  "list_range"           => "lrange",
  "list_trim"            => "ltrim",
  "list_index"           => "lindex",
  "list_rm"              => "lrem",
  "set_add"              => "sadd",
  "set_delete"           => "srem",
  "set_count"            => "scard",
  "set_member?"          => "sismember",
  "set_members"          => "smembers",
  "set_intersect"        => "sinter",
  "set_intersect_store"  => "sinterstore",
  "set_inter_store"      => "sinterstore",
  "set_union"            => "sunion",
  "set_union_store"      => "sunionstore",
  "set_diff"             => "sdiff",
  "set_diff_store"       => "sdiffstore",
  "set_move"             => "smove",
  "set_unless_exists"    => "setnx",
  "rename_unless_exists" => "renamenx",
  "type?"                => "type",
  "zset_add"             => "zadd",
  "zset_count"           => "zcard",
  "zset_range_by_score"  => "zrangebyscore",
  "zset_reverse_range"   => "zrevrange",
  "zset_range"           => "zrange",
  "zset_delete"          => "zrem",
  "zset_score"           => "zscore",
  "zset_incr_by"         => "zincrby",
  "zset_increment_by"    => "zincrby"
}
DISABLED_COMMANDS =
{
  "monitor" => true,
  "sync"    => true
}
BLOCKING_COMMANDS =
{
  "blpop" => true,
  "brpop" => true
}

Instance Method Summary collapse

Constructor Details

#initialize(options = {}) ⇒ Client

Returns a new instance of Client.



145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
# File 'lib/redis/client.rb', line 145

def initialize(options = {})
  @host    =  options[:host]    || '127.0.0.1'
  @port    = (options[:port]    || 6379).to_i
  @db      = (options[:db]      || 0).to_i
  @timeout = (options[:timeout] || 5).to_i
  @password = options[:password]
  @logger  =  options[:logger]
  @thread_safe = options[:thread_safe]
  @binary_keys = options[:binary_keys]
  @mutex = Mutex.new if @thread_safe
  @sock = nil
  @pubsub = false

  log(self)
end

Dynamic Method Handling

This class handles dynamic methods through the method_missing method

#method_missing(*argv) ⇒ Object



391
392
393
# File 'lib/redis/client.rb', line 391

def method_missing(*argv)
  call_command(argv)
end

Instance Method Details

#[](key) ⇒ Object



169
170
171
# File 'lib/redis/client.rb', line 169

def [](key)
  get(key)
end

#[]=(key, value) ⇒ Object



173
174
175
# File 'lib/redis/client.rb', line 173

def []=(key,value)
  set(key, value)
end

#call_command(argv) ⇒ Object

Wrap raw_call_command to handle reconnection on socket error. We try to reconnect just one time, otherwise let the error araise.



337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
# File 'lib/redis/client.rb', line 337

def call_command(argv)
  log(argv.inspect, :debug)

  connect_to_server unless connected?

  begin
    raw_call_command(argv.dup)
  rescue Errno::ECONNRESET, Errno::EPIPE, Errno::ECONNABORTED
    if reconnect
      raw_call_command(argv.dup)
    else
      raise Errno::ECONNRESET
    end
  end
end

#connect_to(host, port) ⇒ Object



357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
# File 'lib/redis/client.rb', line 357

def connect_to(host, port)

  # We support connect_to() timeout only if system_timer is availabe
  # or if we are running against Ruby >= 1.9
  # Timeout reading from the socket instead will be supported anyway.
  if @timeout != 0 and Timer
    begin
      @sock = TCPSocket.new(host, port)
    rescue Timeout::Error
      @sock = nil
      raise Timeout::Error, "Timeout connecting to the server"
    end
  else
    @sock = TCPSocket.new(host, port)
  end

  @sock.setsockopt Socket::IPPROTO_TCP, Socket::TCP_NODELAY, 1

  # If the timeout is set we set the low level socket options in order
  # to make sure a blocking read will return after the specified number
  # of seconds. This hack is from memcached ruby client.
  set_socket_timeout!(@timeout) if @timeout

rescue Errno::ECONNREFUSED
  raise Errno::ECONNREFUSED, "Unable to connect to Redis on #{host}:#{port}"
end

#connect_to_serverObject



384
385
386
387
388
389
# File 'lib/redis/client.rb', line 384

def connect_to_server
  connect_to(@host, @port)
  call_command([:auth, @password]) if @password
  call_command([:select, @db]) if @db != 0
  @sock
end

#decr(key, decrement = nil) ⇒ Object



253
254
255
256
257
258
259
260
# File 'lib/redis/client.rb', line 253

def decr(key, decrement = nil)
  if decrement
    deprecated("decr with a decrement", :decrby, caller[0])
    decrby(key, decrement)
  else
    call_command([:decr, key])
  end
end

#execObject



280
281
282
283
# File 'lib/redis/client.rb', line 280

def exec
  # Need to override Kernel#exec.
  call_command([:exec])
end

#get(key) ⇒ Object



177
178
179
# File 'lib/redis/client.rb', line 177

def get(key)
  call_command([:get, key])
end

#get_size(string) ⇒ Object



490
491
492
# File 'lib/redis/client.rb', line 490

def get_size(string)
  string.bytesize
end

#incr(key, increment = nil) ⇒ Object



244
245
246
247
248
249
250
251
# File 'lib/redis/client.rb', line 244

def incr(key, increment = nil)
  if increment
    deprecated("incr with an increment", :incrby, caller[0])
    incrby(key, increment)
  else
    call_command([:incr, key])
  end
end

#mapped_mget(*keys) ⇒ Object

Similar to memcache.rb’s #get_multi, returns a hash mapping keys to values.



224
225
226
227
228
229
230
231
# File 'lib/redis/client.rb', line 224

def mapped_mget(*keys)
  result = {}
  mget(*keys).each do |value|
    key = keys.shift
    result.merge!(key => value) unless value.nil?
  end
  result
end

#mapped_mset(hash) ⇒ Object



205
206
207
# File 'lib/redis/client.rb', line 205

def mapped_mset(hash)
  mset(*hash.to_a.flatten)
end

#mapped_msetnx(hash) ⇒ Object



218
219
220
# File 'lib/redis/client.rb', line 218

def mapped_msetnx(hash)
  msetnx(*hash.to_a.flatten)
end

#maybe_lock(&block) ⇒ Object



459
460
461
462
463
464
465
# File 'lib/redis/client.rb', line 459

def maybe_lock(&block)
  if @thread_safe
    @mutex.synchronize(&block)
  else
    block.call
  end
end

#mset(*args) ⇒ Object



196
197
198
199
200
201
202
203
# File 'lib/redis/client.rb', line 196

def mset(*args)
  if args.size == 1
    deprecated("mset with a hash", :mapped_mset, caller[0])
    mapped_mset(args[0])
  else
    call_command(args.unshift(:mset))
  end
end

#msetnx(*args) ⇒ Object



209
210
211
212
213
214
215
216
# File 'lib/redis/client.rb', line 209

def msetnx(*args)
  if args.size == 1
    deprecated("msetnx with a hash", :mapped_msetnx, caller[0])
    mapped_msetnx(args[0])
  else
    call_command(args.unshift(:msetnx))
  end
end

#multi(&block) ⇒ Object



285
286
287
288
289
290
291
292
293
294
295
296
297
# File 'lib/redis/client.rb', line 285

def multi(&block)
  result = call_command [:multi]

  return result unless block_given?

  begin
    yield(self)
    exec
  rescue Exception => e
    discard
    raise e
  end
end

#pipelined {|pipeline| ... } ⇒ Object

Yields:

  • (pipeline)


273
274
275
276
277
278
# File 'lib/redis/client.rb', line 273

def pipelined(&block)
  Redis.deprecate("Calling pipelined commands on the yielded object will be deprecated in 2.0", caller[0])
  pipeline = Pipeline.new self
  yield pipeline
  pipeline.execute
end

#process_command(command, argvv) ⇒ Object



451
452
453
454
455
456
457
# File 'lib/redis/client.rb', line 451

def process_command(command, argvv)
  @sock.write(command)
  argvv.map do |argv|
    processor = REPLY_PROCESSOR[argv[0].to_s]
    processor ? processor.call(read_reply) : read_reply
  end
end

#quitObject



268
269
270
271
# File 'lib/redis/client.rb', line 268

def quit
  call_command(['quit'])
rescue Errno::ECONNRESET
end

#raw_call_command(argvp) ⇒ Object



395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
# File 'lib/redis/client.rb', line 395

def raw_call_command(argvp)
  if argvp[0].is_a?(Array)
    argvv = argvp
    pipeline = true
  else
    argvv = [argvp]
    pipeline = false
  end

  if @binary_keys or pipeline or MULTI_BULK_COMMANDS[argvv[0][0].to_s]
    command = ""
    argvv.each do |argv|
      command << "*#{argv.size}\r\n"
      argv.each{|a|
        a = a.to_s
        command << "$#{get_size(a)}\r\n"
        command << a
        command << "\r\n"
      }
    end
  else
    command = ""
    argvv.each do |argv|
      bulk = nil
      argv[0] = argv[0].to_s
      if ALIASES[argv[0]]
        deprecated(argv[0], ALIASES[argv[0]], caller[4])
        argv[0] = ALIASES[argv[0]]
      end
      raise "#{argv[0]} command is disabled" if DISABLED_COMMANDS[argv[0]]
      if BULK_COMMANDS[argv[0]] and argv.length > 1
        bulk = argv[-1].to_s
        argv[-1] = get_size(bulk)
      end
      command << "#{argv.join(' ')}\r\n"
      command << "#{bulk}\r\n" if bulk
    end
  end
  # When in Pub/Sub mode we don't read replies synchronously.
  if @pubsub
    @sock.write(command)
    return true
  end
  # The normal command execution is reading and processing the reply.
  results = maybe_lock do
    begin
      set_socket_timeout!(0) if requires_timeout_reset?(argvv[0][0].to_s)
      process_command(command, argvv)
    ensure
      set_socket_timeout!(@timeout) if requires_timeout_reset?(argvv[0][0].to_s)
    end
  end

  return pipeline ? results : results[0]
end

#read_replyObject

Raises:

  • (Errno::ECONNRESET)


467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
# File 'lib/redis/client.rb', line 467

def read_reply

  # We read the first byte using read() mainly because gets() is
  # immune to raw socket timeouts.
  begin
    reply_type = @sock.read(1)
  rescue Errno::EAGAIN

    # We want to make sure it reconnects on the next command after the
    # timeout. Otherwise the server may reply in the meantime leaving
    # the protocol in a desync status.
    disconnect

    raise Errno::EAGAIN, "Timeout reading from the socket"
  end

  raise Errno::ECONNRESET, "Connection lost" unless reply_type

  format_reply(reply_type, @sock.gets)
end

#select(*args) ⇒ Object



165
166
167
# File 'lib/redis/client.rb', line 165

def select(*args)
  raise "SELECT not allowed, use the :db option when creating the object"
end

#serverObject



353
354
355
# File 'lib/redis/client.rb', line 353

def server
  "#{@host}:#{@port}"
end

#set(key, value, ttl = nil) ⇒ Object



181
182
183
184
185
186
187
188
# File 'lib/redis/client.rb', line 181

def set(key, value, ttl = nil)
  if ttl
    deprecated("set with an expire", :set_with_expire, caller[0])
    set_with_expire(key, value, ttl)
  else
    call_command([:set, key, value])
  end
end

#set_with_expire(key, value, ttl) ⇒ Object



190
191
192
193
194
# File 'lib/redis/client.rb', line 190

def set_with_expire(key, value, ttl)
  Redis.deprecate "Using a non-atomic set with expire. Use setex if your Redis version allows it.", caller[0]
  set(key, value)
  expire(key, ttl)
end

#sort(key, options = {}) ⇒ Object



233
234
235
236
237
238
239
240
241
242
# File 'lib/redis/client.rb', line 233

def sort(key, options = {})
  cmd = []
  cmd << "SORT #{key}"
  cmd << "BY #{options[:by]}" if options[:by]
  cmd << "GET #{[options[:get]].flatten * ' GET '}" if options[:get]
  cmd << "#{options[:order]}" if options[:order]
  cmd << "LIMIT #{options[:limit].join(' ')}" if options[:limit]
  cmd << "STORE #{options[:store]}" if options[:store]
  call_command(cmd)
end

#subscribe(*classes) {|sub| ... } ⇒ Object

Yields:

  • (sub)


299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
# File 'lib/redis/client.rb', line 299

def subscribe(*classes)
  # Top-level `subscribe` MUST be called with a block,
  # nested `subscribe` MUST NOT be called with a block
  if !@pubsub && !block_given?
    raise "Top-level subscribe requires a block"
  elsif @pubsub == true && block_given?
    raise "Nested subscribe does not take a block"
  elsif @pubsub
    # If we're already pubsub'ing, just subscribe us to some more classes
    call_command [:subscribe,*classes]
    return true
  end

  @pubsub = true
  call_command [:subscribe,*classes]
  sub = Subscription.new
  yield(sub)
  begin
    while true
      type, *reply = read_reply # type, [class,data]
      case type
      when 'subscribe','unsubscribe'
        sub.send(type) && sub.send(type).call(reply[0],reply[1])
      when 'message'
        sub.send(type) && sub.send(type).call(reply[0],reply[1])
      end
      break if type == 'unsubscribe' && reply[1] == 0
    end
  rescue RuntimeError
    call_command [:unsubscribe]
    raise
  ensure
    @pubsub = false
  end
end

#to_sObject



161
162
163
# File 'lib/redis/client.rb', line 161

def to_s
  "Redis Client connected to #{server} against DB #{@db}"
end

#type(key) ⇒ Object

Ruby defines a now deprecated type method so we need to override it here since it will never hit method_missing



264
265
266
# File 'lib/redis/client.rb', line 264

def type(key)
  call_command(['type', key])
end