Class: EventMachine::Protocols::RedisCluster
- Inherits:
-
Object
- Object
- EventMachine::Protocols::RedisCluster
- Defined in:
- lib/em_redis_cluster/cluster.rb
Constant Summary collapse
- RedisClusterHashSlots =
16384
- RedisClusterRequestTTL =
16
- RedisClusterDefaultTimeout =
1
Instance Method Summary collapse
- #all_connections ⇒ Object
- #any_error? ⇒ Boolean
-
#conn_close(key) ⇒ Object
used for testing connection auto-recovery.
-
#conn_error?(key) ⇒ Boolean
used for testing connection auto-recovery.
- #conn_status ⇒ Object
-
#flush_slots_cache ⇒ Object
Flush the cache, mostly useful for debugging when we want to force redirection.
-
#get_connection_by_key(key) ⇒ Object
Given a slot return the link (Redis instance) to the mapped node.
- #get_connection_by_node(n) ⇒ Object
-
#get_key_from_command(argv) ⇒ Object
Return the first key in the command arguments.
- #get_random_connection ⇒ Object
- #get_redis_link(host, port) ⇒ Object
-
#get_slotname_by_key(key) ⇒ Object
used for testing connection auto-recovery.
-
#initialize(startup_nodes, opt = {}) ⇒ RedisCluster
constructor
A new instance of RedisCluster.
-
#initialize_slots_cache ⇒ Object
Contact the startup nodes and try to fetch the hash slots -> instances map in order to initialize the @slots hash.
-
#keyslot(key) ⇒ Object
Return the hash slot from the key.
- #log(severity, msg) ⇒ Object
-
#method_missing(*argv, &blk) ⇒ Object
Currently we handle all the commands using method_missing for simplicity.
-
#populate_startup_nodes ⇒ Object
Use @nodes to populate @startup_nodes, so that we have more chances if a subset of the cluster fails.
- #ready? ⇒ Boolean
-
#send_cluster_command(argv) ⇒ Object
Dispatch commands.
-
#set_node_name!(n) ⇒ Object
Given a node (that is just a Ruby hash) give it a name just concatenating the host and port.
Constructor Details
#initialize(startup_nodes, opt = {}) ⇒ RedisCluster
Returns a new instance of RedisCluster.
35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 |
# File 'lib/em_redis_cluster/cluster.rb', line 35 def initialize(startup_nodes, opt={}) @startup_nodes = startup_nodes @connections = {} @opt = opt # Redis Cluster does not support multiple databases like the stand alone version of Redis. # There is just database 0 and the SELECT command is not allowed @opt.delete(:db) @logger = @opt[:logger] @refresh_table_asap = false @slots_initialized = false @command_before_init = [] initialize_slots_cache {|c| yield(c) if block_given?} end |
Dynamic Method Handling
This class handles dynamic methods through the method_missing method
#method_missing(*argv, &blk) ⇒ Object
Currently we handle all the commands using method_missing for simplicity. For a Cluster client actually it will be better to have every single command as a method with the right arity and possibly additional checks (example: RPOPLPUSH with same src/dst key, SORT without GET or BY, and so forth).
297 298 299 300 301 302 303 304 |
# File 'lib/em_redis_cluster/cluster.rb', line 297 def method_missing(*argv, &blk) argv << blk if ready? send_cluster_command(argv) else @command_before_init << argv end end |
Instance Method Details
#all_connections ⇒ Object
218 219 220 221 222 223 |
# File 'lib/em_redis_cluster/cluster.rb', line 218 def all_connections @startup_nodes.each do |n| @connections[n[:name]] ||= get_redis_link(n[:host], n[:port]) end @connections.values end |
#any_error? ⇒ Boolean
60 61 62 |
# File 'lib/em_redis_cluster/cluster.rb', line 60 def any_error? ready? && all_connections.any?{|c| c.error?} end |
#conn_close(key) ⇒ Object
used for testing connection auto-recovery
317 318 319 320 |
# File 'lib/em_redis_cluster/cluster.rb', line 317 def conn_close(key) c = get_connection_by_key(key) c && c.close_connection_after_writing end |
#conn_error?(key) ⇒ Boolean
used for testing connection auto-recovery
323 324 325 326 |
# File 'lib/em_redis_cluster/cluster.rb', line 323 def conn_error?(key) c = get_connection_by_key(key) c && c.error? end |
#conn_status ⇒ Object
64 65 66 67 68 |
# File 'lib/em_redis_cluster/cluster.rb', line 64 def conn_status status = {} @connections.each {|k,c| status[k] = !c.error?} status end |
#flush_slots_cache ⇒ Object
Flush the cache, mostly useful for debugging when we want to force redirection.
153 154 155 |
# File 'lib/em_redis_cluster/cluster.rb', line 153 def flush_slots_cache @slots = Array.new(RedisClusterHashSlots) end |
#get_connection_by_key(key) ⇒ Object
Given a slot return the link (Redis instance) to the mapped node. Make sure to create a connection with the node if we don’t have one.
204 205 206 207 208 209 210 211 |
# File 'lib/em_redis_cluster/cluster.rb', line 204 def get_connection_by_key(key) if n = @slots[keyslot(key)] @connections[n[:name]] ||= get_redis_link(n[:host], n[:port]) else # If we don't know what the mapping is, return a random node. get_random_connection end end |
#get_connection_by_node(n) ⇒ Object
213 214 215 216 |
# File 'lib/em_redis_cluster/cluster.rb', line 213 def get_connection_by_node(n) set_node_name!(n) @connections[n[:name]] ||= get_redis_link(n[:host], n[:port]) end |
#get_key_from_command(argv) ⇒ Object
Return the first key in the command arguments.
Currently we just return argv, that is, the first argument after the command name.
This is indeed the key for most commands, and when it is not true the cluster redirection will point us to the right node anyway.
For commands we want to explicitly bad as they don’t make sense in the context of cluster, nil is returned.
184 185 186 187 188 189 190 191 192 193 194 |
# File 'lib/em_redis_cluster/cluster.rb', line 184 def get_key_from_command(argv) case argv[0].to_s.downcase when "info","multi","exec","slaveof","config","shutdown","select" nil else # Unknown commands, and all the commands having the key # as first argument are handled here: # set, get, ... argv[1] end end |
#get_random_connection ⇒ Object
196 197 198 199 |
# File 'lib/em_redis_cluster/cluster.rb', line 196 def get_random_connection n = @startup_nodes.shuffle.first @connections[n[:name]] ||= get_redis_link(n[:host], n[:port]) end |
#get_redis_link(host, port) ⇒ Object
70 71 72 |
# File 'lib/em_redis_cluster/cluster.rb', line 70 def get_redis_link(host, port) EM::Protocols::Redis.connect({:host => host, :port => port}.merge @opt) end |
#get_slotname_by_key(key) ⇒ Object
used for testing connection auto-recovery
329 330 331 |
# File 'lib/em_redis_cluster/cluster.rb', line 329 def get_slotname_by_key(key) @slots[keyslot(key)][:name] rescue nil end |
#initialize_slots_cache ⇒ Object
Contact the startup nodes and try to fetch the hash slots -> instances map in order to initialize the @slots hash.
84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 |
# File 'lib/em_redis_cluster/cluster.rb', line 84 def initialize_slots_cache @slots = Array.new(RedisClusterHashSlots) fiber = Fiber.new do @startup_nodes.each do |n| @nodes = [] r = get_redis_link(n[:host], n[:port]) r.errback {|e| fiber.resume(nil)} r.cluster("slots") {|rsp| fiber.resume(rsp)} rsp = Fiber.yield r.close_connection if rsp.is_a?(Array) rsp.each do |r| ip, port = r[2] # somehow redis return "" for the node it's querying ip = n[:host] if ip == "" node = set_node_name!(host: ip, port: port) @nodes << node (r[0]..r[1]).each {|slot| @slots[slot] = node} end populate_startup_nodes @refresh_table_asap = false @slots_initialized = true # Exit the loop as long as the first node replies break else next end end log :debug, "RedisCluster: #{@nodes}" yield(self) if block_given? # run cached commands before initialization if ready? @command_before_init.each do |argv| argv.respond_to?(:call) ? argv.call : send_cluster_command(argv) end end @command_before_init = [] end fiber.resume end |
#keyslot(key) ⇒ Object
Return the hash slot from the key.
158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 |
# File 'lib/em_redis_cluster/cluster.rb', line 158 def keyslot(key) # Only hash what is inside {...} if there is such a pattern in the key. # Note that the specification requires the content that is between # the first { and the first } after the first {. If we found {} without # nothing in the middle, the whole key is hashed as usually. s = key.index "{" if s e = key.index "}",s+1 if e && e != s+1 key = key[s+1..e-1] end end RedisClusterCRC16.crc16(key) % RedisClusterHashSlots end |
#log(severity, msg) ⇒ Object
52 53 54 |
# File 'lib/em_redis_cluster/cluster.rb', line 52 def log(severity, msg) @logger && @logger.send(severity, "em_redis_cluster: #{msg}") end |
#populate_startup_nodes ⇒ Object
Use @nodes to populate @startup_nodes, so that we have more chances if a subset of the cluster fails.
143 144 145 146 147 148 149 |
# File 'lib/em_redis_cluster/cluster.rb', line 143 def populate_startup_nodes # Make sure every node has already a name, so that later the # Array uniq! method will work reliably. @startup_nodes.each{|n| set_node_name!(n)} @nodes.each{|n| @startup_nodes << n} @startup_nodes.uniq! end |
#ready? ⇒ Boolean
56 57 58 |
# File 'lib/em_redis_cluster/cluster.rb', line 56 def ready? @slots_initialized end |
#send_cluster_command(argv) ⇒ Object
Dispatch commands.
226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 |
# File 'lib/em_redis_cluster/cluster.rb', line 226 def send_cluster_command(argv) callback = argv.pop callback = nil unless callback.respond_to?(:call) ttl = RedisClusterRequestTTL asking = false conn_for_next_cmd = nil try_random_node = false fiber = Fiber.new do while ttl > 0 do key = get_key_from_command(argv) # raise Redis::ParserError.new("No way to dispatch this command to Redis Cluster.") unless key # The full semantics of ASK redirection from the point of view of the client is as follows: # If ASK redirection is received, send only the query that was redirected to the specified node but continue sending subsequent queries to the old node. # Start the redirected query with the ASKING command. # Don't yet update local client tables to map hash slot 8 to B. conn_for_next_cmd ||= (key ? get_connection_by_key(key) : get_random_connection) conn_for_next_cmd.asking if asking conn_for_next_cmd.send(argv[0].to_sym, *argv[1..-1]) {|rsp| fiber.resume(rsp) } rsp = Fiber.yield conn_for_next_cmd = nil asking = false if rsp.is_a?(RedisError) errv = rsp.to_s.split if errv[0] == "MOVED" || errv[0] == "ASK" log :debug, rsp.to_s newslot = errv[1].to_i node_ip, node_port = errv[2].split(":").map{|x|x.strip} if errv[0] == "ASK" asking = true conn_for_next_cmd = get_connection_by_node(host: node_ip, port: node_port) else # Serve replied with MOVED. It's better for us to ask for CLUSTER NODES the next time. @refresh_table_asap = true @slots[newslot] = set_node_name!(host: node_ip, port: node_port.to_i) end else callback && callback.call(rsp) break end else callback && callback.call(rsp) break end ttl -= 1 end callback && callback.call(rsp) if ttl == 0 initialize_slots_cache if @refresh_table_asap end fiber.resume end |
#set_node_name!(n) ⇒ Object
Given a node (that is just a Ruby hash) give it a name just concatenating the host and port. We use the node name as a key to cache connections to that node.
77 78 79 80 |
# File 'lib/em_redis_cluster/cluster.rb', line 77 def set_node_name!(n) n[:name] ||= "#{n[:host]}:#{n[:port]}" n end |