Class: Karafka::Connection::RebalanceManager
- Inherits: Object
- Defined in: lib/karafka/connection/rebalance_manager.rb
Overview
Manager for tracking changes in the partition assignment.
We need to track these changes to clean up consumers that will no longer process given partitions because those partitions were taken away.
Since rebalances do not happen very often, we try to reuse the same objects for the empty states most of the time, so we don’t create many objects during the manager's lifetime.
Internally, the rebalance manager has a notion of lost partitions: partitions that were revoked and not assigned back. We do not expose this concept externally. Instead, we normalize lost partitions to revoked ones, because from the perspective of the rest of the code only the lost partitions are truly revoked.
For cooperative-sticky, `#assigned_partitions` holds only the recently assigned partitions, not all the partitions that are owned.
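As an illustration of the cleanup this tracking enables, the sketch below drops consumers for partitions that were taken away. The `consumers` registry and its `[topic, partition]` keys are hypothetical and are not part of Karafka. Only the hash shape (topic name mapped to an array of partition ids) follows what this class exposes.

```ruby
# Hypothetical registry of consumer objects keyed by [topic, partition].
# Symbols stand in for real consumer instances.
consumers = {
  ["events", 0] => :consumer_a,
  ["events", 1] => :consumer_b,
  ["logs", 0] => :consumer_c
}

# Same shape as the hashes kept by the manager:
# topic name => array of partition ids.
lost = { "events" => [0] }

# Remove every consumer whose partition was taken away.
lost.each do |topic, partitions|
  partitions.each { |partition| consumers.delete([topic, partition]) }
end

remaining_keys = consumers.keys
```

After the loop, only the consumers for partitions that are still owned remain in the registry.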
Instance Attribute Summary
- #assigned_partitions ⇒ Object (readonly)
  Returns the value of attribute assigned_partitions.
- #revoked_partitions ⇒ Object (readonly)
  Returns the value of attribute revoked_partitions.
Instance Method Summary
- #changed? ⇒ Boolean
  Indicates a state change in the partition assignment.
- #clear ⇒ Object
  Resets the rebalance manager state. This needs to be done before each polling loop because the state may change during polling.
- #initialize ⇒ RebalanceManager constructor
- #lost_partitions ⇒ Object
  We consider as lost only partitions that were taken away and not re-assigned back to us.
- #on_partitions_assigned(_, partitions) ⇒ Object
  Callback invoked from within rdkafka when new partitions are assigned.
- #on_partitions_revoked(_, partitions) ⇒ Object
  Callback invoked from within rdkafka when partitions are revoked.
Constructor Details
#initialize ⇒ RebalanceManager
# File 'lib/karafka/connection/rebalance_manager.rb', line 29

def initialize
  @assigned_partitions = {}
  @revoked_partitions = {}
  @changed = false
end
Instance Attribute Details
#assigned_partitions ⇒ Object (readonly)
Returns the value of attribute assigned_partitions.
# File 'lib/karafka/connection/rebalance_manager.rb', line 24

def assigned_partitions
  @assigned_partitions
end
#revoked_partitions ⇒ Object (readonly)
Returns the value of attribute revoked_partitions.
# File 'lib/karafka/connection/rebalance_manager.rb', line 24

def revoked_partitions
  @revoked_partitions
end
Instance Method Details
#changed? ⇒ Boolean
Returns true if the partition assignment has changed since the manager was created or last cleared, false otherwise.
# File 'lib/karafka/connection/rebalance_manager.rb', line 45

def changed?
  @changed
end
#clear ⇒ Object
Resets the rebalance manager state. This needs to be done before each polling loop because the state may change during polling.
# File 'lib/karafka/connection/rebalance_manager.rb', line 38

def clear
  @assigned_partitions.clear
  @revoked_partitions.clear
  @changed = false
end
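The clear-before-poll pattern described above can be sketched as follows. `SketchManager` is a stand-in that copies only the state handling shown on this page. It is not the Karafka class, its callback takes a plain hash rather than an rdkafka partition list, and the `fake_polls` lambdas are hypothetical replacements for a real polling call.

```ruby
# Stand-in mirroring only the state handling documented on this page.
# Assumption: this is NOT Karafka::Connection::RebalanceManager itself.
class SketchManager
  attr_reader :assigned_partitions, :revoked_partitions

  def initialize
    @assigned_partitions = {}
    @revoked_partitions = {}
    @changed = false
  end

  def changed?
    @changed
  end

  def clear
    @assigned_partitions.clear
    @revoked_partitions.clear
    @changed = false
  end

  # Simplified: accepts a plain hash instead of an rdkafka partition list.
  def on_partitions_revoked(_, partitions)
    @revoked_partitions = partitions
    @changed = true
  end
end

manager = SketchManager.new
observed = []

# Hypothetical polls: the first one triggers a revocation, the second does not.
fake_polls = [
  -> { manager.on_partitions_revoked(nil, { "events" => [0] }) },
  -> {}
]

fake_polls.each do |poll|
  manager.clear # reset before polling so stale state is not re-processed
  poll.call     # rebalance callbacks may fire while polling
  observed << manager.changed?
end
```

Without the `clear` call at the top of the loop, the second iteration would still report the change from the first one.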
#lost_partitions ⇒ Object
We consider as lost only partitions that were taken away and not re-assigned back to us.
# File 'lib/karafka/connection/rebalance_manager.rb', line 70

def lost_partitions
  lost_partitions = {}

  revoked_partitions.each do |topic, partitions|
    lost_partitions[topic] = partitions - assigned_partitions.fetch(topic, EMPTY_ARRAY)
  end

  lost_partitions
end
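A worked example of the per-topic array difference may help. The sample topics and partition ids below are made up, and `EMPTY_ARRAY` is assumed here to be a frozen empty array, since its definition is not shown on this page.

```ruby
# Assumption: the constant referenced by #lost_partitions is a frozen empty array.
EMPTY_ARRAY = [].freeze

# Made-up state after a rebalance: topic name => partition ids.
revoked  = { "events" => [0, 1, 2], "logs" => [0], "audit" => [3] }
assigned = { "events" => [1, 2], "audit" => [3] }

lost = {}

revoked.each do |topic, partitions|
  # Anything revoked and then assigned back is not considered lost.
  lost[topic] = partitions - assigned.fetch(topic, EMPTY_ARRAY)
end
```

With this input, `lost` maps `"events"` to `[0]`, `"logs"` to `[0]`, and `"audit"` to an empty array. A topic whose partitions were all assigned back still appears in the result with an empty array, so callers iterating over the result should expect empty entries.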
#on_partitions_assigned(_, partitions) ⇒ Object
Callback invoked from within rdkafka when new partitions are assigned.
# File 'lib/karafka/connection/rebalance_manager.rb', line 54

def on_partitions_assigned(_, partitions)
  @assigned_partitions = partitions.to_h.transform_values { |part| part.map(&:partition) }
  @changed = true
end
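The normalization performed by this callback (and by `#on_partitions_revoked`) can be illustrated with stand-in objects. `FakePartition` and `FakeTopicPartitionList` are hypothetical and only imitate the two things the callback relies on: a `#to_h` that returns topic names mapped to arrays of partition objects, and a `#partition` reader on each of those objects.

```ruby
# Hypothetical stand-in for a partition object exposing #partition.
FakePartition = Struct.new(:partition, :offset)

# Hypothetical stand-in for the partition list passed to the callback.
class FakeTopicPartitionList
  def initialize(data)
    @data = data
  end

  # topic name => array of partition objects
  def to_h
    @data
  end
end

list = FakeTopicPartitionList.new(
  {
    "events" => [FakePartition.new(0, 100), FakePartition.new(2, 57)],
    "logs" => [FakePartition.new(1, 0)]
  }
)

# Same transformation as in the callback: keep only the partition ids.
normalized = list.to_h.transform_values { |parts| parts.map(&:partition) }
```

Here `normalized` maps `"events"` to `[0, 2]` and `"logs"` to `[1]`. Because the callback assigns this hash to the instance variable instead of merging into it, each invocation replaces whatever was stored before.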
#on_partitions_revoked(_, partitions) ⇒ Object
Callback invoked from within rdkafka when partitions are revoked.
# File 'lib/karafka/connection/rebalance_manager.rb', line 64

def on_partitions_revoked(_, partitions)
  @revoked_partitions = partitions.to_h.transform_values { |part| part.map(&:partition) }
  @changed = true
end