Class: Karafka::Connection::Listener
- Inherits: Object
- Includes: Helpers::Async
- Defined in: lib/karafka/connection/listener.rb
Overview
A single listener that listens to incoming messages from a single subscription group. It polls messages and then enqueues processing jobs. It also takes care of potential recovery from critical errors by restarting everything in a safe manner.
This is the heart of the consumption process.
Instance Attribute Summary collapse
- #id ⇒ String (readonly): Can be useful for logging.
Instance Method Summary collapse
- #call ⇒ Object: Runs the main listener fetch loop.
- #initialize(consumer_group_coordinator, subscription_group, jobs_queue) ⇒ Karafka::Connection::Listener (constructor): Returns a new listener instance.
- #shutdown ⇒ Object: Stops the jobs queue, triggers shutdown on all the executors (sync), commits offsets and stops the Kafka client.
Methods included from Helpers::Async
Constructor Details
#initialize(consumer_group_coordinator, subscription_group, jobs_queue) ⇒ Karafka::Connection::Listener
Returns a new listener instance.
# File 'lib/karafka/connection/listener.rb', line 21

def initialize(consumer_group_coordinator, subscription_group, jobs_queue)
  proc_config = ::Karafka::App.config.internal.processing

  @id = SecureRandom.hex(6)
  @consumer_group_coordinator = consumer_group_coordinator
  @subscription_group = subscription_group
  @jobs_queue = jobs_queue
  @coordinators = Processing::CoordinatorsBuffer.new(subscription_group.topics)
  @client = Client.new(@subscription_group)
  @executors = Processing::ExecutorsBuffer.new(@client, subscription_group)
  @jobs_builder = proc_config.jobs_builder
  @partitioner = proc_config.partitioner_class.new(subscription_group)
  # We reference scheduler here as it is much faster than fetching this each time
  @scheduler = proc_config.scheduler
  # We keep one buffer for messages to preserve memory and not allocate extra objects
  # We can do this that way because we always first schedule jobs using messages before we
  # fetch another batch.
  @messages_buffer = MessagesBuffer.new(subscription_group)
  @mutex = Mutex.new
  @stopped = false
end
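As a rough illustration of how these collaborators fit together, below is a hedged sketch of building and running a listener by hand. Karafka normally wires this up internally, so the subscription group lookup and the coordinator arity are assumptions for illustration, not a supported public API.

require 'karafka'

# Illustrative only: in a real process Karafka builds listeners itself.
# Assumes App.subscription_groups maps consumer groups to arrays of
# subscription groups, and that the coordinator takes a listeners count.
subscription_group = Karafka::App.subscription_groups.values.flatten.first
jobs_queue = Karafka::Processing::JobsQueue.new
coordinator = Karafka::Connection::ConsumerGroupCoordinator.new(1)

listener = Karafka::Connection::Listener.new(
  coordinator,
  subscription_group,
  jobs_queue
)

listener.call # blocks, running the fetch loop until shutdown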
Instance Attribute Details
#id ⇒ String (readonly)
Can be useful for logging.

# File 'lib/karafka/connection/listener.rb', line 15

def id
  @id
end
Instance Method Details
#call ⇒ Object
Runs the main listener fetch loop.

Note: prefetch callbacks can be used to seek offsets or do other things before we actually start consuming data.
# File 'lib/karafka/connection/listener.rb', line 47

def call
  Karafka.monitor.instrument(
    'connection.listener.before_fetch_loop',
    caller: self,
    client: @client,
    subscription_group: @subscription_group
  )

  fetch_loop
end
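Because 'connection.listener.before_fetch_loop' is instrumented before polling begins, application code can subscribe to it to act as such a prefetch callback, for example to seek to a specific offset. A minimal sketch, assuming the event payload keys shown above and that Karafka::Messages::Seek wraps a topic, partition and offset:

Karafka.monitor.subscribe('connection.listener.before_fetch_loop') do |event|
  # Placeholder values for illustration: rewind topic 'orders', partition 0,
  # to offset 0 before this listener starts consuming.
  event[:client].seek(
    Karafka::Messages::Seek.new('orders', 0, 0)
  )
end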
#shutdown ⇒ Object
Stops the jobs queue, triggers shutdown on all the executors (sync), commits offsets and stops the Kafka client.

Note: this method is not private, despite being part of the fetch loop, because in case of a forceful shutdown it may be invoked from a separate thread. It is wrapped with a mutex exactly because of that case.
# File 'lib/karafka/connection/listener.rb', line 65

def shutdown
  return if @stopped

  @mutex.synchronize do
    @stopped = true

    @executors.clear
    @coordinators.reset
    @client.stop
  end
end
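The mutex and the @stopped guard matter because shutdown may race with, or be invoked outside of, the fetch loop thread. A hedged sketch of such a forceful shutdown, reusing the hypothetical listener from the constructor example above:

# Illustrative only: drive the fetch loop in a background thread and trigger
# shutdown from a signal handler running in a different thread.
listener_thread = Thread.new { listener.call }

trap('TERM') do
  # Mutexes cannot be locked from trap context, hence the extra thread; the
  # internal @stopped guard also makes repeated calls a no-op.
  Thread.new { listener.shutdown }
end

listener_thread.join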