Class: Ruster::Cluster
Constant Summary collapse
- SLOTS =
16384
Instance Attribute Summary collapse
-
#entry ⇒ Object
readonly
Returns the value of attribute entry.
Class Method Summary collapse
Instance Method Summary collapse
- #add_node(ip, port) ⇒ Object
- #current_epoch ⇒ Object
- #each(*args, &block) ⇒ Object
- #fail? ⇒ Boolean
- #in_cluster?(node) ⇒ Boolean
- #info ⇒ Object
-
#initialize(entry) ⇒ Cluster
constructor
A new instance of Cluster.
- #known_nodes ⇒ Object
- #nodes ⇒ Object
- #ok? ⇒ Boolean
- #remove_node(bye) ⇒ Object
- #reshard(target, num_slots, sources) ⇒ Object
- #size ⇒ Object
- #slots_assigned ⇒ Object
- #slots_fail ⇒ Object
- #slots_ok ⇒ Object
- #slots_pfail ⇒ Object
- #state ⇒ Object
- #stats_messages_received ⇒ Object
- #stats_messages_sent ⇒ Object
Methods included from Util
Constructor Details
#initialize(entry) ⇒ Cluster
Returns a new instance of Cluster.
8 9 10 |
# File 'lib/ruster/cluster.rb', line 8 def initialize(entry) @entry = entry end |
Instance Attribute Details
#entry ⇒ Object (readonly)
Returns the value of attribute entry.
4 5 6 |
# File 'lib/ruster/cluster.rb', line 4 def entry @entry end |
Class Method Details
.create!(addrs) ⇒ Object
80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 |
# File 'lib/ruster/cluster.rb', line 80 def self.create!(addrs) # Check nodes nodes = addrs.map do |addr| node = ::Ruster::Node.new(addr) raise ArgumentError, "Redis Server at #{addr} not running in cluster mode" unless node.enabled? raise ArgumentError, "Redis Server at #{addr} already exists in a cluster" unless node.only_node? raise ArgumentError, "Redis Server at #{addr} is not empty" unless node.empty? node end # Allocate slots evenly among all nodes slots_by_node = 0.upto(SLOTS - 1).each_slice((SLOTS.to_f / nodes.length).ceil) nodes.each do |node| slots = slots_by_node.next.to_a node.add_slots(*slots) end # Create cluster by meeting nodes entry = nodes.shift nodes.each { |node| entry.meet node.ip, node.port } new(entry) end |
Instance Method Details
#add_node(ip, port) ⇒ Object
69 70 71 |
# File 'lib/ruster/cluster.rb', line 69 def add_node(ip, port) @entry.meet(ip, port) end |
#current_epoch ⇒ Object
52 53 54 |
# File 'lib/ruster/cluster.rb', line 52 def current_epoch info[:cluster_current_epoch].to_i end |
#each(*args, &block) ⇒ Object
109 110 111 112 113 |
# File 'lib/ruster/cluster.rb', line 109 def each(*args, &block) nodes.each do |node| yield node, node.call(*args) end end |
#fail? ⇒ Boolean
24 25 26 |
# File 'lib/ruster/cluster.rb', line 24 def fail? state == "fail" end |
#in_cluster?(node) ⇒ Boolean
140 141 142 |
# File 'lib/ruster/cluster.rb', line 140 def in_cluster?(node) nodes.any?{ |n| n.addr == node.addr } end |
#info ⇒ Object
12 13 14 |
# File 'lib/ruster/cluster.rb', line 12 def info @entry.cluster_info end |
#known_nodes ⇒ Object
44 45 46 |
# File 'lib/ruster/cluster.rb', line 44 def known_nodes info[:cluster_known_nodes].to_i end |
#nodes ⇒ Object
64 65 66 67 |
# File 'lib/ruster/cluster.rb', line 64 def nodes @entry.load! [@entry] + @entry.friends end |
#ok? ⇒ Boolean
20 21 22 |
# File 'lib/ruster/cluster.rb', line 20 def ok? state == "ok" end |
#remove_node(bye) ⇒ Object
73 74 75 76 77 78 |
# File 'lib/ruster/cluster.rb', line 73 def remove_node(bye) nodes.each do |node| next if node.id == bye.id node.forget(bye) end end |
#reshard(target, num_slots, sources) ⇒ Object
115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 |
# File 'lib/ruster/cluster.rb', line 115 def reshard(target, num_slots, sources) raise ArgumentError, "Target node #{target} is not part of the cluster" unless in_cluster?(target) target.load! sources.each do |source| raise ArgumentError, "Source node #{source} is not part of the cluster" unless in_cluster?(source) source.load! end sources.sort_by!{ |node| -node.slots.size } total_slots = sources.inject(0) do |sum, node| sum + node.all_slots.size end sources.each do |node| # Proportional number of slots based on node size node_slots = (num_slots.to_f / total_slots * node.all_slots.size) node.all_slots.take(node_slots).each do |slot| node.move_slot!(slot, target) end end end |
#size ⇒ Object
48 49 50 |
# File 'lib/ruster/cluster.rb', line 48 def size info[:cluster_size].to_i end |
#slots_assigned ⇒ Object
28 29 30 |
# File 'lib/ruster/cluster.rb', line 28 def slots_assigned info[:cluster_slots_assigned].to_i end |
#slots_fail ⇒ Object
40 41 42 |
# File 'lib/ruster/cluster.rb', line 40 def slots_fail info[:cluster_slots_fail].to_i end |
#slots_ok ⇒ Object
32 33 34 |
# File 'lib/ruster/cluster.rb', line 32 def slots_ok info[:cluster_slots_ok].to_i end |
#slots_pfail ⇒ Object
36 37 38 |
# File 'lib/ruster/cluster.rb', line 36 def slots_pfail info[:cluster_slots_pfail].to_i end |
#state ⇒ Object
16 17 18 |
# File 'lib/ruster/cluster.rb', line 16 def state info[:cluster_state] end |
#stats_messages_received ⇒ Object
60 61 62 |
# File 'lib/ruster/cluster.rb', line 60 def info[:stats_messages_received].to_i end |
#stats_messages_sent ⇒ Object
56 57 58 |
# File 'lib/ruster/cluster.rb', line 56 def info[:cluster_stats_messages_sent].to_i end |