Class: Kafkat::ClusterRestart::ClusterRestartHelper
- Inherits:
-
Object
- Object
- Kafkat::ClusterRestart::ClusterRestartHelper
- Defined in:
- lib/kafkat/command/cluster_restart.rb
Class Method Summary collapse
- .calculate_cost(broker_id, partitions, session) ⇒ Object
- .get_broker_to_leader_partition_mapping(topics) ⇒ Object
- .select_broker_with_min_cost(session, topics) ⇒ Object
Class Method Details
.calculate_cost(broker_id, partitions, session) ⇒ Object
211 212 213 214 215 216 217 218 219 |
# File 'lib/kafkat/command/cluster_restart.rb', line 211 def self.calculate_cost(broker_id, partitions, session) raise UnknownBrokerError, "Unknown broker #{broker_id}" unless session.broker_states.key?(broker_id) partitions.find_all { |partition| partition.leader == broker_id } .reduce(0) do |cost, partition| cost += partition.replicas.length cost -= partition.replicas.find_all { |replica| session.restarted?(replica) }.size cost end end |
.get_broker_to_leader_partition_mapping(topics) ⇒ Object
201 202 203 204 205 206 207 208 209 |
# File 'lib/kafkat/command/cluster_restart.rb', line 201 def self.get_broker_to_leader_partition_mapping(topics) broker_to_partitions = Hash.new { |h, key| h[key] = [] } topics.values.flat_map { |topic| topic.partitions } .each do |partition| broker_to_partitions[partition.leader] << partition end broker_to_partitions end |
.select_broker_with_min_cost(session, topics) ⇒ Object
187 188 189 190 191 192 193 194 195 196 197 198 199 |
# File 'lib/kafkat/command/cluster_restart.rb', line 187 def self.select_broker_with_min_cost(session, topics) broker_to_partition = get_broker_to_leader_partition_mapping(topics) broker_restart_cost = Hash.new(0) session.broker_states.each do |broker_id, state| if state == Session::STATE_NOT_RESTARTED current_cost = calculate_cost(broker_id, broker_to_partition[broker_id], session) broker_restart_cost[broker_id] = current_cost if current_cost != nil end end # Sort by cost first, and then broker_id broker_restart_cost.min_by { |broker_id, cost| [cost, broker_id] } end |