Class: RedisClient

Inherits:
Object
  • Object
show all
Defined in:
lib/rubyredis.rb

Constant Summary collapse

# Lookup table of commands whose last argument is transmitted as a bulk
# payload: raw_call_command consults it to decide whether the final argument
# is replaced by its length on the command line and sent on its own line.
BulkCommands = Hash[
    %w[
        set setnx rpush lpush lset
        lrem sadd srem sismember
        echo getset smove
    ].map { |name| [name, true] }
]
# Maps an integer reply of 0 to false and passes every other reply through
# unchanged (so a non-zero count is still returned as the count).
ConvertToBool = lambda { |reply|
    if reply == 0
        false
    else
        reply
    end
}

# Per-command post-processing applied by raw_call_command to the value
# returned by read_reply. Commands that are not listed get the raw reply.
ReplyProcessor =
{
    "exists" => ConvertToBool,
    "sismember"=> ConvertToBool,
    "sadd"=> ConvertToBool,
    "srem"=> ConvertToBool,
    "smove"=> ConvertToBool,
    "move"=> ConvertToBool,
    "setnx"=> ConvertToBool,
    "del"=> ConvertToBool,
    "renamenx"=> ConvertToBool,
    "expire"=> ConvertToBool,
    # A space separated bulk reply is split into an array of key names.
    # A reply that already is an array (multi bulk reply) is returned as is
    # instead of failing with NoMethodError on Array#split.
    "keys" => lambda { |reply|
        reply.is_a?(Array) ? reply : reply.split(" ")
    },
    # Turns the "field:value" lines of the INFO reply into a Hash keyed by
    # symbols. Blank lines and "# Section" header lines carry no field and
    # are skipped so they do not end up as junk keys in the result.
    "info" => lambda { |reply|
        fields = {}
        reply.each_line { |raw_line|
            line = raw_line.chomp
            next if line.strip.empty? || line.start_with?("#")
            # Split on the first colon only: values may contain colons.
            name, value = line.split(":", 2)
            fields[name.to_sym] = value
        }
        fields
    }
}
# Friendly method names accepted by the client, each followed by the Redis
# command it is translated to before the request is written to the socket.
# The list is read pairwise: alias first, real command second.
Aliases = Hash[*%w[
    flush_db             flushdb
    flush_all            flushall
    last_save            lastsave
    key?                 exists
    delete               del
    randkey              randomkey
    list_length          llen
    push_tail            rpush
    push_head            lpush
    pop_tail             rpop
    pop_head             lpop
    list_set             lset
    list_range           lrange
    list_trim            ltrim
    list_index           lindex
    list_rm              lrem
    set_add              sadd
    set_delete           srem
    set_count            scard
    set_member?          sismember
    set_members          smembers
    set_intersect        sinter
    set_intersect_store  sinterstore
    set_inter_store      sinterstore
    set_union            sunion
    set_union_store      sunionstore
    set_diff             sdiff
    set_diff_store       sdiffstore
    set_move             smove
    set_unless_exists    setnx
    rename_unless_exists renamenx
]]

Instance Method Summary collapse

Constructor Details

#initialize(opts = {}) ⇒ RedisClient

Returns a new instance of RedisClient.



88
89
90
91
92
93
94
# File 'lib/rubyredis.rb', line 88

# Stores the connection settings and connects immediately.
#
# opts may contain :host, :port, :db and :timeout. A missing (or nil/false)
# entry falls back to 127.0.0.1, 6379, 0 and 0 respectively.
def initialize(opts={})
    {
        :host => '127.0.0.1',
        :port => 6379,
        :db => 0,
        :timeout => 0
    }.each do |setting, fallback|
        instance_variable_set("@#{setting}", opts[setting] || fallback)
    end
    connect_to_server
end

Dynamic Method Handling

This class handles dynamic methods through the method_missing method

#method_missing(*argv) ⇒ Object



133
134
135
# File 'lib/rubyredis.rb', line 133

# Any method not defined on the client is treated as a command: the method
# name followed by its arguments is handed to call_command as one array.
def method_missing(*argv)
    command_and_args = argv
    call_command(command_and_args)
end

Instance Method Details

#[](key) ⇒ Object



171
172
173
# File 'lib/rubyredis.rb', line 171

# Hash-style read: client[key] returns whatever get(key) returns.
def [](key)
    stored = get(key)
    stored
end

#[]=(key, value) ⇒ Object



175
176
177
# File 'lib/rubyredis.rb', line 175

# Hash-style write: client[key] = value performs set(key, value).
def []=(key,value)
    outcome = set(key, value)
    outcome
end

#call_command(argv) ⇒ Object



137
138
139
140
141
142
143
144
145
146
147
148
149
# File 'lib/rubyredis.rb', line 137

# Runs a command through raw_call_command, transparently reconnecting once.
#
# argv is the command name followed by its arguments. A connection is
# opened first if there is no socket. When the attempt fails with
# Errno::ECONNRESET the socket is closed, the connection is re-established
# and the command is sent one more time; a second failure is raised to the
# caller.
#
# Every attempt receives its own copy of argv: raw_call_command rewrites the
# array it is given (the bulk payload is replaced by its length), so handing
# over the same array twice would make the retry send the length instead of
# the payload.
def call_command(argv)
    connect_to_server if !@sock
    begin
        raw_call_command(argv.dup)
    rescue Errno::ECONNRESET
        @sock.close
        connect_to_server
        raw_call_command(argv.dup)
    end
end

#connect_to(host, port, timeout = nil) ⇒ Object



105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
# File 'lib/rubyredis.rb', line 105

# Opens a TCP connection to host:port and returns the socket.
#
# When timeout is given it is applied to the socket as both the receive and
# the send timeout (SO_RCVTIMEO / SO_SNDTIMEO), so that blocking reads and
# writes return after that many seconds. Fractional seconds are supported.
#
# A Timeout::Error raised while connecting in the timer-guarded branch
# clears @sock and is re-raised with a descriptive message.
#
# NOTE(review): both branches below perform the same plain TCPSocket.new
# call and RedisTimer is only tested for truthiness, never invoked, so no
# connect timeout appears to be enforced here - confirm against the
# definition of RedisTimer.
def connect_to(host, port, timeout=nil)
    timer_guarded = (@timeout != 0 && RedisTimer)
    sock =
        if timer_guarded
            begin
                TCPSocket.new(host, port, 0)
            rescue Timeout::Error
                @sock = nil
                raise Timeout::Error, "Timeout connecting to the server"
            end
        else
            TCPSocket.new(host, port, 0)
        end

    # Without a timeout the socket is returned with default options.
    return sock unless timeout

    # The option value is the pair (seconds, microseconds) packed as two
    # native longs.
    whole_secs = Integer(timeout)
    micro_secs = Integer((timeout - whole_secs) * 1_000_000)
    packed = [whole_secs, micro_secs].pack("l_2")
    sock.setsockopt Socket::SOL_SOCKET, Socket::SO_RCVTIMEO, packed
    sock.setsockopt Socket::SOL_SOCKET, Socket::SO_SNDTIMEO, packed
    sock
end

#connect_to_server ⇒ Object



100
101
102
103
# File 'lib/rubyredis.rb', line 100

# (Re)creates @sock from the stored host, port and timeout, then selects the
# configured database when it is not database 0. A timeout of 0 means
# "no timeout" and is passed on as nil.
def connect_to_server
    io_timeout = (@timeout == 0) ? nil : @timeout
    @sock = connect_to(@host, @port, io_timeout)
    return if @db == 0
    call_command(["select", @db])
end

#decr(key, decrement = nil) ⇒ Object



193
194
195
# File 'lib/rubyredis.rb', line 193

# Decrements the value stored at key. Without a decrement the "decr" command
# is issued; with one, "decrby" is issued with that amount.
def decr(key,decrement=nil)
    argv =
        if decrement
            ["decrby", key, decrement]
        else
            ["decr", key]
        end
    call_command(argv)
end

#incr(key, increment = nil) ⇒ Object



189
190
191
# File 'lib/rubyredis.rb', line 189

# Increments the value stored at key. Without an increment the "incr"
# command is issued; with one, "incrby" is issued with that amount.
def incr(key,increment=nil)
    argv =
        if increment
            ["incrby", key, increment]
        else
            ["incr", key]
        end
    call_command(argv)
end

#raw_call_command(argv) ⇒ Object



151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
# File 'lib/rubyredis.rb', line 151

# Sends one command over @sock and returns its (post-processed) reply.
#
# argv is the command name (String or Symbol, any case, aliases allowed)
# followed by its arguments. The name is lowercased and resolved through
# Aliases. For commands listed in BulkCommands the last argument is sent as
# a bulk payload: the command line carries its size and the payload follows
# on a line of its own. The reply is read with read_reply and passed through
# the command's ReplyProcessor entry when there is one.
#
# The caller's array is left untouched; all rewriting happens on a copy so
# the same argv can safely be sent again.
def raw_call_command(argv)
    args = argv.dup
    name = args[0].to_s.downcase
    name = Aliases[name] || name
    args[0] = name

    bulk = nil
    if BulkCommands[name]
        bulk = args[-1].to_s
        # The announced size is consumed as a byte count when a bulk value is
        # read back (see read_reply), so count bytes, not characters.
        args[-1] = bulk.bytesize
    end
    @sock.write(args.join(" ")+"\r\n")
    @sock.write(bulk+"\r\n") if bulk

    # Post process the reply if needed
    processor = ReplyProcessor[name]
    processor ? processor.call(read_reply) : read_reply
end

#read_reply ⇒ Object

Raises:

  • (Errno::ECONNRESET)


197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
# File 'lib/rubyredis.rb', line 197

# Reads one complete reply from @sock and converts it to a Ruby value.
#
# The first byte selects the reply type:
#   "-"  error        -> raises RuntimeError with "-" + the error text
#   "+"  status       -> the status line as a String
#   ":"  integer      -> Integer
#   "$"  bulk         -> String, or nil when the announced length is -1
#   "*"  multi bulk   -> Array of replies (read recursively), or nil when
#                        the announced count is -1
#
# Raises Errno::EAGAIN when the read times out (the socket is dropped so the
# next command reconnects), Errno::ECONNRESET when the connection is lost
# before a full reply header has been received, and RuntimeError for an
# unknown type byte.
def read_reply
    # We read the first byte using read() mainly because gets() is
    # immune to raw socket timeouts.
    begin
        rtype = @sock.read(1)
    rescue Errno::EAGAIN
        # We want to make sure it reconnects on the next command after the
        # timeout. Otherwise the server may reply in the meantime leaving
        # the protocol in a desync status.
        @sock = nil
        raise Errno::EAGAIN, "Timeout reading from the socket"
    end

    raise Errno::ECONNRESET,"Connection lost" if !rtype
    line = @sock.gets
    # gets returns nil when the stream ends right after the type byte.
    raise Errno::ECONNRESET,"Connection lost" if !line
    case rtype
    when "-"
        raise "-"+line.strip
    when "+"
        line.strip
    when ":"
        line.to_i
    when "$"
        bulklen = line.to_i
        return nil if bulklen == -1
        data = @sock.read(bulklen)
        @sock.read(2) # CRLF
        data
    when "*"
        objects = line.to_i
        # A count of -1 is the nil multi bulk reply, not an empty list.
        return nil if objects == -1
        (0...objects).map { read_reply }
    else
        raise "Protocol error, got '#{rtype}' as initial reply byte"
    end
end

#select(*args) ⇒ Object



167
168
169
# File 'lib/rubyredis.rb', line 167

# SELECT is deliberately unavailable on a live client: the database is
# chosen once through the :db option of the constructor. Always raises
# RuntimeError, whatever the arguments.
def select(*args)
    raise RuntimeError, "SELECT not allowed, use the :db option when creating the object"
end

#sort(key, opts = {}) ⇒ Object



179
180
181
182
183
184
185
186
187
# File 'lib/rubyredis.rb', line 179

# Issues a SORT command for key and returns the reply of call_command.
#
# Supported options:
#   :by    - pattern used for the BY clause
#   :get   - one pattern or an array of patterns, each emitted as GET <pattern>
#   :order - appended verbatim (e.g. "DESC", "ALPHA DESC")
#   :limit - array whose elements are emitted after LIMIT
#
# The command name is passed as its own array element. The command name is
# lowercased before it is sent (see raw_call_command), so gluing the key to
# it in a single string would lowercase the key as well and sort a different
# key than the one requested.
def sort(key, opts={})
    cmd = ["SORT", key]
    cmd.push("BY", opts[:by]) if opts[:by]
    if opts[:get]
        [opts[:get]].flatten.each { |pattern| cmd.push("GET", pattern) }
    end
    cmd << opts[:order].to_s if opts[:order]
    cmd.push("LIMIT", *opts[:limit]) if opts[:limit]
    call_command(cmd)
end

#to_s ⇒ Object



96
97
98
# File 'lib/rubyredis.rb', line 96

# Human readable description naming the host, port and database this client
# was configured with.
def to_s
    format("Redis Client connected to %s:%s against DB %s", @host, @port, @db)
end