Class: Karafka::Server

Inherits:
Object
Defined in:
lib/karafka/server.rb

Overview

Karafka consuming server class

Class Attribute Summary

Class Method Summary

Class Attribute Details

.jobs_queue ⇒ Object

Jobs queue



# File 'lib/karafka/server.rb', line 24

def jobs_queue
  @jobs_queue
end

.listeners ⇒ Object

Set of consuming threads. Each consumer thread contains a single consumer.



# File 'lib/karafka/server.rb', line 18

def listeners
  @listeners
end

.workers ⇒ Object

Set of workers



# File 'lib/karafka/server.rb', line 21

def workers
  @workers
end

Class Method Details

.quiet ⇒ Object

Quiets the Karafka server.

Karafka will stop processing but won’t quit the consumer group, so no rebalance will be triggered until final shutdown.



# File 'lib/karafka/server.rb', line 136

def quiet
  # We don't have to safe-guard it with check states as the state transitions work only
  # in one direction
  Karafka::App.quiet!

  # We need one more thread to monitor the process and move to quieted once everything
  # is quiet and no processing is happening anymore
  Thread.new do
    sleep(0.1) until listeners.coordinators.all?(&:finished?)
    Karafka::App.quieted!
  end
end
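
The monitoring thread in `quiet` is a plain polling loop. A minimal, self-contained sketch of that pattern follows. `FakeCoordinator`, `wait_until_quiet`, and the block passed to it are hypothetical stand-ins for illustration, not Karafka APIs.

```ruby
# Hypothetical stand-in for a coordinator that reports when its work is done.
class FakeCoordinator
  def initialize
    @finished = false
  end

  def finish!
    @finished = true
  end

  def finished?
    @finished
  end
end

# Polls in a background thread until every coordinator is finished, then
# invokes the block once (the role Karafka::App.quieted! plays in `quiet`).
def wait_until_quiet(coordinators, interval: 0.01, &on_quieted)
  Thread.new do
    sleep(interval) until coordinators.all?(&:finished?)
    on_quieted.call
  end
end

coordinators = Array.new(3) { FakeCoordinator.new }
state = :quieting

monitor = wait_until_quiet(coordinators) { state = :quieted }

# The state cannot flip before every coordinator reports finished.
coordinators.each(&:finish!)
monitor.join
puts state
```

Because the caller returns immediately and the state change happens in the background thread, this shape is safe to trigger from a signal handler, which is how `quiet` is wired up in `run`.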

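.run ⇒ Object

Runs the app: validates the configuration, installs the signal handlers, starts the server, and blocks until Karafka has terminated.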



# File 'lib/karafka/server.rb', line 27

def run
  self.listeners = []
  self.workers = []

  # We need to validate this prior to running because it may be executed also from the
  # embedded
  # We cannot validate this during the start because config needs to be populated and routes
  # need to be defined.
  config.internal.cli.contract.validate!(
    config.internal.routing.activity_manager.to_h
  )

  process.on_sigint { stop }
  process.on_sigquit { stop }
  process.on_sigterm { stop }
  process.on_sigtstp { quiet }
  process.supervise

  # Start is blocking until stop is called and when we stop, it will wait until
  # all of the things are ready to stop
  start

  # We always need to wait for Karafka to stop here since we should wait for the stop running
  # in a separate thread (or trap context) to indicate everything is closed
  # Since `#start` is blocking, we will get here only after the runner is done. This will
  # not add any performance degradation because of that.
  sleep(0.1) until Karafka::App.terminated?
# Try its best to shutdown underlying components before re-raising
# rubocop:disable Lint/RescueException
rescue Exception => e
  # rubocop:enable Lint/RescueException
  stop

  raise e
end
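
The `rescue Exception` at the end of `run` is deliberate: a bare `rescue` only catches `StandardError`, so it would miss exceptions such as `Interrupt`. The sketch below shows the same stop-then-re-raise shape in isolation. `CleanupServer` is a hypothetical stand-in, not part of Karafka.

```ruby
# Hypothetical stand-in that records whether cleanup ran.
class CleanupServer
  attr_reader :stopped

  def initialize
    @stopped = false
  end

  def stop
    @stopped = true
  end

  # Same shape as `run`: do the work and, on any exception (including
  # non-StandardError ones such as Interrupt), stop first and then re-raise.
  def run
    yield
  rescue Exception => e # rubocop:disable Lint/RescueException
    stop
    raise e
  end
end

server = CleanupServer.new

begin
  server.run { raise Interrupt }
rescue Interrupt
  puts "stopped before re-raise: #{server.stopped}"
end
```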

.start ⇒ Object

Note:

We don’t need to sleep because Karafka::Fetcher is locking and waiting to finish its loop (and that won’t happen until we explicitly want to stop).

Starts Karafka with supervision.



# File 'lib/karafka/server.rb', line 66

def start
  Karafka::App.run!
  Karafka::Runner.new.call
end

.stop ⇒ Object

Note:

This method is not async. It should not be executed from the workers, as it will lock them forever. If you need to run Karafka shutdown from within worker threads, please start a separate thread to do so.

Stops Karafka with supervision (as long as there is a shutdown timeout). If consumers or workers won’t stop within the given time frame, it will force them to exit.
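
As a sketch of the advice in the note, the snippet below uses a hypothetical `BlockingServer` whose `stop` blocks until every registered worker thread has exited. That is the property that makes calling it directly from a worker a wait on itself. The worker hands the call to a fresh thread instead. None of these names are Karafka APIs.

```ruby
# Hypothetical stand-in: `stop` blocks until every registered worker has
# exited, so a worker calling it directly would end up waiting on itself.
class BlockingServer
  attr_reader :workers

  def initialize
    @workers = []
  end

  def stop
    sleep(0.01) while workers.any?(&:alive?)
    :stopped
  end
end

server = BlockingServer.new
gate = Queue.new
stopper = nil

worker = Thread.new do
  gate.pop # wait until this thread has been registered as a worker
  # Delegate the blocking call to a separate thread and let this worker return.
  stopper = Thread.new { server.stop }
end

server.workers << worker
gate << :go

worker.join
puts stopper.value
```

In a real consumer the equivalent would presumably be `Thread.new { Karafka::Server.stop }`, issued from the worker without joining the new thread.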



# File 'lib/karafka/server.rb', line 77

def stop
  # Initialize the stopping process only if Karafka was running
  return if Karafka::App.stopping?
  return if Karafka::App.stopped?
  return if Karafka::App.terminated?

  Karafka::App.stop!

  timeout = config.shutdown_timeout

  # We check from time to time (for the timeout period) if all the threads finished
  # their work and if so, we can just return and normal shutdown process will take place
  # We divide it by 1000 because we use time in ms.
  ((timeout / 1_000) * SUPERVISION_CHECK_FACTOR).to_i.times do
    return if listeners.count(&:alive?).zero? && workers.count(&:alive?).zero?

    sleep SUPERVISION_SLEEP
  end

  raise Errors::ForcefulShutdownError
rescue Errors::ForcefulShutdownError => e
  Karafka.monitor.instrument(
    'error.occurred',
    caller: self,
    error: e,
    type: 'app.stopping.error'
  )

  # We're done waiting, lets kill them!
  workers.each(&:terminate)
  listeners.each(&:terminate)
  # We always need to shutdown clients to make sure we do not force the GC to close consumer.
  # This can cause memory leaks and crashes.
  listeners.each(&:shutdown)

  # We also do not forcefully terminate everything when running in the embedded mode,
  # otherwise we would overwrite the shutdown process of the process that started Karafka
  return unless process.supervised?

  # exit! is not within the instrumentation as it would not trigger due to exit
  Kernel.exit!(FORCEFUL_EXIT_CODE)
ensure
  # We need to check if it wasn't an early exit to make sure that only on stop invocation
  # can change the status after everything is closed
  if timeout
    Karafka::App.stopped!

    # We close producer as the last thing as it can be used in the notification pipeline
    # to dispatch state changes, etc
    Karafka::App.producer.close

    Karafka::App.terminate!
  end
end
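
The number of liveness checks made before a forceful shutdown follows directly from the expression in the supervision loop. The sketch below works through that arithmetic. The values of `SUPERVISION_SLEEP` and `SUPERVISION_CHECK_FACTOR` are assumptions for illustration, since the listing references these constants without showing their definitions, and `supervision_checks` is a hypothetical helper.

```ruby
# Assumed values: the constants are referenced in `stop` but not defined
# in the listing, so treat these numbers as illustrative only.
SUPERVISION_SLEEP = 0.1
SUPERVISION_CHECK_FACTOR = (1 / SUPERVISION_SLEEP).to_i # 10 checks per second

# Number of liveness checks performed before giving up, using the same
# expression as `stop` (the timeout is given in milliseconds).
def supervision_checks(timeout_ms)
  ((timeout_ms / 1_000) * SUPERVISION_CHECK_FACTOR).to_i
end

puts supervision_checks(60_000) # 600 checks, roughly 60 seconds of waiting
puts supervision_checks(1_500)  # 10 checks: Integer division drops the extra 500 ms
puts supervision_checks(500)    # 0 checks: the loop body never runs
```

Under these assumptions, an Integer timeout is effectively rounded down to whole seconds, and a timeout below one second skips the waiting loop and goes straight to the forceful shutdown path.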