Class: EventMachine::Protocols::RedisCluster

Inherits:
Object
  • Object
show all
Defined in:
lib/em_redis_cluster/cluster.rb

Constant Summary collapse

RedisClusterHashSlots =
16384
RedisClusterRequestTTL =
16
RedisClusterDefaultTimeout =
1

Instance Method Summary collapse

Constructor Details

#initialize(startup_nodes, opt = {}) ⇒ RedisCluster

Returns a new instance of RedisCluster.



35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
# File 'lib/em_redis_cluster/cluster.rb', line 35

def initialize(startup_nodes, opt={})
  @startup_nodes = startup_nodes
  
  @connections = {}
  @opt = opt
  
  # Redis Cluster does not support multiple databases like the stand alone version of Redis. 
  # There is just database 0 and the SELECT command is not allowed
  @opt.delete(:db)
  @logger = @opt[:logger]

  @refresh_table_asap = false
  @slots_initialized = false
  @command_before_init = []
  initialize_slots_cache {|c| yield(c) if block_given?}
end

Dynamic Method Handling

This class handles dynamic methods through the method_missing method

#method_missing(*argv, &blk) ⇒ Object

Currently we handle all the commands using method_missing for simplicity. For a Cluster client actually it will be better to have every single command as a method with the right arity and possibly additional checks (example: RPOPLPUSH with same src/dst key, SORT without GET or BY, and so forth).



297
298
299
300
301
302
303
304
# File 'lib/em_redis_cluster/cluster.rb', line 297

def method_missing(*argv, &blk)
  argv << blk
  if ready?
    send_cluster_command(argv)
  else
    @command_before_init << argv
  end
end

Instance Method Details

#all_connectionsObject



218
219
220
221
222
223
# File 'lib/em_redis_cluster/cluster.rb', line 218

def all_connections
  @startup_nodes.each do |n|
    @connections[n[:name]] ||= get_redis_link(n[:host], n[:port])
  end
  @connections.values
end

#any_error?Boolean

Returns:

  • (Boolean)


60
61
62
# File 'lib/em_redis_cluster/cluster.rb', line 60

def any_error?
  ready? && all_connections.any?{|c| c.error?}
end

#conn_close(key) ⇒ Object

used for testing connection auto-recovery



317
318
319
320
# File 'lib/em_redis_cluster/cluster.rb', line 317

def conn_close(key)
  c = get_connection_by_key(key)
  c && c.close_connection_after_writing
end

#conn_error?(key) ⇒ Boolean

used for testing connection auto-recovery

Returns:

  • (Boolean)


323
324
325
326
# File 'lib/em_redis_cluster/cluster.rb', line 323

def conn_error?(key)
  c = get_connection_by_key(key)
  c && c.error?
end

#conn_statusObject



64
65
66
67
68
# File 'lib/em_redis_cluster/cluster.rb', line 64

def conn_status
  status = {}
  @connections.each {|k,c| status[k] = !c.error?}
  status
end

#flush_slots_cacheObject

Flush the cache, mostly useful for debugging when we want to force redirection.



153
154
155
# File 'lib/em_redis_cluster/cluster.rb', line 153

def flush_slots_cache
  @slots = Array.new(RedisClusterHashSlots)
end

#get_connection_by_key(key) ⇒ Object

Given a slot return the link (Redis instance) to the mapped node. Make sure to create a connection with the node if we don’t have one.



204
205
206
207
208
209
210
211
# File 'lib/em_redis_cluster/cluster.rb', line 204

def get_connection_by_key(key)
  if n = @slots[keyslot(key)]
    @connections[n[:name]] ||= get_redis_link(n[:host], n[:port])
  else
    # If we don't know what the mapping is, return a random node.
    get_random_connection
  end
end

#get_connection_by_node(n) ⇒ Object



213
214
215
216
# File 'lib/em_redis_cluster/cluster.rb', line 213

def get_connection_by_node(n)
  set_node_name!(n)
  @connections[n[:name]] ||= get_redis_link(n[:host], n[:port])
end

#get_key_from_command(argv) ⇒ Object

Return the first key in the command arguments.

Currently we just return argv, that is, the first argument after the command name.

This is indeed the key for most commands, and when it is not true the cluster redirection will point us to the right node anyway.

For commands we want to explicitly bad as they don’t make sense in the context of cluster, nil is returned.



184
185
186
187
188
189
190
191
192
193
194
# File 'lib/em_redis_cluster/cluster.rb', line 184

def get_key_from_command(argv)
  case argv[0].to_s.downcase
  when "info","multi","exec","slaveof","config","shutdown","select"
    nil
  else
    # Unknown commands, and all the commands having the key
    # as first argument are handled here:
    # set, get, ...
    argv[1]
  end
end

#get_random_connectionObject



196
197
198
199
# File 'lib/em_redis_cluster/cluster.rb', line 196

def get_random_connection
  n = @startup_nodes.shuffle.first
  @connections[n[:name]] ||= get_redis_link(n[:host], n[:port])
end


70
71
72
# File 'lib/em_redis_cluster/cluster.rb', line 70

def get_redis_link(host, port)
  EM::Protocols::Redis.connect({:host => host, :port => port}.merge @opt)
end

#get_slotname_by_key(key) ⇒ Object

used for testing connection auto-recovery



329
330
331
# File 'lib/em_redis_cluster/cluster.rb', line 329

def get_slotname_by_key(key)
  @slots[keyslot(key)][:name] rescue nil
end

#initialize_slots_cacheObject

Contact the startup nodes and try to fetch the hash slots -> instances map in order to initialize the @slots hash.



84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
# File 'lib/em_redis_cluster/cluster.rb', line 84

def initialize_slots_cache
  @slots = Array.new(RedisClusterHashSlots)

  fiber = Fiber.new do

    @startup_nodes.each do |n|
      @nodes = []

      r = get_redis_link(n[:host], n[:port])

      r.errback {|e| fiber.resume(nil)}

      r.cluster("slots") {|rsp| fiber.resume(rsp)}

      rsp = Fiber.yield
      r.close_connection

      if rsp.is_a?(Array)
        rsp.each do |r|
          
          ip, port = r[2]
          # somehow redis return "" for the node it's querying
          ip = n[:host] if ip == ""

          node = set_node_name!(host: ip, port: port)
          @nodes << node

          (r[0]..r[1]).each {|slot| @slots[slot] = node}
        end

        populate_startup_nodes
        @refresh_table_asap = false
        @slots_initialized = true

        # Exit the loop as long as the first node replies
        break
      else
        next
      end
    end

    log :debug, "RedisCluster: #{@nodes}"

    yield(self) if block_given?

    # run cached commands before initialization
    if ready?
      @command_before_init.each do |argv|
        argv.respond_to?(:call) ? argv.call : send_cluster_command(argv)
      end
    end
    @command_before_init = []
  end

  fiber.resume
end

#keyslot(key) ⇒ Object

Return the hash slot from the key.



158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
# File 'lib/em_redis_cluster/cluster.rb', line 158

def keyslot(key)
  # Only hash what is inside {...} if there is such a pattern in the key.
  # Note that the specification requires the content that is between
  # the first { and the first } after the first {. If we found {} without
  # nothing in the middle, the whole key is hashed as usually.
  s = key.index "{"
  if s
    e = key.index "}",s+1
    if e && e != s+1
      key = key[s+1..e-1]
    end
  end

  RedisClusterCRC16.crc16(key) % RedisClusterHashSlots
end

#log(severity, msg) ⇒ Object



52
53
54
# File 'lib/em_redis_cluster/cluster.rb', line 52

def log(severity, msg)
  @logger && @logger.send(severity, "em_redis_cluster: #{msg}")
end

#populate_startup_nodesObject

Use @nodes to populate @startup_nodes, so that we have more chances if a subset of the cluster fails.



143
144
145
146
147
148
149
# File 'lib/em_redis_cluster/cluster.rb', line 143

def populate_startup_nodes
  # Make sure every node has already a name, so that later the
  # Array uniq! method will work reliably.
  @startup_nodes.each{|n| set_node_name!(n)}
  @nodes.each{|n| @startup_nodes << n}
  @startup_nodes.uniq!
end

#ready?Boolean

Returns:

  • (Boolean)


56
57
58
# File 'lib/em_redis_cluster/cluster.rb', line 56

def ready?
  @slots_initialized
end

#send_cluster_command(argv) ⇒ Object

Dispatch commands.



226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
# File 'lib/em_redis_cluster/cluster.rb', line 226

def send_cluster_command(argv)
  
  callback = argv.pop
  callback = nil unless callback.respond_to?(:call)

  ttl = RedisClusterRequestTTL
  asking = false
  conn_for_next_cmd = nil
  try_random_node = false


  fiber = Fiber.new do
    while ttl > 0 do
      key = get_key_from_command(argv)

      # raise Redis::ParserError.new("No way to dispatch this command to Redis Cluster.") unless key

      # The full semantics of ASK redirection from the point of view of the client is as follows:
      #   If ASK redirection is received, send only the query that was redirected to the specified node but continue sending subsequent queries to the old node.
      #   Start the redirected query with the ASKING command.
      #   Don't yet update local client tables to map hash slot 8 to B.
      conn_for_next_cmd ||= (key ? get_connection_by_key(key) : get_random_connection)
      conn_for_next_cmd.asking if asking
      conn_for_next_cmd.send(argv[0].to_sym, *argv[1..-1]) {|rsp| fiber.resume(rsp) }

      rsp = Fiber.yield

      conn_for_next_cmd = nil
      asking = false

      if rsp.is_a?(RedisError)
        errv = rsp.to_s.split
        if errv[0] == "MOVED" || errv[0] == "ASK"
          log :debug, rsp.to_s

          newslot = errv[1].to_i
          node_ip, node_port = errv[2].split(":").map{|x|x.strip}

          if errv[0] == "ASK"
            asking = true
            conn_for_next_cmd = get_connection_by_node(host: node_ip, port: node_port)
          else
            # Serve replied with MOVED. It's better for us to ask for CLUSTER NODES the next time.
            @refresh_table_asap = true
            @slots[newslot] = set_node_name!(host: node_ip, port: node_port.to_i)
          end
        else
          callback && callback.call(rsp)
          break
        end
      else
        callback && callback.call(rsp)
        break
      end

      ttl -= 1
    end
    
    callback && callback.call(rsp) if ttl == 0

    initialize_slots_cache if @refresh_table_asap
  end

  fiber.resume
end

#set_node_name!(n) ⇒ Object

Given a node (that is just a Ruby hash) give it a name just concatenating the host and port. We use the node name as a key to cache connections to that node.



77
78
79
80
# File 'lib/em_redis_cluster/cluster.rb', line 77

def set_node_name!(n)
  n[:name] ||= "#{n[:host]}:#{n[:port]}"
  n
end