Class: Karafka::Connection::RebalanceManager

Inherits:
Object
Defined in:
lib/karafka/connection/rebalance_manager.rb

Overview

Note:

Since rebalances do not happen often, we try to reuse the same objects for the empty states most of the time, so that few objects are created during the manager's lifetime.

Note:

Internally, the rebalance manager has a notion of lost partitions: partitions that were revoked and not re-assigned back. We do not expose this concept outside; we normalize lost partitions to revoked ones, because from the perspective of the rest of the code only the lost partitions are truly revoked.

Note:

For cooperative-sticky, `#assigned_partitions` holds only the recently assigned partitions, not all the partitions that are owned.

Manager for tracking changes in the partitions assignment.

We need to track these changes to clean up consumers that will no longer process given partitions because those partitions were taken away.


Constructor Details

#initialize ⇒ RebalanceManager



# File 'lib/karafka/connection/rebalance_manager.rb', line 29

def initialize
  @assigned_partitions = {}
  @revoked_partitions = {}
  @changed = false
end

Instance Attribute Details

#assigned_partitions ⇒ Object (readonly)

Returns the value of attribute assigned_partitions.



# File 'lib/karafka/connection/rebalance_manager.rb', line 24

def assigned_partitions
  @assigned_partitions
end

#revoked_partitions ⇒ Object (readonly)

Returns the value of attribute revoked_partitions.



# File 'lib/karafka/connection/rebalance_manager.rb', line 24

def revoked_partitions
  @revoked_partitions
end

Instance Method Details

#changed? ⇒ Boolean

Indicates a state change in the partitions assignment.

Returns:

  • (Boolean)

    indicates a state change in the partitions assignment



# File 'lib/karafka/connection/rebalance_manager.rb', line 45

def changed?
  @changed
end

#clear ⇒ Object

Resets the rebalance manager state. This needs to be done before each polling loop, because the state may change during polling.



# File 'lib/karafka/connection/rebalance_manager.rb', line 38

def clear
  @assigned_partitions.clear
  @revoked_partitions.clear
  @changed = false
end
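The reset-then-poll cycle described above can be sketched without a running Kafka cluster. The class below, `SketchRebalanceManager`, is a hypothetical, trimmed stand-in that mirrors the documented state handling; its callback takes a ready-made hash instead of the rdkafka arguments, and the "poll" is simulated by calling the callback directly. It is an illustration under those assumptions, not the way Karafka wires the manager internally.

```ruby
# Hypothetical stand-in mirroring the documented state handling.
class SketchRebalanceManager
  attr_reader :assigned_partitions, :revoked_partitions

  def initialize
    @assigned_partitions = {}
    @revoked_partitions = {}
    @changed = false
  end

  # Same reset as the documented #clear
  def clear
    @assigned_partitions.clear
    @revoked_partitions.clear
    @changed = false
  end

  def changed?
    @changed
  end

  # Simplified callback: accepts a topic => partition ids hash directly
  # (the real callback receives rdkafka objects).
  def on_partitions_revoked(partitions)
    @revoked_partitions = partitions
    @changed = true
  end
end

manager = SketchRebalanceManager.new

# One iteration of a hypothetical polling loop:
manager.clear                                       # reset before polling
manager.on_partitions_revoked({ 'events' => [0, 1] }) # simulated rebalance during the poll
puts manager.changed?                               # state changed during this iteration

manager.clear                                       # the next iteration starts clean
puts manager.changed?
```

Resetting at the start of each iteration means `changed?` only ever reports rebalances observed during the current poll.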

#lost_partitions ⇒ Object

We consider as lost only those partitions that were taken away and not re-assigned back to us.



# File 'lib/karafka/connection/rebalance_manager.rb', line 70

def lost_partitions
  lost_partitions = {}

  revoked_partitions.each do |topic, partitions|
    lost_partitions[topic] = partitions - assigned_partitions.fetch(topic, EMPTY_ARRAY)
  end

  lost_partitions
end
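To make the set difference concrete, here is a minimal standalone sketch of the same computation on plain hashes. The free function and the sample topic names are illustrative assumptions, and `EMPTY_ARRAY` is defined locally as a frozen empty array because the constant's definition is not shown on this page.

```ruby
# Assumed stand-in for the constant referenced by the documented method.
EMPTY_ARRAY = [].freeze

# Standalone version of the documented logic.
# Both arguments map a topic name to an array of partition ids.
def lost_partitions(revoked, assigned)
  revoked.each_with_object({}) do |(topic, partitions), lost|
    # Drop every revoked partition that was assigned back to us
    lost[topic] = partitions - assigned.fetch(topic, EMPTY_ARRAY)
  end
end

revoked  = { 'events' => [0, 1, 2], 'logs' => [0] }
assigned = { 'events' => [1] }

p lost_partitions(revoked, assigned)
```

Note that a topic whose revoked partitions were all re-assigned still appears in the result, mapped to an empty array, since the loop adds one entry per revoked topic.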

#on_partitions_assigned(_, partitions) ⇒ Object

Callback invoked from within rdkafka when new partitions are assigned.

Parameters:

  • _ (Rdkafka::Consumer)
  • partitions (Rdkafka::Consumer::TopicPartitionList)


# File 'lib/karafka/connection/rebalance_manager.rb', line 54

def on_partitions_assigned(_, partitions)
  @assigned_partitions = partitions.to_h.transform_values { |part| part.map(&:partition) }
  @changed = true
end
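The callback stores only partition ids, keyed by topic. The sketch below isolates that transformation. `FakePartition` is a hypothetical stand-in for the partition objects rdkafka yields (only `#partition` is used), and the input hash plays the role of the result of `partitions.to_h`, assumed here to map each topic name to an array of such objects.

```ruby
# Hypothetical stand-in for an rdkafka partition object; only #partition matters here.
FakePartition = Struct.new(:partition, :offset)

# Same transformation as in the documented callback:
# topic => [partition objects] becomes topic => [partition ids]
def partition_ids(topic_partitions)
  topic_partitions.transform_values { |parts| parts.map(&:partition) }
end

list = {
  'events' => [FakePartition.new(0, 10), FakePartition.new(3, 5)],
  'logs'   => [FakePartition.new(1, 0)]
}

p partition_ids(list)
```

Keeping plain integer ids rather than the partition objects makes later comparisons, such as array subtraction per topic, straightforward.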

#on_partitions_revoked(_, partitions) ⇒ Object

Callback invoked from within rdkafka when partitions are revoked.

Parameters:

  • _ (Rdkafka::Consumer)
  • partitions (Rdkafka::Consumer::TopicPartitionList)


# File 'lib/karafka/connection/rebalance_manager.rb', line 64

def on_partitions_revoked(_, partitions)
  @revoked_partitions = partitions.to_h.transform_values { |part| part.map(&:partition) }
  @changed = true
end