Class: Karafka::Connection::Manager

Inherits:
Object
  • Object
show all
Defined in:
lib/karafka/connection/manager.rb

Overview

Connections manager responsible for starting and managing listeners connections

In the OSS version it starts listeners as they are without any connection management or resources utilization supervision and shuts them down or quiets when time has come

Direct Known Subclasses

Pro::Connection::Manager

Instance Method Summary collapse

Constructor Details

#initializeManager

Returns a new instance of Manager.



11
12
13
# File 'lib/karafka/connection/manager.rb', line 11

def initialize
  @once_executions = Set.new
end

Instance Method Details

#controlObject

Note:

It is important to ensure, that all listeners from the same consumer group are always all quiet before we can fully shutdown given consumer group. Skipping this can cause ‘Timed out LeaveGroupRequest in flight` and other errors. For the simplification, we just quiet all and only then move forward.

Note:

This manager works with the assumption, that all listeners are executed on register.

Controls the state of listeners upon shutdown and quiet requests In both cases (quieting and shutdown) we first need to stop processing more work and tell listeners to become quiet (connected but not yielding messages) and then depending on whether we want to stop fully or just keep quiet we apply different flow.



39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
# File 'lib/karafka/connection/manager.rb', line 39

def control
  # Do nothing until shutdown or quiet
  return unless Karafka::App.done?

  # When we are done processing, immediately quiet all the listeners so they do not pick up
  # new work to do
  once(:quiet!) { @listeners.each(&:quiet!) }

  return unless @listeners.all?(&:quiet?)

  # If we are in the process of moving to quiet state, we need to check it.
  # Switch to quieted status only when all listeners are fully quieted and do nothing after
  # that until further state changes
  once(:quieted!) { Karafka::App.quieted! } if Karafka::App.quieting?

  return if Karafka::App.quiet?

  once(:stop!) { @listeners.each(&:stop!) }
end

#done?Boolean

Returns true if all listeners are stopped.

Returns:

  • (Boolean)

    true if all listeners are stopped



24
25
26
# File 'lib/karafka/connection/manager.rb', line 24

def done?
  @listeners.all?(&:stopped?)
end

#register(listeners) ⇒ Object

Registers provided listeners and starts all of them

Parameters:



18
19
20
21
# File 'lib/karafka/connection/manager.rb', line 18

def register(listeners)
  @listeners = listeners
  @listeners.each(&:start!)
end