Class: RedisRb::Client

Inherits:
Object
  • Object
show all
Defined in:
lib/redis/client.rb

Direct Known Subclasses

Pipeline

Constant Summary collapse

# Status text compared against the server reply in #set.
OK       = 'OK'.freeze
# Leading byte of each reply kind, as dispatched on in #read_reply.
MINUS    = '-'.freeze # error reply
PLUS     = '+'.freeze # single-line status reply
COLON    = ':'.freeze # integer reply
DOLLAR   = '$'.freeze # bulk reply
ASTERISK = '*'.freeze # multi-bulk reply
# Inline-protocol commands whose LAST argument is transmitted as a bulk
# payload (see #raw_call_command). Keys are lower-case command names.
BULK_COMMANDS = Hash[%w[
  set setnx rpush lpush lset lrem sadd srem sismember echo getset smove
  zadd zincrby zrem zscore zrank zrevrank hget hdel hexists
].map { |name| [name, true] }]
# Commands that #raw_call_command always sends with the multi-bulk protocol.
MULTI_BULK_COMMANDS = Hash[%w[mset msetnx hset].map { |name| [name, true] }]
# Turns an integer reply into a boolean: true only when the reply is 1.
BOOLEAN_PROCESSOR =
lambda{|r| r == 1 }
# Post-processing applied to the raw reply of a command, keyed by command
# name (looked up in #process_command). Commands without an entry return the
# raw reply unchanged.
REPLY_PROCESSOR =
{
  "exists"    => BOOLEAN_PROCESSOR,
  "sismember" => BOOLEAN_PROCESSOR,
  "sadd"      => BOOLEAN_PROCESSOR,
  "srem"      => BOOLEAN_PROCESSOR,
  "smove"     => BOOLEAN_PROCESSOR,
  "zadd"      => BOOLEAN_PROCESSOR,
  "zrem"      => BOOLEAN_PROCESSOR,
  "move"      => BOOLEAN_PROCESSOR,
  "setnx"     => BOOLEAN_PROCESSOR,
  # DEL replies with the NUMBER of keys removed, so deleting several keys at
  # once can yield a reply greater than 1; any positive count is a success.
  "del"       => lambda{|r| r > 0 },
  "renamenx"  => BOOLEAN_PROCESSOR,
  "expire"    => BOOLEAN_PROCESSOR,
  "hset"      => BOOLEAN_PROCESSOR,
  "hexists"   => BOOLEAN_PROCESSOR,
  # Parses the "key:value" lines of an INFO reply into a symbol-keyed hash.
  "info"      => lambda{|r|
    info = {}
    r.each_line {|kv|
      k,v = kv.chomp.split(":",2)
      # Blank lines and "# Section" headers carry no key:value pair.
      next if v.nil?
      info[k.to_sym] = v
    }
    info
  },
  # KEYS may arrive as an array or as one space-separated string.
  "keys"      => lambda{|r|
    if r.is_a?(Array)
        r
    else
        r.split(" ")
    end
  },
  # HGETALL arrives as a flat [field, value, field, value, ...] list.
  "hgetall"   => lambda{|r|
    Hash[*r]
  }
}
# Friendly method names mapped to the Redis command actually sent. Resolved
# by #raw_call_command after the name has been lower-cased.
ALIASES = Hash[[
  # server and keyspace
  %w[flush_db flushdb],
  %w[flush_all flushall],
  %w[last_save lastsave],
  %w[key? exists],
  %w[delete del],
  %w[randkey randomkey],
  # lists
  %w[list_length llen],
  %w[push_tail rpush],
  %w[push_head lpush],
  %w[pop_tail rpop],
  %w[pop_head lpop],
  %w[list_set lset],
  %w[list_range lrange],
  %w[list_trim ltrim],
  %w[list_index lindex],
  %w[list_rm lrem],
  # sets
  %w[set_add sadd],
  %w[set_delete srem],
  %w[set_count scard],
  %w[set_member? sismember],
  %w[set_members smembers],
  %w[set_intersect sinter],
  %w[set_intersect_store sinterstore],
  %w[set_inter_store sinterstore],
  %w[set_union sunion],
  %w[set_union_store sunionstore],
  %w[set_diff sdiff],
  %w[set_diff_store sdiffstore],
  %w[set_move smove],
  # conditional writes and type lookup
  %w[set_unless_exists setnx],
  %w[rename_unless_exists renamenx],
  %w[type? type],
  # sorted sets
  %w[zset_add zadd],
  %w[zset_count zcard],
  %w[zset_range_by_score zrangebyscore],
  %w[zset_reverse_range zrevrange],
  %w[zset_range zrange],
  %w[zset_delete zrem],
  %w[zset_score zscore],
  %w[zset_incr_by zincrby],
  %w[zset_increment_by zincrby]
]]
# Commands #raw_call_command refuses to send on the inline protocol path.
DISABLED_COMMANDS = Hash[%w[monitor sync].map { |name| [name, true] }]

Instance Method Summary collapse

Constructor Details

#initialize(options = {}) ⇒ Client

Returns a new instance of Client.



126
127
128
129
130
131
132
133
134
135
136
137
138
139
# File 'lib/redis/client.rb', line 126

# Builds a client from an options hash. No connection is opened here; @sock
# starts out nil.
#
# Recognised options (default in parentheses): :host ('127.0.0.1'),
# :port (6379), :db (0), :timeout (5), :password, :logger, :thread_safe,
# :binary_keys. :port, :db and :timeout are coerced with #to_i.
def initialize(options = {})
  @host        = options[:host] || '127.0.0.1'
  @port        = (options[:port] || 6379).to_i
  @db          = (options[:db] || 0).to_i
  @timeout     = (options[:timeout] || 5).to_i
  @password    = options[:password]
  @logger      = options[:logger]
  @thread_safe = options[:thread_safe]
  @binary_keys = options[:binary_keys]
  # The mutex is only created when thread safety was requested.
  if @thread_safe
    @mutex = Mutex.new
  end
  @sock = nil

  if @logger
    @logger.info { to_s }
  end
end

Dynamic Method Handling

This class handles dynamic methods through the method_missing method

#method_missing(*argv) ⇒ Object



189
190
191
# File 'lib/redis/client.rb', line 189

# Forwards any method not defined on the client to #call_command; the method
# name becomes the first element of the argument list.
def method_missing(*command)
  call_command(command)
end

Instance Method Details

#[](key) ⇒ Object



270
271
272
# File 'lib/redis/client.rb', line 270

# Hash-style read access: client[key] is shorthand for client.get(key).
def [](key)
  get(key)
end

#[]=(key, value) ⇒ Object



274
275
276
# File 'lib/redis/client.rb', line 274

# Hash-style write access: client[key] = value is shorthand for
# client.set(key, value).
def []=(key, value)
  set(key, value)
end

#call_command(argv) ⇒ Object



193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
# File 'lib/redis/client.rb', line 193

# Sends +argv+ through #raw_call_command, connecting first when there is no
# socket yet.
#
# This wrapper handles reconnection on socket errors: when the first attempt
# fails with ECONNRESET, EPIPE or ECONNABORTED the socket is discarded and the
# command is retried exactly once on a fresh connection. An error raised by
# the second attempt propagates to the caller.
def call_command(argv)
  if @logger
    @logger.debug { argv.inspect }
  end

  connect_to_server unless @sock

  begin
    raw_call_command(argv.dup)
  rescue Errno::ECONNRESET, Errno::EPIPE, Errno::ECONNABORTED
    # Best-effort close of the broken socket before reconnecting.
    @sock.close rescue nil
    @sock = nil
    connect_to_server
    raw_call_command(argv.dup)
  end
end

#connect_to(host, port, timeout = nil) ⇒ Object



155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
# File 'lib/redis/client.rb', line 155

# Opens a TCP connection to host:port with TCP_NODELAY set and returns the
# socket.
#
# When a client-wide timeout is configured (@timeout != 0) and RedisTimer is
# available, the connect itself is bounded by @timeout seconds. When +timeout+
# is given it is also installed as the socket's low-level send/receive
# timeout, so a blocking read returns after that many seconds.
#
# Raises Timeout::Error when the connection cannot be established in time.
def connect_to(host, port, timeout=nil)
  # We support connect() timeout only if system_timer is available
  # or if we are running against Ruby >= 1.9
  # Timeout reading from the socket instead will be supported anyway.
  if @timeout != 0 and RedisTimer
    begin
      # Without the timer block nothing could ever raise Timeout::Error here
      # and the connect would block indefinitely.
      # NOTE(review): assumes RedisTimer responds to `timeout(seconds) { }`
      # like Timeout / SystemTimer — confirm against where it is defined.
      sock = RedisTimer.timeout(@timeout) { TCPSocket.new(host, port) }
    rescue Timeout::Error
      @sock = nil
      raise Timeout::Error, "Timeout connecting to the server"
    end
  else
    sock = TCPSocket.new(host, port)
  end
  sock.setsockopt Socket::IPPROTO_TCP, Socket::TCP_NODELAY, 1

  # If the timeout is set we set the low level socket options in order
  # to make sure a blocking read will return after the specified number
  # of seconds. This hack is from memcached ruby client.
  if timeout
    secs   = Integer(timeout)
    usecs  = Integer((timeout - secs) * 1_000_000)
    # Packed as two native longs (seconds, microseconds).
    optval = [secs, usecs].pack("l_2")
    begin
      sock.setsockopt Socket::SOL_SOCKET, Socket::SO_RCVTIMEO, optval
      sock.setsockopt Socket::SOL_SOCKET, Socket::SO_SNDTIMEO, optval
    rescue Exception => ex
      # Solaris, for one, does not like/support socket timeouts.
      @logger.info "Unable to use raw socket timeouts: #{ex.class.name}: #{ex.message}" if @logger
    end
  end
  sock
end

#connect_to_server ⇒ Object



149
150
151
152
153
# File 'lib/redis/client.rb', line 149

# Opens the socket via #connect_to, then sends AUTH when a password is
# configured and SELECT when the configured database is not 0. A @timeout of
# 0 is passed on as "no timeout" (nil).
def connect_to_server
  socket_timeout = @timeout unless @timeout == 0
  @sock = connect_to(@host, @port, socket_timeout)
  if @password
    call_command(["auth", @password])
  end
  call_command(["select", @db]) unless @db == 0
end

#decr(key, decrement = nil) ⇒ Object



317
318
319
# File 'lib/redis/client.rb', line 317

# Sends DECR for +key+, or DECRBY with +decrement+ when one is given.
def decr(key,decrement = nil)
  return call_command(["decr", key]) unless decrement
  call_command(["decrby", key, decrement])
end

#exec ⇒ Object



390
391
392
393
# File 'lib/redis/client.rb', line 390

# Sends EXEC. Defined explicitly because Kernel#exec already exists, so the
# call would never reach #method_missing.
def exec
  command = [:exec]
  call_command(command)
end

#incr(key, increment = nil) ⇒ Object



313
314
315
# File 'lib/redis/client.rb', line 313

# Sends INCR for +key+, or INCRBY with +increment+ when one is given.
def incr(key, increment = nil)
  return call_command(["incr", key]) unless increment
  call_command(["incrby", key, increment])
end

#mapped_mget(*keys) ⇒ Object

Similar to memcache.rb’s #get_multi, returns a hash mapping keys to values.



323
324
325
326
327
328
329
330
# File 'lib/redis/client.rb', line 323

# Fetches all +keys+ with MGET and returns a hash of key => value. Keys whose
# value came back nil are left out of the result.
def mapped_mget(*keys)
  found = {}
  mget(*keys).each_with_index do |value, position|
    found[keys[position]] = value unless value.nil?
  end
  found
end

#maybe_lock(&block) ⇒ Object



258
259
260
261
262
263
264
# File 'lib/redis/client.rb', line 258

# Runs the block, holding @mutex for its duration when the client was created
# with :thread_safe. Returns the block's value.
def maybe_lock(&block)
  return block.call unless @thread_safe
  @mutex.synchronize(&block)
end

#mset(*args) ⇒ Object



284
285
286
287
288
289
290
291
# File 'lib/redis/client.rb', line 284

# Sets several keys at once (MSET). Accepts either a flat key, value, key,
# value... list or a single hash of key => value pairs; when the last
# argument is a hash, only that hash is used.
def mset(*args)
  # flatten(1) unrolls the [key, value] pairs only. An unbounded flatten would
  # also splice array values into the argument list and break the pairing.
  pairs = Hash === args.last ? args.pop.to_a.flatten(1) : args
  call_command(pairs.unshift(:mset))
end

#msetnx(*args) ⇒ Object



293
294
295
296
297
298
299
300
# File 'lib/redis/client.rb', line 293

# Sets several keys at once (MSETNX). Accepts either a flat key, value, key,
# value... list or a single hash of key => value pairs; when the last
# argument is a hash, only that hash is used.
def msetnx(*args)
  # flatten(1) unrolls the [key, value] pairs only. An unbounded flatten would
  # also splice array values into the argument list and break the pairing.
  pairs = Hash === args.last ? args.pop.to_a.flatten(1) : args
  call_command(pairs.unshift(:msetnx))
end

#multi(&block) ⇒ Object



395
396
397
398
399
400
401
402
403
404
405
406
407
# File 'lib/redis/client.rb', line 395

# Sends MULTI. Without a block the reply to MULTI is returned. With a block,
# the client is yielded, then EXEC is sent and its result returned; if
# anything is raised in between, DISCARD is sent and the exception re-raised.
def multi(&block)
  status = call_command([:multi])

  if block_given?
    begin
      yield self
      exec
    rescue Exception => error
      # Abort the open transaction before surfacing the original failure.
      discard
      raise error
    end
  else
    status
  end
end

#pipelined {|pipeline| ... } ⇒ Object

Yields:

  • (pipeline)


343
344
345
346
347
# File 'lib/redis/client.rb', line 343

# Creates a Pipeline bound to this client, yields it to the caller, then
# executes it and returns the result of Pipeline#execute.
def pipelined(&block)
  batch = Pipeline.new(self)
  yield(batch)
  batch.execute
end

#process_command(command, argvv) ⇒ Object



250
251
252
253
254
255
256
# File 'lib/redis/client.rb', line 250

# Writes the serialised +command+ to the socket, then reads one reply per
# entry of +argvv+. A reply is passed through the matching REPLY_PROCESSOR
# entry (looked up by the command name, argv[0]) when there is one.
def process_command(command, argvv)
  @sock.write(command)
  argvv.map do |argv|
    handler = REPLY_PROCESSOR[argv[0].to_s]
    reply = read_reply
    handler ? handler.call(reply) : reply
  end
end

#quit ⇒ Object



338
339
340
341
# File 'lib/redis/client.rb', line 338

# Sends QUIT. A connection reset while doing so is swallowed and nil is
# returned (presumably because the server drops the connection on QUIT).
def quit
  begin
    call_command(['quit'])
  rescue Errno::ECONNRESET
    nil
  end
end

#raw_call_command(argvp) ⇒ Object



211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
# File 'lib/redis/client.rb', line 211

# Serialises one command, or a pipeline of commands, and hands it to
# #process_command (under #maybe_lock).
#
# +argvp+ is either a single command (["get", "key"]) or an array of such
# commands, which is treated as a pipeline. Returns the reply for a single
# command, or the array of replies for a pipeline.
#
# Two wire formats are produced:
# * multi-bulk, when @binary_keys is set or the FIRST command is listed in
#   MULTI_BULK_COMMANDS: every argument is sent length-prefixed;
# * inline otherwise: the command name is lower-cased and de-aliased,
#   disabled commands are rejected, and for BULK_COMMANDS the last argument
#   is replaced on the command line by its size and sent on its own line.
#
# The inline path rewrites the argv arrays in place (command name and, for
# bulk commands, the last element).
#
# Raises RuntimeError when an inline command is listed in DISABLED_COMMANDS.
def raw_call_command(argvp)
  pipeline = argvp[0].is_a?(Array)
  argvv = pipeline ? argvp : [argvp]

  command = ""
  if @binary_keys or MULTI_BULK_COMMANDS[argvv[0][0].to_s]
    argvv.each do |argv|
      command << "*#{argv.size}\r\n"
      argv.each do |arg|
        text = arg.to_s
        command << "$#{get_size(text)}\r\n"
        command << text
        command << "\r\n"
      end
    end
  else
    argvv.each do |argv|
      name = argv[0].to_s.downcase
      name = ALIASES[name] || name
      argv[0] = name
      raise "#{name} command is disabled" if DISABLED_COMMANDS[name]

      payload = nil
      if argv.length > 1 and BULK_COMMANDS[name]
        payload = argv[-1].to_s
        argv[-1] = get_size(payload)
      end
      command << "#{argv.join(' ')}\r\n"
      command << "#{payload}\r\n" if payload
    end
  end

  results = maybe_lock { process_command(command, argvv) }
  pipeline ? results : results[0]
end

#read_reply ⇒ Object

Raises:

  • (Errno::ECONNRESET)


349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
# File 'lib/redis/client.rb', line 349

# Reads and decodes one reply from the socket.
#
# Returns a String for status and bulk replies, an Integer for integer
# replies, an Array for multi-bulk replies (decoded recursively), and nil for
# a null bulk or null multi-bulk reply (length -1).
#
# Raises Errno::EAGAIN when the read times out (the socket is dropped so the
# next command reconnects), Errno::ECONNRESET when the connection is lost,
# and RuntimeError for error replies or an unknown reply type byte.
def read_reply
  # We read the first byte using read() mainly because gets() is
  # immune to raw socket timeouts.
  begin
    rtype = @sock.read(1)
  rescue Errno::EAGAIN
    # We want to make sure it reconnects on the next command after the
    # timeout. Otherwise the server may reply in the meantime leaving
    # the protocol in a desync status.
    @sock.close rescue nil # best effort: do not leak the abandoned socket
    @sock = nil
    raise Errno::EAGAIN, "Timeout reading from the socket"
  end

  raise Errno::ECONNRESET,"Connection lost" if !rtype
  line = @sock.gets
  # The peer may go away between the type byte and the rest of the line.
  raise Errno::ECONNRESET,"Connection lost" if !line
  case rtype
  when MINUS
    raise MINUS + line.strip
  when PLUS
    line.strip
  when COLON
    line.to_i
  when DOLLAR
    bulklen = line.to_i
    return nil if bulklen == -1
    data = @sock.read(bulklen)
    @sock.read(2) # CRLF
    data
  when ASTERISK
    objects = line.to_i
    # A count of -1 is the null multi-bulk reply, not an empty list.
    return nil if objects == -1
    res = []
    objects.times {
      res << read_reply
    }
    res
  else
    raise "Protocol error, got '#{rtype}' as initial reply byte"
  end
end

#select(*args) ⇒ Object



266
267
268
# File 'lib/redis/client.rb', line 266

# Always raises RuntimeError: the database is chosen with the :db option at
# construction time, not by calling SELECT on the client.
def select(*args)
  raise RuntimeError, "SELECT not allowed, use the :db option when creating the object"
end

#server ⇒ Object



145
146
147
# File 'lib/redis/client.rb', line 145

# Returns the configured endpoint as "host:port".
def server
  [@host, @port].join(":")
end

#set(key, value, expiry = nil) ⇒ Object



278
279
280
281
282
# File 'lib/redis/client.rb', line 278

# Stores +value+ under +key+ and returns true when the server answered OK.
# When the store succeeded and +expiry+ is given, EXPIRE is issued as a
# separate command afterwards.
def set(key, value, expiry=nil)
  stored = (call_command([:set, key, value]) == OK)
  if stored && expiry
    expire(key, expiry)
  end
  stored
end

#sort(key, options = {}) ⇒ Object



302
303
304
305
306
307
308
309
310
311
# File 'lib/redis/client.rb', line 302

# Sends SORT for +key+.
#
# Supported options: :by (pattern), :get (one pattern or an array of them),
# :order (e.g. "DESC" or "DESC ALPHA"), :limit ([offset, count]) and :store
# (destination key).
#
# Every keyword and operand is pushed as its own argument. Joined with spaces
# this gives the same inline command line as before, and it also stays valid
# when the command is sent length-prefixed (multi-bulk), where "BY pattern"
# pushed as one element would travel as a single argument.
def sort(key, options = {})
  cmd = ["SORT", key]
  cmd.push("BY", options[:by]) if options[:by]
  if options[:get]
    [options[:get]].flatten.each { |pattern| cmd.push("GET", pattern) }
  end
  cmd.concat(options[:order].to_s.split) if options[:order]
  cmd.push("LIMIT", *options[:limit]) if options[:limit]
  cmd.push("STORE", options[:store]) if options[:store]
  call_command(cmd)
end

#to_s ⇒ Object



141
142
143
# File 'lib/redis/client.rb', line 141

# Human-readable description built from the #server endpoint and the
# configured database number.
def to_s
  endpoint = server
  "Redis Client connected to #{endpoint} against DB #{@db}"
end

#type(key) ⇒ Object

Ruby defines a now deprecated type method so we need to override it here since it will never hit method_missing



334
335
336
# File 'lib/redis/client.rb', line 334

# Sends TYPE for +key+. Defined explicitly because Ruby already has a
# (deprecated) Object#type, so the call would never reach #method_missing.
def type(key)
  command = ['type', key]
  call_command(command)
end