Class: Karafka::Connection::Manager
- Inherits:
- Object
- Object > Karafka::Connection::Manager
- Defined in:
- lib/karafka/connection/manager.rb
Overview
Connection manager responsible for starting and managing listener connections.
In the OSS version, it starts listeners as they are, without any connection management or supervision of resource utilization, and quiets them or shuts them down when the time comes.
Direct Known Subclasses
Instance Method Summary
- #control ⇒ Object
  Controls the state of listeners upon shutdown and quiet requests. In both cases (quieting and shutdown), we first need to stop processing more work and tell listeners to become quiet (connected but not yielding messages). Then, depending on whether we want to stop fully or just stay quiet, we apply a different flow.
- #done? ⇒ Boolean
  True if all listeners are stopped.
- #initialize ⇒ Manager (constructor)
  A new instance of Manager.
- #register(listeners) ⇒ Object
  Registers provided listeners and starts all of them.
Constructor Details
#initialize ⇒ Manager
Returns a new instance of Manager.
    # File 'lib/karafka/connection/manager.rb', line 11
    def initialize
      @once_executions = Set.new
    end
Instance Method Details
#control ⇒ Object
It is important to ensure that all listeners from the same consumer group are quiet before we fully shut down that consumer group. Skipping this can cause 'Timed out LeaveGroupRequest in flight' and other errors. For simplicity, we quiet all listeners first and only then move forward.
This manager works on the assumption that all listeners are started on register.
Controls the state of listeners upon shutdown and quiet requests. In both cases (quieting and shutdown), we first need to stop processing more work and tell listeners to become quiet (connected but not yielding messages). Then, depending on whether we want to stop fully or just stay quiet, we apply a different flow.
    # File 'lib/karafka/connection/manager.rb', line 39
    def control
      # Do nothing until shutdown or quiet
      return unless Karafka::App.done?

      # When we are done processing, immediately quiet all the listeners so they do not pick up
      # new work to do
      once(:quiet!) { @listeners.each(&:quiet!) }

      return unless @listeners.all?(&:quiet?)

      # If we are in the process of moving to quiet state, we need to check it.
      # Switch to quieted status only when all listeners are fully quieted and do nothing after
      # that until further state changes
      once(:quieted!) { Karafka::App.quieted! } if Karafka::App.quieting?

      return if Karafka::App.quiet?

      once(:stop!) { @listeners.each(&:stop!) }
    end
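The `once` helper called in `#control` is not documented on this page. Since the constructor creates `@once_executions` as a `Set`, one plausible reading is that `once` records a key and runs its block only the first time that key is seen. The following is a minimal sketch under that assumption; `OnceGuardSketch` is a hypothetical stand-in class, and the real helper may differ in signature and visibility.

```ruby
require "set"

# Hypothetical stand-in, not Karafka's class: shows one way a `once` guard
# could be built on the Set created in #initialize.
class OnceGuardSketch
  def initialize
    @once_executions = Set.new
  end

  # Runs the block only the first time a given key is seen.
  # Returns true if the block ran, false if it was skipped.
  def once(key)
    return false if @once_executions.include?(key)

    @once_executions << key
    yield
    true
  end
end

guard = OnceGuardSketch.new
calls = []

# Repeated requests with the same key trigger the block a single time.
3.times { guard.once(:quiet!) { calls << :quiet! } }
guard.once(:stop!) { calls << :stop! }

calls # => [:quiet!, :stop!]
```

A guard of this kind would let `#control` be invoked repeatedly while each transition (`quiet!`, `quieted!`, `stop!`) fires at most once.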
#done? ⇒ Boolean
Returns true if all listeners are stopped.
    # File 'lib/karafka/connection/manager.rb', line 24
    def done?
      @listeners.all?(&:stopped?)
    end
#register(listeners) ⇒ Object
Registers provided listeners and starts all of them.
    # File 'lib/karafka/connection/manager.rb', line 18
    def register(listeners)
      @listeners = listeners
      @listeners.each(&:start!)
    end
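To illustrate how `#register`, `#control` and `#done?` fit together, here is a self-contained walk-through that runs without Karafka. Everything in it is a stand-in: `FakeListener`, `FakeApp` and `SketchManager` are hypothetical names, the status symbols are simplified assumptions about the application state, and the `once` guard is an assumed implementation. Only the bodies of `register`, `done?` and `control` mirror the code shown on this page.

```ruby
require "set"

# Hypothetical listener double exposing only the methods the manager calls.
class FakeListener
  attr_reader :state

  def initialize
    @state = :pending
  end

  def start!; @state = :running; end
  def quiet!; @state = :quiet; end
  def stop!; @state = :stopped; end
  def quiet?; @state == :quiet; end
  def stopped?; @state == :stopped; end
end

# Hypothetical stand-in for the Karafka::App status checks (simplified).
class FakeApp
  attr_accessor :status

  def initialize
    @status = :running
  end

  def done?; @status != :running; end
  def quieting?; @status == :quieting; end
  def quiet?; @status == :quiet; end
  def quieted!; @status = :quiet; end
end

# Stand-in manager mirroring the method bodies documented above.
class SketchManager
  def initialize(app)
    @app = app
    @once_executions = Set.new
  end

  def register(listeners)
    @listeners = listeners
    @listeners.each(&:start!)
  end

  def done?
    @listeners.all?(&:stopped?)
  end

  def control
    return unless @app.done?

    once(:quiet!) { @listeners.each(&:quiet!) }
    return unless @listeners.all?(&:quiet?)

    once(:quieted!) { @app.quieted! } if @app.quieting?
    return if @app.quiet?

    once(:stop!) { @listeners.each(&:stop!) }
  end

  private

  # Assumed guard: Set#add? returns nil when the key is already present.
  def once(key)
    yield if @once_executions.add?(key)
  end
end

app = FakeApp.new
manager = SketchManager.new(app)
listeners = [FakeListener.new, FakeListener.new]
manager.register(listeners)

# Quiet request: listeners go quiet, the app is marked quiet, nothing stops.
app.status = :quieting
manager.control
quiet_result = [app.status, manager.done?] # => [:quiet, false]

# Shutdown request: listeners are already quiet, so they are stopped.
app.status = :stopping
manager.control
stop_result = [listeners.map(&:state), manager.done?] # => [[:stopped, :stopped], true]
```

In this sketch the quiet path ends at the `return if @app.quiet?` line, while the shutdown path falls through to `stop!`, matching the two flows described for `#control`.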