Class: Redis::Client

Inherits:
Object
  • Object
show all
Defined in:
lib/redis/client.rb

Direct Known Subclasses

Pipeline

Defined Under Namespace

Classes: ProtocolError

Constant Summary collapse

# Frozen string constants: the "OK" status literal and five single-character
# markers ("-", "+", ":", "$", "*").
OK       = "OK".freeze
MINUS    = "-".freeze
PLUS     = "+".freeze
COLON    = ":".freeze
DOLLAR   = "$".freeze
ASTERISK = "*".freeze
# Lookup table (command name => true) of commands whose last argument is
# sent by raw_call_command's inline branch as a separate payload line,
# with the argument itself replaced by the payload's byte size.
BULK_COMMANDS = Hash[
  %w[
    set setnx rpush lpush lset lrem sadd srem sismember echo getset smove
    zadd zincrby zrem zscore zrank zrevrank hget hdel hexists publish
  ].map { |name| [name, true] }
]
# Lookup table (command name => true) of commands that raw_call_command
# always serialises with the "*<argc>" / "$<len>" multi-bulk framing.
MULTI_BULK_COMMANDS = Hash[
  %w[mset msetnx hset hmset].map { |name| [name, true] }
]
# Turns an integer reply into true/false.
#
# Any positive integer counts as success, so a command that reports "how
# many items were affected" (e.g. a DEL of several keys answering 2) is not
# misreported as false. Non-integer replies (nil, strings) yield false,
# exactly as the previous `r == 1` comparison did.
BOOLEAN_PROCESSOR = lambda { |r| r.is_a?(Integer) && r > 0 }
# Per-command post-processors. process_command looks a command up here by
# name and, when an entry exists, passes the raw reply through it.
REPLY_PROCESSOR = {
  "exists"    => BOOLEAN_PROCESSOR,
  "sismember" => BOOLEAN_PROCESSOR,
  "sadd"      => BOOLEAN_PROCESSOR,
  "srem"      => BOOLEAN_PROCESSOR,
  "smove"     => BOOLEAN_PROCESSOR,
  "zadd"      => BOOLEAN_PROCESSOR,
  "zrem"      => BOOLEAN_PROCESSOR,
  "move"      => BOOLEAN_PROCESSOR,
  "setnx"     => BOOLEAN_PROCESSOR,
  "del"       => BOOLEAN_PROCESSOR,
  "renamenx"  => BOOLEAN_PROCESSOR,
  "expire"    => BOOLEAN_PROCESSOR,
  "hset"      => BOOLEAN_PROCESSOR,
  "hexists"   => BOOLEAN_PROCESSOR,
  # Parses "key:value" lines into a Hash with symbol keys.
  "info"      => lambda { |r|
    info = {}
    r.each_line do |line|
      key, value = line.split(":", 2).map { |part| part.chomp }
      # Blank lines and lines without a colon carry no field; skipping them
      # avoids junk entries such as an empty-symbol key mapped to nil.
      next if key.nil? || key.empty? || value.nil?
      info[key.to_sym] = value
    end
    info
  },
  # Accepts either an Array reply or a single space-separated String.
  "keys"      => lambda { |r|
    r.is_a?(Array) ? r : r.split(" ")
  },
  # Pairs up a flat [field, value, field, value, ...] reply into a Hash.
  "hgetall"   => lambda { |r|
    case r
    when Array then Hash[*r]
    when nil   then {}
    else
      # A non-Array reply cannot be paired up (Hash[] would raise on it);
      # hand it back untouched. NOTE(review): presumably this is the status
      # string the server sends while a MULTI block is queuing -- confirm.
      r
    end
  }
}
# Maps legacy method names to the command name actually sent.
# raw_call_command's inline branch looks argv[0] up here, emits a
# deprecation notice via `deprecated`, and substitutes the mapped name.
ALIASES =
{
  "flush_db"             => "flushdb",
  "flush_all"            => "flushall",
  "last_save"            => "lastsave",
  "key?"                 => "exists",
  "delete"               => "del",
  "randkey"              => "randomkey",
  "list_length"          => "llen",
  "push_tail"            => "rpush",
  "push_head"            => "lpush",
  "pop_tail"             => "rpop",
  "pop_head"             => "lpop",
  "list_set"             => "lset",
  "list_range"           => "lrange",
  "list_trim"            => "ltrim",
  "list_index"           => "lindex",
  "list_rm"              => "lrem",
  "set_add"              => "sadd",
  "set_delete"           => "srem",
  "set_count"            => "scard",
  "set_member?"          => "sismember",
  "set_members"          => "smembers",
  "set_intersect"        => "sinter",
  "set_intersect_store"  => "sinterstore",
  "set_inter_store"      => "sinterstore",
  "set_union"            => "sunion",
  "set_union_store"      => "sunionstore",
  "set_diff"             => "sdiff",
  "set_diff_store"       => "sdiffstore",
  "set_move"             => "smove",
  "set_unless_exists"    => "setnx",
  "rename_unless_exists" => "renamenx",
  "type?"                => "type",
  "zset_add"             => "zadd",
  "zset_count"           => "zcard",
  "zset_range_by_score"  => "zrangebyscore",
  "zset_reverse_range"   => "zrevrange",
  "zset_range"           => "zrange",
  "zset_delete"          => "zrem",
  "zset_score"           => "zscore",
  "zset_incr_by"         => "zincrby",
  "zset_increment_by"    => "zincrby"
}
# Commands that raw_call_command's inline branch refuses to send
# (it raises "<name> command is disabled").
DISABLED_COMMANDS = { "monitor" => true, "sync" => true }
# Lookup table (command name => true) naming the blocking list pops.
BLOCKING_COMMANDS = { "blpop" => true, "brpop" => true }

Instance Method Summary collapse

Constructor Details

#initialize(options = {}) ⇒ Client

Returns a new instance of Client.



139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
# File 'lib/redis/client.rb', line 139

# Records the connection settings; no socket is opened here (@sock starts
# out nil).
#
# options - Hash read with symbol keys:
#           :host        - defaults to '127.0.0.1'
#           :port        - defaults to 6379, coerced with to_i
#           :db          - defaults to 0, coerced with to_i
#           :timeout     - defaults to 5, coerced with to_i
#           :password, :logger, :thread_safe, :binary_keys - stored as given
#
# A Mutex is created only when :thread_safe is truthy. Finishes by passing
# the new client to `log`.
def initialize(options = {})
  port    = options[:port]    || 6379
  db      = options[:db]      || 0
  timeout = options[:timeout] || 5

  @host        = options[:host] || '127.0.0.1'
  @port        = port.to_i
  @db          = db.to_i
  @timeout     = timeout.to_i
  @password    = options[:password]
  @logger      = options[:logger]
  @thread_safe = options[:thread_safe]
  @binary_keys = options[:binary_keys]
  @mutex       = Mutex.new if @thread_safe
  @sock        = nil
  @pubsub      = false

  log(self)
end

Dynamic Method Handling

This class handles dynamic methods through the method_missing method

#method_missing(*argv) ⇒ Object



385
386
387
# File 'lib/redis/client.rb', line 385

# Catch-all for methods this class does not define: the splatted arguments
# (the missing method's name comes first) are handed to call_command as one
# array, and call_command's result is returned.
def method_missing(*argv)
  call_command(argv)
end

Instance Method Details

#[](key) ⇒ Object



163
164
165
# File 'lib/redis/client.rb', line 163

# Hash-style read: client[key] is shorthand for get(key).
def [](key)
  get(key)
end

#[]=(key, value) ⇒ Object



167
168
169
# File 'lib/redis/client.rb', line 167

# Hash-style write: client[key] = value is shorthand for set(key, value).
def []=(key,value)
  set(key, value)
end

#call_command(argv) ⇒ Object

Wraps raw_call_command to handle reconnection on socket errors. We try to reconnect only once; if that fails, the error is raised to the caller.



331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
# File 'lib/redis/client.rb', line 331

# Sends one command (or batch) through raw_call_command, connecting first
# when `connected?` is false.
#
# argv - the command array; raw_call_command always receives a dup of it.
#
# On ECONNRESET, EPIPE or ECONNABORTED a single reconnect is attempted and
# the command is sent once more. When `reconnect` returns a falsy value,
# Errno::ECONNRESET is raised instead.
def call_command(argv)
  log(argv.inspect, :debug)
  connect_to_server unless connected?

  begin
    raw_call_command(argv.dup)
  rescue Errno::ECONNRESET, Errno::EPIPE, Errno::ECONNABORTED
    raise Errno::ECONNRESET unless reconnect

    # Second and last attempt; errors from here propagate to the caller.
    raw_call_command(argv.dup)
  end
end

#connect_to(host, port) ⇒ Object



351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
# File 'lib/redis/client.rb', line 351

# Opens the TCP socket to host:port and stores it in @sock.
#
# Raises Timeout::Error ("Timeout connecting to the server") when the
# connect attempt exceeds @timeout, and Errno::ECONNREFUSED with a message
# naming host:port when the server refuses the connection.
def connect_to(host, port)

  # We support connect_to() timeout only if system_timer is available
  # or if we are running against Ruby >= 1.9
  # Timeout reading from the socket instead will be supported anyway.
  if @timeout != 0 && Timer
    begin
      # The connect has to run inside the timer, otherwise the rescue below
      # can never fire and a hanging connect blocks indefinitely.
      # NOTE(review): Timer is defined outside this block; this assumes it
      # exposes a Timeout-compatible `timeout(seconds) { ... }` -- confirm.
      Timer.timeout(@timeout) { @sock = TCPSocket.new(host, port) }
    rescue Timeout::Error
      @sock = nil
      raise Timeout::Error, "Timeout connecting to the server"
    end
  else
    @sock = TCPSocket.new(host, port)
  end

  @sock.setsockopt Socket::IPPROTO_TCP, Socket::TCP_NODELAY, 1

  # If the timeout is set we set the low level socket options in order
  # to make sure a blocking read will return after the specified number
  # of seconds. This hack is from memcached ruby client.
  set_socket_timeout!(@timeout) if @timeout

rescue Errno::ECONNREFUSED
  raise Errno::ECONNREFUSED, "Unable to connect to Redis on #{host}:#{port}"
end

#connect_to_server ⇒ Object



378
379
380
381
382
383
# File 'lib/redis/client.rb', line 378

# Connects to @host:@port, then replays the session setup: AUTH when a
# password is configured and SELECT when the configured db is not 0.
#
# Returns the socket held in @sock.
def connect_to_server
  connect_to(@host, @port)

  call_command([:auth, @password]) if @password
  call_command([:select, @db]) unless @db == 0

  @sock
end

#decr(key, decrement = nil) ⇒ Object



248
249
250
251
252
253
254
255
# File 'lib/redis/client.rb', line 248

# Sends DECR for key.
#
# Passing a truthy decrement is deprecated: a deprecation notice is emitted
# and the call is forwarded to decrby(key, decrement).
def decr(key, decrement = nil)
  return call_command([:decr, key]) unless decrement

  deprecated("decr with a decrement", :decrby, caller[0])
  decrby(key, decrement)
end

#exec ⇒ Object



274
275
276
277
# File 'lib/redis/client.rb', line 274

# Sends EXEC via call_command and returns its result.
def exec
  # Need to override Kernel#exec.
  call_command([:exec])
end

#get(key) ⇒ Object



171
172
173
# File 'lib/redis/client.rb', line 171

# Sends GET for key via call_command and returns its result.
def get(key)
  call_command([:get, key])
end

#get_size(string) ⇒ Object



484
485
486
# File 'lib/redis/client.rb', line 484

# Returns the length of string in bytes (not characters). raw_call_command
# uses it for the "$<len>" prefixes and for inline bulk lengths.
def get_size(string)
  string.bytesize
end

#incr(key, increment = nil) ⇒ Object



239
240
241
242
243
244
245
246
# File 'lib/redis/client.rb', line 239

# Sends INCR for key.
#
# Passing a truthy increment is deprecated: a deprecation notice is emitted
# and the call is forwarded to incrby(key, increment).
def incr(key, increment = nil)
  return call_command([:incr, key]) unless increment

  deprecated("incr with an increment", :incrby, caller[0])
  incrby(key, increment)
end

#mapped_mget(*keys) ⇒ Object

Similar to memcache.rb’s #get_multi, returns a hash mapping keys to values.



219
220
221
222
223
224
225
226
# File 'lib/redis/client.rb', line 219

# Fetches all keys with one mget call and returns a Hash of key => value.
# Values are matched to keys by position; keys whose value came back nil
# are left out of the result.
def mapped_mget(*keys)
  found = {}
  mget(*keys).each_with_index do |value, position|
    found[keys[position]] = value unless value.nil?
  end
  found
end

#mapped_mset(hash) ⇒ Object



200
201
202
# File 'lib/redis/client.rb', line 200

# Sets every key/value pair of hash with a single mset call.
#
# Only the pair level is flattened, so a value that is itself an Array
# stays one argument instead of being spliced into the argument list and
# shifting every following key/value pair.
def mapped_mset(hash)
  mset(*hash.to_a.flatten(1))
end

#mapped_msetnx(hash) ⇒ Object



213
214
215
# File 'lib/redis/client.rb', line 213

# Passes every key/value pair of hash to a single msetnx call.
#
# Only the pair level is flattened, so a value that is itself an Array
# stays one argument instead of being spliced into the argument list and
# shifting every following key/value pair.
def mapped_msetnx(hash)
  msetnx(*hash.to_a.flatten(1))
end

#maybe_lock(&block) ⇒ Object



453
454
455
456
457
458
459
# File 'lib/redis/client.rb', line 453

# Runs the block and returns its value, holding @mutex for the duration
# only when the client was created with a truthy :thread_safe option.
def maybe_lock(&block)
  return block.call unless @thread_safe

  @mutex.synchronize(&block)
end

#mset(*args) ⇒ Object



191
192
193
194
195
196
197
198
# File 'lib/redis/client.rb', line 191

# Sends MSET with the given flat key, value, key, value... arguments.
#
# Calling it with exactly one argument is the deprecated hash form: a
# deprecation notice is emitted and the argument is forwarded to
# mapped_mset.
def mset(*args)
  return call_command([:mset, *args]) unless args.size == 1

  deprecated("mset with a hash", :mapped_mset, caller[0])
  mapped_mset(args[0])
end

#msetnx(*args) ⇒ Object



204
205
206
207
208
209
210
211
# File 'lib/redis/client.rb', line 204

# Sends MSETNX with the given flat key, value, key, value... arguments.
#
# Calling it with exactly one argument is the deprecated hash form: a
# deprecation notice is emitted and the argument is forwarded to
# mapped_msetnx.
def msetnx(*args)
  return call_command([:msetnx, *args]) unless args.size == 1

  deprecated("msetnx with a hash", :mapped_msetnx, caller[0])
  mapped_msetnx(args[0])
end

#multi(&block) ⇒ Object



279
280
281
282
283
284
285
286
287
288
289
290
291
# File 'lib/redis/client.rb', line 279

# Sends MULTI.
#
# Without a block, returns the reply to MULTI. With a block, yields the
# client, then calls exec and returns its result. If the block or exec
# raises anything at all, `discard` is called before the exception is
# re-raised.
def multi(&block)
  started = call_command([:multi])
  return started unless block_given?

  begin
    yield(self)
    exec
  rescue Exception => error
    # Deliberately broad: whatever interrupted the block, the transaction
    # is discarded before the error continues upwards.
    discard
    raise error
  end
end

#pipelined {|pipeline| ... } ⇒ Object

Yields:

  • (pipeline)


268
269
270
271
272
# File 'lib/redis/client.rb', line 268

# Builds a Pipeline around this client, yields it to the block, then runs
# pipeline.execute and returns its result.
def pipelined(&block)
  batch = Pipeline.new(self)
  yield(batch)
  batch.execute
end

#process_command(command, argvv) ⇒ Object



445
446
447
448
449
450
451
# File 'lib/redis/client.rb', line 445

# Writes the serialised command string to the socket, then reads one reply
# per entry of argvv.
#
# command - String already in wire format.
# argvv   - Array of command arrays; element 0 of each names the command.
#
# Returns an Array of replies in order, each passed through its
# REPLY_PROCESSOR entry when the command has one.
def process_command(command, argvv)
  @sock.write(command)

  argvv.map do |argv|
    handler = REPLY_PROCESSOR[argv[0].to_s]
    reply = read_reply
    handler ? handler.call(reply) : reply
  end
end

#quit ⇒ Object



263
264
265
266
# File 'lib/redis/client.rb', line 263

# Sends QUIT. A connection reset while doing so is ignored and nil is
# returned in that case.
def quit
  begin
    call_command(['quit'])
  rescue Errno::ECONNRESET
    nil
  end
end

#raw_call_command(argvp) ⇒ Object



389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
# File 'lib/redis/client.rb', line 389

# Serialises one command, or a pipeline of commands, writes it to the socket
# and returns the processed reply or replies.
#
# argvp - either a single command array ([name, arg, ...]) or an array of
#         such arrays, which is treated as a pipeline.
#
# Returns true without reading anything while @pubsub is set; otherwise an
# Array of replies for a pipeline, or the single reply for one command.
# Raises RuntimeError when the inline branch meets a DISABLED_COMMANDS name.
def raw_call_command(argvp)
  # Normalise to a list of commands and remember which form we were given.
  if argvp[0].is_a?(Array)
    argvv = argvp
    pipeline = true
  else
    argvv = [argvp]
    pipeline = false
  end

  # Multi-bulk framing: "*<argc>" followed by "$<bytes>" + payload for every
  # argument. The choice is made from the first command's name only.
  # NOTE(review): this branch does not consult ALIASES or DISABLED_COMMANDS,
  # so aliased or disabled names go out as given -- confirm that is intended.
  if @binary_keys or pipeline or MULTI_BULK_COMMANDS[argvv[0][0].to_s]
    command = ""
    argvv.each do |argv|
      command << "*#{argv.size}\r\n"
      argv.each{|a|
        a = a.to_s
        command << "$#{get_size(a)}\r\n"
        command << a
        command << "\r\n"
      }
    end
  else
    # Inline framing: space-joined arguments on one line. Only reached for a
    # single command, since pipelines always take the branch above.
    command = ""
    argvv.each do |argv|
      bulk = nil
      argv[0] = argv[0].to_s
      if ALIASES[argv[0]]
        # caller[4] is a fixed stack depth, so it depends on the exact chain
        # of calls leading here.
        deprecated(argv[0], ALIASES[argv[0]], caller[4])
        argv[0] = ALIASES[argv[0]]
      end
      raise "#{argv[0]} command is disabled" if DISABLED_COMMANDS[argv[0]]
      # For bulk commands the last argument is replaced by its byte size and
      # the payload itself follows on its own line.
      if BULK_COMMANDS[argv[0]] and argv.length > 1
        bulk = argv[-1].to_s
        argv[-1] = get_size(bulk)
      end
      command << "#{argv.join(' ')}\r\n"
      command << "#{bulk}\r\n" if bulk
    end
  end
  # When in Pub/Sub mode we don't read replies synchronously.
  if @pubsub
    @sock.write(command)
    return true
  end
  # The normal command execution is reading and processing the reply.
  results = maybe_lock do
    begin
      # The socket timeout is set to 0 for commands that need it and restored
      # afterwards, even on error; only the first command's name is checked.
      set_socket_timeout!(0) if requires_timeout_reset?(argvv[0][0].to_s)
      process_command(command, argvv)
    ensure
      set_socket_timeout!(@timeout) if requires_timeout_reset?(argvv[0][0].to_s)
    end
  end

  return pipeline ? results : results[0]
end

#read_reply ⇒ Object

Raises:

  • (Errno::ECONNRESET)


461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
# File 'lib/redis/client.rb', line 461

# Reads one reply from the socket and returns it after format_reply has
# processed it.
#
# Raises Errno::EAGAIN when the read times out (the client disconnects
# first) and Errno::ECONNRESET when the socket yields no data.
def read_reply
  # The first byte is fetched with read() because gets() is immune to raw
  # socket timeouts.
  reply_type =
    begin
      @sock.read(1)
    rescue Errno::EAGAIN
      # Drop the connection so the next command reconnects; otherwise a late
      # reply from the server would leave the protocol out of sync.
      disconnect
      raise Errno::EAGAIN, "Timeout reading from the socket"
    end

  raise Errno::ECONNRESET, "Connection lost" unless reply_type

  format_reply(reply_type, @sock.gets)
end

#select(*args) ⇒ Object



159
160
161
# File 'lib/redis/client.rb', line 159

# Always raises: the database is chosen with the :db option at construction
# time, and connect_to_server issues the SELECT itself via call_command.
# The arguments are ignored.
def select(*args)
  raise "SELECT not allowed, use the :db option when creating the object"
end

#server ⇒ Object



347
348
349
# File 'lib/redis/client.rb', line 347

# Returns the configured endpoint as a "host:port" String.
def server
  "#{@host}:#{@port}"
end

#set(key, value, ttl = nil) ⇒ Object



175
176
177
178
179
180
181
182
# File 'lib/redis/client.rb', line 175

# Sends SET for key with value.
#
# Passing a truthy ttl is deprecated: a deprecation notice is emitted and
# the call is forwarded to set_with_expire(key, value, ttl).
def set(key, value, ttl = nil)
  return call_command([:set, key, value]) unless ttl

  deprecated("set with an expire", :set_with_expire, caller[0])
  set_with_expire(key, value, ttl)
end

#set_with_expire(key, value, ttl) ⇒ Object



184
185
186
187
188
189
# File 'lib/redis/client.rb', line 184

# Issues set(key, value) followed by expire(key, ttl) inside one `multi`
# block and returns whatever `multi` returns.
def set_with_expire(key, value, ttl)
  multi do
    set(key, value)
    expire(key, ttl)
  end
end

#sort(key, options = {}) ⇒ Object



228
229
230
231
232
233
234
235
236
237
# File 'lib/redis/client.rb', line 228

# Sends SORT for key.
#
# options - Hash of optional clauses:
#           :by    - pattern for the BY clause
#           :get   - one pattern or an Array of patterns, each sent as its
#                    own GET clause
#           :order - ordering words, e.g. "DESC ALPHA"
#           :limit - Array of the two LIMIT arguments
#           :store - destination key for the STORE clause
#
# Every word is its own array element. raw_call_command's inline branch
# joins the elements with spaces, which yields the same line as before; its
# multi-bulk branch (used for pipelines and binary keys) sends each element
# as one argument, so it now receives "SORT" and the key separately instead
# of the single string "SORT <key>".
def sort(key, options = {})
  cmd = ["SORT", key]
  cmd.push("BY", options[:by]) if options[:by]
  [options[:get]].flatten.each { |pattern| cmd.push("GET", pattern) } if options[:get]
  cmd.concat(options[:order].to_s.split) if options[:order]
  cmd.push("LIMIT", *options[:limit]) if options[:limit]
  cmd.push("STORE", options[:store]) if options[:store]
  call_command(cmd)
end

#subscribe(*classes) {|sub| ... } ⇒ Object

Yields:

  • (sub)


293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
# File 'lib/redis/client.rb', line 293

# Subscribes to the given classes (channels) and, at top level, runs the
# reply loop until the last unsubscribe has been seen.
#
# classes - names passed on to the SUBSCRIBE command.
#
# Yields a new Subscription once so the caller can configure it; the loop
# then looks up the object stored under each reply type and, when present,
# calls it with the two reply fields.
# Returns true for a nested call made while already subscribed.
# Raises RuntimeError when block usage does not match the nesting level.
def subscribe(*classes)
  # Top-level `subscribe` MUST be called with a block,
  # nested `subscribe` MUST NOT be called with a block
  if !@pubsub && !block_given?
    raise "Top-level subscribe requires a block"
  elsif @pubsub == true && block_given?
    raise "Nested subscribe does not take a block"
  elsif @pubsub
    # If we're already pubsub'ing, just subscribe us to some more classes
    call_command [:subscribe,*classes]
    return true
  end

  # NOTE(review): the next four lines run outside the begin/ensure below, so
  # an exception from call_command or from the caller's block leaves @pubsub
  # set to true -- confirm whether they should be covered by the ensure.
  @pubsub = true
  call_command [:subscribe,*classes]
  sub = Subscription.new
  yield(sub)
  begin
    while true
      type, *reply = read_reply # type, [class,data]
      case type
      when 'subscribe','unsubscribe'
        sub.send(type) && sub.send(type).call(reply[0],reply[1])
      when 'message'
        sub.send(type) && sub.send(type).call(reply[0],reply[1])
      end
      # Stop once an unsubscribe reply carries 0 as its second field.
      break if type == 'unsubscribe' && reply[1] == 0
    end
  rescue RuntimeError
    # Sends UNSUBSCRIBE before re-raising; @pubsub is still true here, so
    # raw_call_command writes the command without reading a reply.
    call_command [:unsubscribe]
    raise
  ensure
    @pubsub = false
  end
end

#to_s ⇒ Object



155
156
157
# File 'lib/redis/client.rb', line 155

# Human-readable description naming the configured endpoint (via `server`)
# and the database number.
def to_s
  "Redis Client connected to #{server} against DB #{@db}"
end

#type(key) ⇒ Object

Ruby defines a now deprecated type method so we need to override it here since it will never hit method_missing



259
260
261
# File 'lib/redis/client.rb', line 259

# Sends TYPE for key via call_command and returns its result. Defined
# explicitly so the call does not rely on method_missing.
def type(key)
  call_command(['type', key])
end