Class: Karafka::Connection::RebalanceManager

Inherits:
Object
Defined in:
lib/karafka/connection/rebalance_manager.rb

Overview

Note:

Since rebalances do not happen often, we reuse the same objects for the empty states most of the time, so we do not allocate many objects over the manager's lifetime.

Note:

Internally, the rebalance manager has a notion of lost partitions: partitions that were revoked but not re-assigned back to us. We do not expose this concept outside the manager. Instead, we normalize lost partitions and present them as revoked, because from the perspective of the rest of the code only the lost partitions are truly revoked.

Note:

For cooperative-sticky, `#assigned_partitions` holds only the recently assigned partitions, not all the partitions that are owned.

Note:

We need the `subscription_group` reference because the notifications pipeline is global, so we have to make sure we track only the changes relevant to our subscription group.

Manager for tracking changes in the partitions assignment after the assignment is done.

We need to track those changes so we can clean up consumers that will no longer process given partitions because the partitions were taken away.

Constructor Details

#initialize(subscription_group_id) ⇒ RebalanceManager

Parameters:

  • subscription_group_id (String)

    subscription group id



# File 'lib/karafka/connection/rebalance_manager.rb', line 34

def initialize(subscription_group_id)
  @assigned_partitions = {}
  @revoked_partitions = {}
  @changed = false
  @active = false
  @subscription_group_id = subscription_group_id

  # Connects itself to the instrumentation pipeline so rebalances can be tracked
  ::Karafka.monitor.subscribe(self)
end

Instance Attribute Details

#assigned_partitions ⇒ Object (readonly)

Returns the value of attribute assigned_partitions.



# File 'lib/karafka/connection/rebalance_manager.rb', line 28

def assigned_partitions
  @assigned_partitions
end

#revoked_partitions ⇒ Object (readonly)

Returns the value of attribute revoked_partitions.



# File 'lib/karafka/connection/rebalance_manager.rb', line 28

def revoked_partitions
  @revoked_partitions
end

Instance Method Details

#active? ⇒ Boolean

Note:

This method is needed to make sure that, when using cooperative-sticky, we do not close until the first rebalance. Otherwise librdkafka may crash.

Returns true if there was at least one rebalance.

Returns:

  • (Boolean)

    true if there was at least one rebalance



# File 'lib/karafka/connection/rebalance_manager.rb', line 63

def active?
  @active
end

#changed? ⇒ Boolean

Returns whether the state of the partitions assignment has changed.

Returns:

  • (Boolean)

    indicates a state change in the partitions assignment



# File 'lib/karafka/connection/rebalance_manager.rb', line 55

def changed?
  @changed
end

#clear ⇒ Object

Resets the rebalance manager state. This needs to be done before each polling loop, because the state may change during polling.



# File 'lib/karafka/connection/rebalance_manager.rb', line 48

def clear
  @assigned_partitions.clear
  @revoked_partitions.clear
  @changed = false
end
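
To illustrate why the reset has to precede each poll, here is a minimal sketch. `SketchManager` and `simulate_revocation` are hypothetical stand-ins written for this example (they are not part of Karafka), and the loop only imitates polling:

```ruby
# Hypothetical stand-in exposing only the state touched by #clear
class SketchManager
  attr_reader :assigned_partitions, :revoked_partitions

  def initialize
    @assigned_partitions = {}
    @revoked_partitions = {}
    @changed = false
  end

  def changed?
    @changed
  end

  # Same body as the #clear method shown above
  def clear
    @assigned_partitions.clear
    @revoked_partitions.clear
    @changed = false
  end

  # Hypothetical helper: imitates a rebalance arriving while a poll is in flight
  def simulate_revocation(partitions)
    @revoked_partitions = partitions
    @changed = true
  end
end

manager = SketchManager.new
observed = []

# Two iterations of a simplified loop; only the first "poll" sees a rebalance
[{ 'events' => [0] }, nil].each do |rebalance|
  manager.clear
  manager.simulate_revocation(rebalance) if rebalance # stands in for polling
  observed << manager.changed?
end
```

Without the `clear` call at the top of the loop, the second iteration would still report the change observed during the first one.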

#lost_partitions ⇒ Object

We consider as lost only the partitions that were taken away and not re-assigned back to us.



# File 'lib/karafka/connection/rebalance_manager.rb', line 68

def lost_partitions
  lost_partitions = {}

  revoked_partitions.each do |topic, partitions|
    lost_partitions[topic] = partitions - assigned_partitions.fetch(topic, EMPTY_ARRAY)
  end

  lost_partitions
end
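
The per-topic array difference above can be tried on plain hashes. This is a sketch only: `lost_partitions_for` is a hypothetical helper, the topic names are made up, and `EMPTY_ARRAY` is assumed to be a frozen empty array:

```ruby
# Stand-in for the EMPTY_ARRAY constant referenced by the method above
# (assumption: it is a frozen empty array)
EMPTY_ARRAY = [].freeze

# Hypothetical helper mirroring the body of #lost_partitions on plain hashes
def lost_partitions_for(revoked, assigned)
  revoked.each_with_object({}) do |(topic, partitions), lost|
    lost[topic] = partitions - assigned.fetch(topic, EMPTY_ARRAY)
  end
end

revoked  = { 'events' => [0, 1, 2], 'logs' => [0] }
assigned = { 'events' => [1] }

# 'events' partition 1 was re-assigned, so only 0 and 2 count as lost;
# 'logs' had nothing re-assigned, so its partition 0 is lost
lost = lost_partitions_for(revoked, assigned)
```

Note that a topic whose revoked partitions were all re-assigned still appears in the result, mapped to an empty array.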

#on_rebalance_partitions_assigned(event) ⇒ Object

Callback triggered from within rdkafka when new partitions are assigned.

Parameters:

  • event (Karafka::Core::Monitoring::Event)


# File 'lib/karafka/connection/rebalance_manager.rb', line 82

def on_rebalance_partitions_assigned(event)
  # Apply changes only for our subscription group
  return unless event[:subscription_group_id] == @subscription_group_id

  @active = true
  @assigned_partitions = event[:tpl].to_h.transform_values { |part| part.map(&:partition) }
  @changed = true
end
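
The `transform_values` step reduces each topic's partition objects to plain partition numbers. The sketch below assumes that `event[:tpl].to_h` yields a hash of topic names to arrays of objects responding to `#partition`. `FakePartition` is a hypothetical stand-in for those objects:

```ruby
# Hypothetical stand-in for the partition objects held by the event's tpl;
# only the #partition reader used by the callback is modelled
FakePartition = Struct.new(:partition, :offset)

# Assumed shape of event[:tpl].to_h: topic name => array of partition objects
tpl_hash = {
  'events' => [FakePartition.new(0, 100), FakePartition.new(3, 42)],
  'logs' => [FakePartition.new(1, nil)]
}

# Same transformation as in the callback: keep only the partition numbers
normalized = tpl_hash.transform_values { |parts| parts.map(&:partition) }
```

Storing bare integers is what makes the array subtraction in `#lost_partitions` a simple value comparison.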

#on_rebalance_partitions_revoked(event) ⇒ Object

Callback triggered from within rdkafka when partitions are revoked.

Parameters:

  • event (Karafka::Core::Monitoring::Event)


# File 'lib/karafka/connection/rebalance_manager.rb', line 95

def on_rebalance_partitions_revoked(event)
  # Apply changes only for our subscription group
  return unless event[:subscription_group_id] == @subscription_group_id

  @active = true
  @revoked_partitions = event[:tpl].to_h.transform_values { |part| part.map(&:partition) }
  @changed = true
end
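
Because every listener on a shared pipeline receives every event, the guard clause is what keeps one manager from reacting to another subscription group's rebalance. A minimal sketch, where `SketchListener` is a hypothetical class, events are plain hashes, and the `:partitions` key is invented for brevity (the real callbacks read `event[:tpl]`):

```ruby
# Hypothetical listener reproducing only the guard clause and flags of the callbacks
class SketchListener
  attr_reader :revoked_partitions

  def initialize(subscription_group_id)
    @subscription_group_id = subscription_group_id
    @revoked_partitions = {}
    @changed = false
  end

  def changed?
    @changed
  end

  # event is a plain Hash here; :partitions is an invented key for this sketch
  def on_rebalance_partitions_revoked(event)
    # Ignore events that belong to other subscription groups
    return unless event[:subscription_group_id] == @subscription_group_id

    @revoked_partitions = event[:partitions]
    @changed = true
  end
end

listeners = [SketchListener.new('sg_a'), SketchListener.new('sg_b')]
event = { subscription_group_id: 'sg_b', partitions: { 'events' => [2] } }

# A shared pipeline delivers every event to every listener
listeners.each { |listener| listener.on_rebalance_partitions_revoked(event) }
```

Only the listener created for `'sg_b'` records the revocation; the `'sg_a'` listener stays unchanged.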