Class: Karafka::Connection::Listener
- Inherits: Object
- Includes: Helpers::Async
- Defined in: lib/karafka/connection/listener.rb
Overview
A single listener that listens to incoming messages from a single subscription group. It polls the messages and then enqueues jobs. It also takes care of potential recovery from critical errors by restarting everything in a safe manner.
This is the heart of the consumption process.
It exposes an async API for management, so all status changes are expected to happen asynchronously.
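As a rough, hypothetical sketch of that async lifecycle (the `subscription_group`, `jobs_queue` and `scheduler` collaborators are assumed to be built elsewhere by the framework, which normally constructs and supervises listeners itself):

# Hypothetical sketch only - Karafka builds and supervises listeners internally.
listener = Karafka::Connection::Listener.new(subscription_group, jobs_queue, scheduler)

listener.start!    # transitions the status and runs #call in a background thread
listener.active?   # => true while the listener is neither pending nor stopped

# Later, when the process is shutting down, a supervising thread can stop it
listener.shutdown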
Instance Attribute Summary
- #id ⇒ String (readonly): Can be useful for logging.
- #subscription_group ⇒ Karafka::Routing::SubscriptionGroup (readonly): Subscription group that this listener handles.
Instance Method Summary
- #active? ⇒ Boolean: Is this listener active (not stopped and not pending).
- #call ⇒ Object: Runs the main listener fetch loop.
- #initialize(subscription_group, jobs_queue, scheduler) ⇒ Karafka::Connection::Listener (constructor): Listener instance.
- #shutdown ⇒ Object: Stops the jobs queue, triggers shutdown on all the executors (sync), commits offsets and stops the Kafka client.
- #start! ⇒ Object: We overwrite the state `#start` because on start we also need to run the listener in the async thread.
Methods included from Helpers::Async
#alive?, #async_call, included
Constructor Details
#initialize(subscription_group, jobs_queue, scheduler) ⇒ Karafka::Connection::Listener
Returns a new listener instance.
# File 'lib/karafka/connection/listener.rb', line 32

def initialize(subscription_group, jobs_queue, scheduler)
  proc_config = ::Karafka::App.config.internal.processing

  @id = SecureRandom.hex(6)
  @subscription_group = subscription_group
  @jobs_queue = jobs_queue
  @coordinators = Processing::CoordinatorsBuffer.new(subscription_group.topics)
  @client = Client.new(@subscription_group, -> { running? })
  @executors = Processing::ExecutorsBuffer.new(@client, subscription_group)
  @jobs_builder = proc_config.jobs_builder
  @partitioner = proc_config.partitioner_class.new(subscription_group)
  @scheduler = scheduler
  @events_poller = Helpers::IntervalRunner.new { @client.events_poll }
  # We keep one buffer for messages to preserve memory and not allocate extra objects
  # We can do this that way because we always first schedule jobs using messages before we
  # fetch another batch.
  @messages_buffer = MessagesBuffer.new(subscription_group)
  @mutex = Mutex.new
  @status = Status.new

  @jobs_queue.register(@subscription_group.id)

  # This makes sure that even if we tick more often than the interval time due to frequent
  # unlocks from short-lived jobs or async queues synchronization, events handling and jobs
  # scheduling still happens with the expected frequency
  @interval_runner = Helpers::IntervalRunner.new do
    @events_poller.call
    @scheduler.on_manage
  end
end
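The `@interval_runner` wraps events polling and scheduler management so they happen at the expected frequency even when the loop ticks much more often. A minimal sketch of that idea, assuming only the `Helpers::IntervalRunner` behaviour visible above (a block invoked via `#call`, rate limited to its configured interval):

# Assumption: IntervalRunner runs the wrapped block at most once per configured
# tick interval, regardless of how often #call is invoked.
ticker = Karafka::Helpers::IntervalRunner.new { puts 'polling events / managing jobs' }

100.times do
  ticker.call # most of these ticks are no-ops; the block runs only once per interval
  sleep(0.01)
end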
Instance Attribute Details
#id ⇒ String (readonly)
Can be useful for logging.
# File 'lib/karafka/connection/listener.rb', line 17

def id
  @id
end
#subscription_group ⇒ Karafka::Routing::SubscriptionGroup (readonly)
Returns subscription group that this listener handles.
# File 'lib/karafka/connection/listener.rb', line 20

def subscription_group
  @subscription_group
end
Instance Method Details
#active? ⇒ Boolean
Returns whether this listener is active (not stopped and not pending).
# File 'lib/karafka/connection/listener.rb', line 99

def active?
  @status.active?
end
#call ⇒ Object
Prefetch callbacks can be used to seek offsets or do other things before we actually start consuming data.
Runs the main listener fetch loop.
# File 'lib/karafka/connection/listener.rb', line 67

def call
  Karafka.monitor.instrument(
    'connection.listener.before_fetch_loop',
    caller: self,
    client: @client,
    subscription_group: @subscription_group
  )

  fetch_loop

  Karafka.monitor.instrument(
    'connection.listener.after_fetch_loop',
    caller: self,
    client: @client,
    subscription_group: @subscription_group
  )
end
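Because the fetch loop is wrapped in instrumentation, these events can be observed through the Karafka monitor. A small sketch, assuming the default monitor and the payload keys visible in the code above:

# Log when each listener enters and leaves its fetch loop.
Karafka.monitor.subscribe('connection.listener.before_fetch_loop') do |event|
  sg = event[:subscription_group]
  Karafka.logger.info("Listener for subscription group #{sg.id} starts its fetch loop")
end

Karafka.monitor.subscribe('connection.listener.after_fetch_loop') do |event|
  Karafka.logger.info("Listener #{event[:caller].id} finished its fetch loop")
end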
#shutdown ⇒ Object
This method is not private despite being part of the fetch loop because, in case of a forceful shutdown, it may be invoked from a separate thread.
We wrap it with a mutex exactly because of the above case of forceful shutdown.
Stops the jobs queue, triggers shutdown on all the executors (sync), commits offsets and stops the Kafka client.
# File 'lib/karafka/connection/listener.rb', line 124

def shutdown
  @mutex.synchronize do
    return if stopped?
    # Nothing to clear if it was not even running
    return stopped! if pending?

    @executors.clear
    @coordinators.reset
    @client.stop

    stopped!
  end
end
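The mutex guard is what makes it safe for a supervising thread to force a shutdown while the listener is still running in its own async thread. A hypothetical sketch of that situation, assuming a `listener` that has already been started:

# Hypothetical forceful shutdown from a separate (supervising) thread.
supervisor = Thread.new do
  sleep(30)          # e.g. a shutdown deadline has elapsed
  listener.shutdown  # mutex-guarded, so safe to call from outside the listener thread
end

supervisor.join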
#start! ⇒ Object
We overwrite the state `#start` because on start we also need to start running the listener in the async thread. While other state transitions happen automatically and a status state change is enough, here we need to run the background threads.
# File 'lib/karafka/connection/listener.rb', line 106

def start!
  if stopped?
    @client.reset
    @status.reset!
  end

  @status.start!

  async_call("karafka.listener##{@subscription_group.id}")
end
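Since a stopped listener resets its client and status before starting again, the same instance can go through a stop/start cycle. A hypothetical sketch relying only on the behaviour visible above:

listener.start!    # spawns the async thread named "karafka.listener#<subscription group id>"
listener.shutdown  # stops the client and marks the listener as stopped

listener.start!    # stopped? is true, so the client and status are reset before restarting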