Module: Karafka::Pro::Processing::AdaptiveIterator::Consumer
- Defined in:
- lib/karafka/pro/processing/adaptive_iterator/consumer.rb
Overview
Consumer enhancements needed to wrap the batch iterator for adaptive iterating It automatically marks as consumed, ensures that we do not reach ‘max.poll.interval.ms` and does other stuff to simplify user per-message processing
Instance Method Summary collapse
Instance Method Details
#each(*args) ⇒ Object
24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 |
# File 'lib/karafka/pro/processing/adaptive_iterator/consumer.rb', line 24 def each(*args) adi_config = topic.adaptive_iterator tracker = Tracker.new( adi_config.safety_margin, coordinator.last_polled_at, topic.subscription_group.kafka.fetch(:'max.poll.interval.ms') ) .each(*args) do || # Always stop if we've lost the assignment return if revoked? # No automatic marking risk when mom is enabled so we can fast stop return if Karafka::App.done? && topic.manual_offset_management? # Seek request on done will allow us to stop without marking the offset when user had # the automatic offset marking. This should not be a big network traffic issue for # the end user as we're stopping anyhow but should improve shutdown time if tracker.enough? || Karafka::App.done? # Enough means we no longer have time to process more data without polling as we # risk reaching max poll interval. Instead we seek and we will poll again soon. seek(.offset, reset_offset: true) return end tracker.track { yield() } # Clean if this is what user configured .clean! if adi_config.clean_after_yielding? public_send(adi_config.marking_method, ) end end |