Class: Karafka::Server
- Inherits:
- Object
  - Object
  - Karafka::Server
- Defined in:
- lib/karafka/server.rb
Overview
Karafka consuming server class
Class Attribute Summary
- .jobs_queue ⇒ Object
  Jobs queue.
- .listeners ⇒ Object
  Set of consuming threads.
- .workers ⇒ Object
  Set of workers.
Class Method Summary
- .quiet ⇒ Object
  Quiets the Karafka server.
- .run ⇒ Object
  Runs the app.
- .start ⇒ Object
  Starts Karafka with supervision.
- .stop ⇒ Object
  Stops Karafka with supervision (as long as there is a shutdown timeout). If consumers or workers do not stop within the given time frame, it forces them to exit.
Class Attribute Details
.jobs_queue ⇒ Object
Jobs queue.
    # File 'lib/karafka/server.rb', line 24

    def jobs_queue
      @jobs_queue
    end
.listeners ⇒ Object
Set of consuming threads. Each consumer thread contains a single consumer.
    # File 'lib/karafka/server.rb', line 18

    def listeners
      @listeners
    end
.workers ⇒ Object
Set of workers.
    # File 'lib/karafka/server.rb', line 21

    def workers
      @workers
    end
Class Method Details
.quiet ⇒ Object
Quiets the Karafka server.
Karafka will stop processing but won’t quit the consumer group, so no rebalance will be triggered until final shutdown.
    # File 'lib/karafka/server.rb', line 136

    def quiet
      # We don't have to safe-guard it with check states as the state transitions work only
      # in one direction
      Karafka::App.quiet!

      # We need one more thread to monitor the process and move to quieted once everything
      # is quiet and no processing is happening anymore
      Thread.new do
        sleep(0.1) until listeners.coordinators.all?(&:finished?)
        Karafka::App.quieted!
      end
    end
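The quieting flow above (flip the state first, then let a background thread promote it to "quieted" once every coordinator reports it has finished) can be illustrated in isolation. The following is a minimal sketch only: `DemoCoordinator` and `DemoApp` are hypothetical stand-ins and not Karafka classes, and the polling interval is shortened for the demo.

```ruby
# Hypothetical stand-in for a coordinator; NOT a Karafka class.
class DemoCoordinator
  def initialize
    @finished = false
    @mutex = Mutex.new
  end

  def finish!
    @mutex.synchronize { @finished = true }
  end

  def finished?
    @mutex.synchronize { @finished }
  end
end

# Hypothetical stand-in for the app state holder; NOT Karafka::App.
class DemoApp
  attr_reader :state

  def initialize
    @state = :running
  end

  def quiet!
    @state = :quieting
  end

  def quieted!
    @state = :quieted
  end
end

app = DemoApp.new
coordinators = Array.new(3) { DemoCoordinator.new }

# Step 1: request quieting right away.
app.quiet!

# Step 2: a monitor thread polls until all coordinators are done, then
# moves the state forward.
monitor = Thread.new do
  sleep(0.01) until coordinators.all?(&:finished?)
  app.quieted!
end

# While work is still in flight, the state stays at :quieting.
state_while_busy = app.state

coordinators.each(&:finish!)
monitor.join
```

Because the state only ever moves forward in this sketch, the monitor thread needs no extra guard against a competing transition, which mirrors the reasoning given in the comment inside `quiet`.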
.run ⇒ Object
Runs the app.
    # File 'lib/karafka/server.rb', line 27

    def run
      self.listeners = []
      self.workers = []

      # We need to validate this prior to running because it may be executed also from the
      # embedded
      # We cannot validate this during the start because config needs to be populated and routes
      # need to be defined.
      Contracts::ServerCliOptions.new.validate!(
        Karafka::App.config.internal.routing.active.to_h
      )

      process.on_sigint { stop }
      process.on_sigquit { stop }
      process.on_sigterm { stop }
      process.on_sigtstp { quiet }
      process.supervise

      # Start is blocking until stop is called and when we stop, it will wait until
      # all of the things are ready to stop
      start

      # We always need to wait for Karafka to stop here since we should wait for the stop running
      # in a separate thread (or trap context) to indicate everything is closed
      # Since `#start` is blocking, we will get here only after the runner is done. This will
      # not add any performance degradation because of that.
      sleep(0.1) until Karafka::App.terminated?
    # Try its best to shutdown underlying components before re-raising
    # rubocop:disable Lint/RescueException
    rescue Exception => e
      # rubocop:enable Lint/RescueException
      stop

      raise e
    end
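Two ideas from `run` are easy to show in a small standalone form: the mapping of signals to actions (SIGINT, SIGQUIT and SIGTERM stop the server, SIGTSTP quiets it) and the best-effort shutdown before an exception is re-raised. The sketch below assumes a hypothetical `DemoServer` class; it installs no real signal traps and only dispatches by signal name, so it says nothing about how Karafka's `process` object is implemented.

```ruby
# Hypothetical stand-in for the server; none of these names come from Karafka.
class DemoServer
  # Signal name => action, mirroring the handlers registered in `run`.
  SIGNAL_ACTIONS = {
    'INT' => :stop,
    'QUIT' => :stop,
    'TERM' => :stop,
    'TSTP' => :quiet
  }.freeze

  attr_reader :events

  def initialize
    @events = []
  end

  # Dispatches a signal name to the matching action.
  # No real trap is installed here; this is only a lookup plus a method call.
  def handle_signal(name)
    public_send(SIGNAL_ACTIONS.fetch(name))
  end

  def stop
    @events << :stop
  end

  def quiet
    @events << :quiet
  end

  # Runs the given block and, on any exception, attempts a shutdown
  # before re-raising the original error.
  def run
    @events << :run
    yield self
  rescue Exception => e # rubocop:disable Lint/RescueException
    stop
    raise e
  end
end

server = DemoServer.new
server.handle_signal('TSTP')

begin
  server.run { raise 'boom' }
rescue RuntimeError => e
  server.events << e.message
end
```

Rescuing `Exception` rather than `StandardError` is what lets the cleanup also run for errors such as `Interrupt`; re-raising afterwards keeps the original failure visible to the caller.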
.start ⇒ Object
Note: We don’t need to sleep because Karafka::Fetcher is locking and waiting to finish its loop (and that won’t happen until we explicitly want to stop).
Starts Karafka with supervision.
    # File 'lib/karafka/server.rb', line 66

    def start
      Karafka::App.run!
      Karafka::Runner.new.call
    end
.stop ⇒ Object
Note: This method is not async. It should not be executed from the workers, as it will lock them forever. If you need to shut Karafka down from within worker threads, start a separate thread to do so.
Stops Karafka with supervision (as long as there is a shutdown timeout). If consumers or workers do not stop within the given time frame, it forces them to exit.
    # File 'lib/karafka/server.rb', line 77

    def stop
      # Initialize the stopping process only if Karafka was running
      return if Karafka::App.stopping?
      return if Karafka::App.stopped?
      return if Karafka::App.terminated?

      Karafka::App.stop!

      timeout = Karafka::App.config.shutdown_timeout

      # We check from time to time (for the timeout period) if all the threads finished
      # their work and if so, we can just return and normal shutdown process will take place
      # We divide it by 1000 because we use time in ms.
      ((timeout / 1_000) * SUPERVISION_CHECK_FACTOR).to_i.times do
        return if listeners.count(&:alive?).zero? && workers.count(&:alive?).zero?

        sleep SUPERVISION_SLEEP
      end

      raise Errors::ForcefulShutdownError
    rescue Errors::ForcefulShutdownError => e
      Karafka.monitor.instrument(
        'error.occurred',
        caller: self,
        error: e,
        type: 'app.stopping.error'
      )

      # We're done waiting, lets kill them!
      workers.each(&:terminate)
      listeners.each(&:terminate)
      # We always need to shutdown clients to make sure we do not force the GC to close consumer.
      # This can cause memory leaks and crashes.
      listeners.each(&:shutdown)

      # We also do not forcefully terminate everything when running in the embedded mode,
      # otherwise we would overwrite the shutdown process of the process that started Karafka
      return unless process.supervised?

      # exit! is not within the instrumentation as it would not trigger due to exit
      Kernel.exit!(FORCEFUL_EXIT_CODE)
    ensure
      # We need to check if it wasn't an early exit to make sure that only on stop invocation
      # can change the status after everything is closed
      if timeout
        Karafka::App.stopped!

        # We close producer as the last thing as it can be used in the notification pipeline
        # to dispatch state changes, etc
        Karafka::App.producer.close

        Karafka::App.terminate!
      end
    end
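The supervised wait in `stop` boils down to a bounded polling loop: check at a fixed interval whether any thread is still alive, return early if none is, and fall back to forced termination once the timeout budget is spent. The sketch below illustrates that loop under stated assumptions. `wait_for_threads` is a hypothetical helper, and the check interval is an assumed value, since the listing above does not show the values of `SUPERVISION_SLEEP` or `SUPERVISION_CHECK_FACTOR`. Unlike the listing, it divides by `1_000.0` so that sub-second timeouts work in the demo.

```ruby
# Hypothetical helper: polls until all threads have finished or the timeout
# budget is used up. Returns true on a graceful finish, false when the caller
# would need to force termination.
def wait_for_threads(threads, timeout_ms:, check_sleep: 0.05)
  # Assumed relation: the factor is the number of checks per second.
  check_factor = (1 / check_sleep).ceil
  checks = ((timeout_ms / 1_000.0) * check_factor).to_i

  checks.times do
    return true if threads.none?(&:alive?)

    sleep(check_sleep)
  end

  false
end

# A thread that finishes well within the timeout: graceful path.
fast = Thread.new { sleep(0.05) }
graceful = wait_for_threads([fast], timeout_ms: 1_000)

# A thread that never finishes on its own: forced path.
stuck = Thread.new { sleep }
forced = !wait_for_threads([stuck], timeout_ms: 200)

# Mirrors the forceful branch: terminate whatever is still running.
stuck.terminate if forced
stuck.join
```

Polling `alive?` instead of calling `join` with the full timeout on each thread keeps the total wait bounded by a single budget regardless of how many threads are supervised.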