Class: Karafka::Connection::RebalanceManager

Inherits:
Object
  • Object
Defined in:
lib/karafka/connection/rebalance_manager.rb

Overview

Note:

Since rebalances do not happen very often, we try to reuse the same objects for the empty states most of the time, so we do not create many objects during the manager's lifetime.

Note:

Internally, the rebalance manager has a notion of lost partitions: partitions that were revoked but not re-assigned back. We do not expose this concept outside and normalize lost partitions to revoked ones, because from the perspective of the rest of the code only the lost partitions are truly revoked.

Note:

For cooperative-sticky, `#assigned_partitions` holds only the recently assigned partitions, not all the partitions that are owned.

Manager for tracking changes in the partitions assignment.

We need tracking of those to clean up consumers that will no longer process given partitions as they were taken away.

Constructor Details

#initialize ⇒ RebalanceManager



# File 'lib/karafka/connection/rebalance_manager.rb', line 29

def initialize
  @assigned_partitions = {}
  @revoked_partitions = {}
  @changed = false
  @active = false
end

Instance Attribute Details

#assigned_partitions ⇒ Object (readonly)

Returns the value of attribute assigned_partitions.



# File 'lib/karafka/connection/rebalance_manager.rb', line 24

def assigned_partitions
  @assigned_partitions
end

#revoked_partitions ⇒ Object (readonly)

Returns the value of attribute revoked_partitions.



# File 'lib/karafka/connection/rebalance_manager.rb', line 24

def revoked_partitions
  @revoked_partitions
end

Instance Method Details

#active? ⇒ Boolean

Note:

This method is needed to make sure that, when using cooperative-sticky, we do not close until the first rebalance. Otherwise librdkafka may crash.

Returns true if there was at least one rebalance.

Returns:

  • (Boolean)

    true if there was at least one rebalance

# File 'lib/karafka/connection/rebalance_manager.rb', line 54

def active?
  @active
end

#changed? ⇒ Boolean

Indicates a state change in the partitions assignment.

Returns:

  • (Boolean)

    indicates a state change in the partitions assignment



# File 'lib/karafka/connection/rebalance_manager.rb', line 46

def changed?
  @changed
end

#clear ⇒ Object

Resets the rebalance manager state. This needs to be done before each polling loop, because the state may change during polling.



# File 'lib/karafka/connection/rebalance_manager.rb', line 39

def clear
  @assigned_partitions.clear
  @revoked_partitions.clear
  @changed = false
end
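
The reset-before-poll pattern described above can be sketched with a small stand-in class. This is a minimal illustration under stated assumptions, not the Karafka implementation: `SketchRebalanceManager` is a hypothetical name, and its callback accepts a plain Hash of topic names to partition ids instead of an `Rdkafka::Consumer::TopicPartitionList`.

```ruby
# Minimal stand-in that mirrors the state handling shown on this page.
# It is NOT the Karafka class: the callback takes a plain Hash instead of an
# Rdkafka::Consumer::TopicPartitionList.
class SketchRebalanceManager
  attr_reader :assigned_partitions, :revoked_partitions

  def initialize
    @assigned_partitions = {}
    @revoked_partitions = {}
    @changed = false
    @active = false
  end

  # Empties both hashes and drops the changed flag; @active is left untouched
  def clear
    @assigned_partitions.clear
    @revoked_partitions.clear
    @changed = false
  end

  def changed?
    @changed
  end

  def active?
    @active
  end

  # Simplified callback: stores topic => partition ids directly
  def on_partitions_revoked(partitions)
    @active = true
    @revoked_partitions = partitions
    @changed = true
  end
end

manager = SketchRebalanceManager.new

# Poll 1: reset first, then a rebalance callback fires during the poll
manager.clear
manager.on_partitions_revoked({ 'events' => [0, 1] })
changed_after_first_poll = manager.changed?
revoked_after_first_poll = manager.revoked_partitions.dup

# Poll 2: reset again, no rebalance happens this time
manager.clear
changed_after_second_poll = manager.changed?
```

In this sketch the second poll sees no change and an empty revocation set, while `active?` stays true because `clear` does not reset that flag.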

#lost_partitions ⇒ Object

We consider as lost only those partitions that were taken away and not re-assigned back to us.



# File 'lib/karafka/connection/rebalance_manager.rb', line 79

def lost_partitions
  lost_partitions = {}

  revoked_partitions.each do |topic, partitions|
    lost_partitions[topic] = partitions - assigned_partitions.fetch(topic, EMPTY_ARRAY)
  end

  lost_partitions
end
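
To make the subtraction concrete, here is a small stand-alone sketch of the same per-topic logic on plain hashes. The helper name `lost_from` and the sample topics are assumptions made for illustration, and a literal `[]` stands in for the `EMPTY_ARRAY` constant used above.

```ruby
# Hypothetical helper mirroring the per-topic subtraction in #lost_partitions.
# Both arguments are hashes of topic name => array of partition ids.
def lost_from(revoked, assigned)
  revoked.each_with_object({}) do |(topic, partitions), lost|
    # Anything revoked that did not come back in the assignment is lost
    lost[topic] = partitions - assigned.fetch(topic, [])
  end
end

revoked  = { 'events' => [0, 1, 2], 'logs' => [0] }
assigned = { 'events' => [1] }

lost = lost_from(revoked, assigned)
# 'events' partition 1 was re-assigned, so only 0 and 2 remain lost.
# 'logs' has no entry in the assignment, so its revoked partition stays lost.
```

A topic whose revoked partitions were all re-assigned would still appear in the result, with an empty array as its value.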

#on_partitions_assigned(partitions) ⇒ Object

Callback that kicks in inside of rdkafka, when new partitions are assigned.

Parameters:

  • partitions (Rdkafka::Consumer::TopicPartitionList)


# File 'lib/karafka/connection/rebalance_manager.rb', line 62

def on_partitions_assigned(partitions)
  @active = true
  @assigned_partitions = partitions.to_h.transform_values { |part| part.map(&:partition) }
  @changed = true
end
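
The normalization step in this callback can be illustrated without rdkafka. The sketch below assumes that `to_h` on the partition list yields a Hash of topic names to arrays of objects responding to `#partition`; `fake_partition` is a hypothetical stand-in for those objects, and the topics and offsets are made up.

```ruby
# Hypothetical stand-in for the partition objects held by the list
fake_partition = Struct.new(:partition, :offset)

# Assumed shape of partitions.to_h: topic name => array of partition objects
list_as_hash = {
  'events' => [fake_partition.new(0, 100), fake_partition.new(3, 42)],
  'logs' => [fake_partition.new(1, 7)]
}

# Same transformation as in the callback: keep only the partition ids
normalized = list_as_hash.transform_values { |parts| parts.map(&:partition) }
```

After this step the manager works with plain integer ids per topic, which is what makes the array subtraction in `#lost_partitions` possible.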

#on_partitions_revoked(partitions) ⇒ Object

Callback that kicks in inside of rdkafka, when partitions are revoked.

Parameters:

  • partitions (Rdkafka::Consumer::TopicPartitionList)


# File 'lib/karafka/connection/rebalance_manager.rb', line 72

def on_partitions_revoked(partitions)
  @active = true
  @revoked_partitions = partitions.to_h.transform_values { |part| part.map(&:partition) }
  @changed = true
end