Class: Karafka::Connection::ConsumerGroupCoordinator

Inherits:
Object
  • Object
Defined in:
lib/karafka/connection/consumer_group_coordinator.rb

Overview

This object represents the collective execution status of a group of listeners running inside one consumer group but in separate subscription groups.

There are cases when we do not want to close a given client while others from the same consumer group are still running, because an early shutdown of some clients from the same consumer group can cause instabilities.

We also want to make sure that we close one consumer at a time while the others continue polling.

This prevents a scenario where a rebalance is not acknowledged and we lose the assignment without having a chance to commit changes.
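The coordination described above can be sketched as follows. This is a minimal illustration, not Karafka's listener code: `SketchCoordinator` is a hypothetical stand-in that copies the method bodies documented on this page so the example runs without Karafka installed, and the listener ids, the polling loop, and the "close" step are all assumptions made for the demonstration.

```ruby
require 'set'

# Hypothetical stand-in that mirrors the methods documented on this page.
class SketchCoordinator
  def initialize(group_size)
    @shutdown_lock = Mutex.new
    @group_size = group_size
    @finished = Set.new
  end

  def finish_work(listener_id)
    @finished << listener_id
  end

  def finished?
    @finished.size == @group_size
  end

  def shutdown?
    finished? && @shutdown_lock.try_lock
  end

  def unlock
    @shutdown_lock.unlock if @shutdown_lock.owned?
  end
end

coordinator = SketchCoordinator.new(3)
closed = []          # only mutated while the shutdown lock is held
in_close = 0         # number of listeners currently closing
max_in_close = 0     # highest concurrency observed while closing

listeners = %w[sg-a sg-b sg-c].map do |id|
  Thread.new do
    coordinator.finish_work(id)
    # Stand-in for "keep polling" until this listener is allowed to close.
    sleep(0.005) until coordinator.shutdown?
    begin
      in_close += 1
      max_in_close = [max_in_close, in_close].max
      sleep(0.01)    # stand-in for closing the client
      closed << id
      in_close -= 1
    ensure
      coordinator.unlock
    end
  end
end

listeners.each(&:join)
```

In this sketch no listener starts closing until all three have reported that they finished, and the lock ensures they then close one after another.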

Constructor Details

#initialize(group_size) ⇒ ConsumerGroupCoordinator

Returns a new instance of ConsumerGroupCoordinator.

Parameters:

  • group_size (Integer)

    number of separate subscription groups in a consumer group



# File 'lib/karafka/connection/consumer_group_coordinator.rb', line 18

def initialize(group_size)
  @shutdown_lock = Mutex.new
  @group_size = group_size
  @finished = Set.new
end

Instance Method Details

#finish_work(listener_id) ⇒ Object

Marks the given listener as finished.

Parameters:

  • listener_id (String)


# File 'lib/karafka/connection/consumer_group_coordinator.rb', line 43

def finish_work(listener_id)
  @finished << listener_id
end

#finished? ⇒ Boolean

Returns true if all the subscription groups from a given consumer group are finished.

Returns:

  • (Boolean)

    true if all the subscription groups from a given consumer group are finished



# File 'lib/karafka/connection/consumer_group_coordinator.rb', line 26

def finished?
  @finished.size == @group_size
end
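Because finished listeners are tracked in a `Set`, reporting the same listener more than once does not inflate the count. A small sketch of that behavior, using plain local variables in place of the instance variables (the ids and the group size of 2 are assumptions for illustration):

```ruby
require 'set'

group_size = 2
finished = Set.new

# The same listener reporting twice is stored only once.
finished << 'listener-1'
finished << 'listener-1'
done_after_duplicate = (finished.size == group_size)

# Only a second, distinct listener completes the group.
finished << 'listener-2'
done_after_second = (finished.size == group_size)
```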

#shutdown? ⇒ Boolean

Note:

If true, this also obtains a lock so that no one else closes at the same time we do.

Returns whether we can start shutdown on a given listener.

Returns:

  • (Boolean)

    whether we can start shutdown on a given listener



# File 'lib/karafka/connection/consumer_group_coordinator.rb', line 32

def shutdown?
  finished? && @shutdown_lock.try_lock
end
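The `&&` in `shutdown?` short-circuits, so the lock is only attempted once every listener has finished, and `Mutex#try_lock` returns immediately instead of blocking. The following sketch reproduces that expression with local variables (the lambda name `can_shutdown` and the group size are hypothetical) to show when the lock is and is not taken:

```ruby
require 'set'

lock = Mutex.new
finished = Set.new
group_size = 2

# Same expression as in #shutdown?, written against local variables.
can_shutdown = -> { finished.size == group_size && lock.try_lock }

finished << 'a'
before_all_finished = can_shutdown.call   # group not finished yet
lock_touched_early = lock.locked?         # try_lock was never reached

finished << 'b'
first_caller = can_shutdown.call          # acquires the lock
second_caller = can_shutdown.call         # lock already held, no blocking

lock.unlock if lock.owned?
after_release = can_shutdown.call         # lock is available again
```

A truthy result therefore has a side effect: the caller now holds the lock and is responsible for releasing it.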

#unlock ⇒ Object

Unlocks the shutdown lock



# File 'lib/karafka/connection/consumer_group_coordinator.rb', line 37

def unlock
  @shutdown_lock.unlock if @shutdown_lock.owned?
end
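The `owned?` guard matters because `Mutex#unlock` raises `ThreadError` when the calling thread does not hold the lock. A brief sketch of the difference, with a hypothetical `guarded_unlock` lambda standing in for the method above:

```ruby
lock = Mutex.new
guarded_unlock = -> { lock.unlock if lock.owned? }

# Guarded call on a lock nobody holds: a no-op.
guarded_unlock.call

# Unguarded call on the same lock raises.
unguarded_raised = begin
  lock.unlock
  false
rescue ThreadError
  true
end

# A thread that does not own the lock cannot release it through the guard.
lock.lock
still_locked_after_other_thread = Thread.new do
  guarded_unlock.call
  lock.locked?
end.value

# The owning thread releases it normally.
guarded_unlock.call
released_by_owner = !lock.locked?
```

This makes it safe to call the guarded form from an `ensure` block regardless of whether the lock was actually acquired.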