Class: Karafka::Instrumentation::Callbacks::Rebalance

Inherits:
Object
  • Object
show all
Defined in:
lib/karafka/instrumentation/callbacks/rebalance.rb

Overview

Callback that connects to the librdkafka rebalance callback and converts those events into our internal events

Instance Method Summary collapse

Constructor Details

#initialize(subscription_group) ⇒ Rebalance

Returns a new instance of Rebalance.

Parameters:

  • subscription_group (Karafka::Routes::SubscriptionGroup)

    subscription group for which we want to manage rebalances



15
16
17
# File 'lib/karafka/instrumentation/callbacks/rebalance.rb', line 15

def initialize(subscription_group)
  @subscription_group = subscription_group
end

Instance Method Details

#on_partitions_assign(tpl) ⇒ Object

Publishes an event that partitions are going to be assigned

Parameters:

  • tpl (Rdkafka::Consumer::TopicPartitionList)


30
31
32
# File 'lib/karafka/instrumentation/callbacks/rebalance.rb', line 30

def on_partitions_assign(tpl)
  instrument('partitions_assign', tpl)
end

#on_partitions_assigned(tpl) ⇒ Object

Publishes an event that partitions were assigned.

Parameters:

  • tpl (Rdkafka::Consumer::TopicPartitionList)


45
46
47
# File 'lib/karafka/instrumentation/callbacks/rebalance.rb', line 45

def on_partitions_assigned(tpl)
  instrument('partitions_assigned', tpl)
end

#on_partitions_revoke(tpl) ⇒ Object

Publishes an event that partitions are going to be revoked. At this stage we can still commit offsets, etc.

Parameters:

  • tpl (Rdkafka::Consumer::TopicPartitionList)


23
24
25
# File 'lib/karafka/instrumentation/callbacks/rebalance.rb', line 23

def on_partitions_revoke(tpl)
  instrument('partitions_revoke', tpl)
end

#on_partitions_revoked(tpl) ⇒ Object

Publishes an event that partitions were revoked. This is after we’ve lost them, so no option to commit offsets.

Parameters:

  • tpl (Rdkafka::Consumer::TopicPartitionList)


38
39
40
# File 'lib/karafka/instrumentation/callbacks/rebalance.rb', line 38

def on_partitions_revoked(tpl)
  instrument('partitions_revoked', tpl)
end