Class: Karafka::Pro::Swarm::LivenessListener
- Inherits: Swarm::LivenessListener
  - Object
  - Swarm::LivenessListener
  - Karafka::Pro::Swarm::LivenessListener
- Defined in: lib/karafka/pro/swarm/liveness_listener.rb
Overview
This listener should not break anything if subscribed in the supervisor prior to forking as it relies on server events for operations.
Pro listener that monitors RSS usage and other heartbeat metrics (if configured) to ensure that the process keeps operating as expected.
It can:
- monitor poll frequency to make sure polling happens often enough
- monitor consumption to make sure we do not process data for too long
- monitor RSS to make sure that we do not use too much memory
By default it does not monitor memory. Consuming and polling monitoring is configured to align with `max.poll.interval.ms` and other defaults.
Failure statuses reported are as follows:
- 1 - polling ttl exceeded
- 2 - consuming ttl exceeded
- 3 - memory limit exceeded
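The status codes above can be read as a series of threshold checks. The sketch below is a standalone illustration and not the listener's actual implementation: the `liveness_status` helper, its parameter names, and the order in which the checks run are assumptions made for this example. Only the codes 1, 2, and 3 and their meanings come from the list above, with 0 assumed to mean "healthy".

```ruby
# Hypothetical helper, not part of Karafka. Only the status codes
# (1, 2, 3) are taken from the documentation; everything else is assumed.
def liveness_status(now_ms:, last_poll_ms:, consumption_started_ms:,
                    rss:, polling_ttl:, consuming_ttl:, memory_limit:)
  # 1 - polling ttl exceeded: the last poll happened too long ago
  return 1 if now_ms - last_poll_ms > polling_ttl

  # 2 - consuming ttl exceeded: an in-progress consumption runs too long
  if consumption_started_ms && now_ms - consumption_started_ms > consuming_ttl
    return 2
  end

  # 3 - memory limit exceeded (never true with the Float::INFINITY default)
  return 3 if rss > memory_limit

  # 0 - assumed "healthy" value
  0
end
```

With the default `memory_limit` of `Float::INFINITY`, the third check can never fire, which matches the statement that memory is not monitored by default.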
Instance Method Summary
- #initialize(memory_limit: Float::INFINITY, consuming_ttl: 5 * 60 * 1_000, polling_ttl: 5 * 60 * 1_000) ⇒ LivenessListener (constructor)
  A new instance of LivenessListener.
- #on_connection_listener_fetch_loop(_event) ⇒ Object
  Tick on each fetch.
- #on_connection_listener_stopped(_event) ⇒ Object
  Deregister the polling tracker for given listener.
- #on_connection_listener_stopping(_event) ⇒ Object
  Deregister the polling tracker for given listener.
- #on_error_occurred(_event) ⇒ Object
- #on_statistics_emitted(_event) ⇒ Object
  Reports the current status once in a while.
Constructor Details
#initialize(memory_limit: Float::INFINITY, consuming_ttl: 5 * 60 * 1_000, polling_ttl: 5 * 60 * 1_000) ⇒ LivenessListener
The default TTL matches the default `max.poll.interval.ms`.
Returns a new instance of LivenessListener.
# File 'lib/karafka/pro/swarm/liveness_listener.rb', line 44

def initialize(
  memory_limit: Float::INFINITY,
  consuming_ttl: 5 * 60 * 1_000,
  polling_ttl: 5 * 60 * 1_000
)
  @polling_ttl = polling_ttl
  @consuming_ttl = consuming_ttl
  # We cast it just in case someone would provide '10MB' or something similar
  @memory_limit = memory_limit.is_a?(String) ? memory_limit.to_i : memory_limit
  @pollings = {}
  @consumptions = {}

  super()
end
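To make the constructor defaults and the string cast concrete, here is a small standalone sketch. The `cast_memory_limit` helper is hypothetical and only copies the ternary from the constructor so it can run outside Karafka. It shows that `String#to_i` keeps the leading digits and silently drops any unit suffix, so a string value is not converted between units.

```ruby
# Hypothetical helper that copies the cast used in the constructor.
def cast_memory_limit(memory_limit)
  memory_limit.is_a?(String) ? memory_limit.to_i : memory_limit
end

# Both TTL defaults are expressed in milliseconds: 5 minutes.
default_ttl = 5 * 60 * 1_000
puts default_ttl                        # 300000

# String#to_i parses leading digits only; the "MB" suffix is discarded.
puts cast_memory_limit('10MB')          # 10

# A string with no leading digits becomes 0.
puts cast_memory_limit('abc')           # 0

# Non-string values pass through unchanged.
puts cast_memory_limit(Float::INFINITY) # Infinity
```

Because the suffix is ignored, passing a plain number in the unit the listener expects is the less surprising choice.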
Instance Method Details
#on_connection_listener_fetch_loop(_event) ⇒ Object
Tick on each fetch
# File 'lib/karafka/pro/swarm/liveness_listener.rb', line 62

def on_connection_listener_fetch_loop(_event)
  mark_polling_tick
end
#on_connection_listener_stopped(_event) ⇒ Object
Deregister the polling tracker for given listener
# File 'lib/karafka/pro/swarm/liveness_listener.rb', line 120

def on_connection_listener_stopped(_event)
  return if Karafka::App.done?

  clear_polling_tick
end
#on_connection_listener_stopping(_event) ⇒ Object
Deregister the polling tracker for given listener
# File 'lib/karafka/pro/swarm/liveness_listener.rb', line 108

def on_connection_listener_stopping(_event)
  # We are interested in disabling tracking for given listener only if it was requested
  # when karafka was running. If we would always clear, it would not catch the shutdown
  # polling requirements. The "running" listener shutdown operations happen only when
  # the manager requests it for downscaling.
  return if Karafka::App.done?

  clear_polling_tick
end
#on_error_occurred(_event) ⇒ Object
# File 'lib/karafka/pro/swarm/liveness_listener.rb', line 88

def on_error_occurred(_event)
  clear_consumption_tick
  clear_polling_tick
end
#on_statistics_emitted(_event) ⇒ Object
Reports the current status once in a while
# File 'lib/karafka/pro/swarm/liveness_listener.rb', line 96

def on_statistics_emitted(_event)
  periodically do
    return unless node

    current_status = status

    current_status.positive? ? node.unhealthy(current_status) : node.healthy
  end
end
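The reporting branch in `on_statistics_emitted` can be exercised in isolation. The sketch below is an illustration under stated assumptions: `FakeNode` is a stand-in for the real swarm node object, and `report` is a hypothetical helper that repeats only the `node` guard and the `positive?` branch from the method above. The `periodically` throttling and the `status` computation are left out.

```ruby
# Stand-in for the swarm node; the real object is provided by Karafka.
class FakeNode
  attr_reader :reports

  def initialize
    @reports = []
  end

  # Records a healthy report.
  def healthy
    @reports << [:healthy]
  end

  # Records an unhealthy report together with its failure status code.
  def unhealthy(code)
    @reports << [:unhealthy, code]
  end
end

# Hypothetical helper mirroring the branch in on_statistics_emitted.
def report(node, current_status)
  # Nothing to report to when there is no node
  return unless node

  current_status.positive? ? node.unhealthy(current_status) : node.healthy
end

node = FakeNode.new
report(node, 0) # zero status is reported as healthy
report(node, 2) # positive status is reported as unhealthy with its code
p node.reports
```

Any positive status is forwarded unchanged, so the supervisor side receives the same 1, 2, or 3 code listed in the overview.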