Module: Karafka::Pro::Routing::Features::AdaptiveIterator::Topic

Defined in:
lib/karafka/pro/routing/features/adaptive_iterator/topic.rb

Overview

Topic extension that allows enabling and configuring the adaptive iterator.

Instance Method Details

#adaptive_iterator(active: false, safety_margin: 10, marking_method: :mark_as_consumed, clean_after_yielding: true) ⇒ Object

Parameters:

  • active (Boolean) (defaults to: false)

    Whether to use the automatic adaptive iterator.

  • safety_margin (Integer) (defaults to: 10)

    Percentage of the available time kept as a margin so we can still safely communicate back with Kafka, etc. With the default of 10, we stop and seek back once 90% of the time has been used, leaving 10% for post-processing operations so we have space before we hit max.poll.interval.ms.

  • marking_method (Symbol) (defaults to: :mark_as_consumed)

    Which method to use for marking messages as consumed, if marking takes place.

  • clean_after_yielding (Boolean) (defaults to: true)

    Whether to clean each message after it has been yielded, using the cleaner API.
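
To make the safety_margin arithmetic concrete, here is a minimal sketch. It assumes the margin is a percentage of max.poll.interval.ms that is held back for post-processing. The helper name processing_budget_ms and the 300_000 ms interval are illustrative assumptions and are not part of Karafka.

```ruby
# Hypothetical helper (not part of Karafka): how many milliseconds may be
# spent processing before stopping, assuming safety_margin is a percentage
# of max.poll.interval.ms that is held back.
def processing_budget_ms(max_poll_interval_ms, safety_margin: 10)
  max_poll_interval_ms * (100 - safety_margin) / 100
end

max_poll_interval_ms = 300_000 # assumed value, for illustration only

budget_default = processing_budget_ms(max_poll_interval_ms)
budget_wider = processing_budget_ms(max_poll_interval_ms, safety_margin: 15)

puts "margin 10: process for up to #{budget_default} ms"
puts "margin 15: process for up to #{budget_wider} ms"
```

Under these assumptions a margin of 10 leaves 270000 ms for processing and a margin of 15 leaves 255000 ms; the remainder is what stays available before max.poll.interval.ms is hit.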



# File 'lib/karafka/pro/routing/features/adaptive_iterator/topic.rb', line 30

def adaptive_iterator(
  active: false,
  safety_margin: 10,
  marking_method: :mark_as_consumed,
  clean_after_yielding: true
)
  @adaptive_iterator ||= Config.new(
    active: active,
    safety_margin: safety_margin,
    marking_method: marking_method,
    clean_after_yielding: clean_after_yielding
  )
end
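
Because the method memoizes with `||=`, the first call decides the configuration and later calls return the same object regardless of their arguments. The sketch below illustrates this with stand-ins: SketchConfig and SketchTopic are hypothetical names, and the shape of the config object (a keyword-initialized Struct with an active? alias) is an assumption made for illustration, not the library's actual Config class.

```ruby
# Stand-in for the feature's Config class (assumed shape, illustration only).
SketchConfig = Struct.new(
  :active, :safety_margin, :marking_method, :clean_after_yielding,
  keyword_init: true
) do
  alias_method :active?, :active
end

# Hypothetical minimal topic reusing the method bodies from this page.
class SketchTopic
  def adaptive_iterator(
    active: false,
    safety_margin: 10,
    marking_method: :mark_as_consumed,
    clean_after_yielding: true
  )
    # Built once; subsequent calls return the memoized instance.
    @adaptive_iterator ||= SketchConfig.new(
      active: active,
      safety_margin: safety_margin,
      marking_method: marking_method,
      clean_after_yielding: clean_after_yielding
    )
  end

  def adaptive_iterator?
    adaptive_iterator.active?
  end
end

topic = SketchTopic.new
first = topic.adaptive_iterator(active: true, safety_margin: 15)
second = topic.adaptive_iterator(active: false) # arguments ignored here

puts second.equal?(first)
puts second.safety_margin
puts topic.adaptive_iterator?
```

In this sketch the second call returns the first configuration unchanged, so the settings passed on the first call are the ones that stick.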

#adaptive_iterator? ⇒ Boolean

Returns whether the adaptive iterator is active for this topic. The method delegates to adaptive_iterator.active?, so it reflects the configured active flag (false by default) rather than always being true.

Returns:

  • (Boolean)

    whether the adaptive iterator is active for this topic, as reported by adaptive_iterator.active?



# File 'lib/karafka/pro/routing/features/adaptive_iterator/topic.rb', line 46

def adaptive_iterator?
  adaptive_iterator.active?
end

#to_h ⇒ Hash

Returns the topic with all its native configuration options plus the adaptive iterator configuration, as a frozen hash.

Returns:

  • (Hash)

    the topic with all its native configuration options plus the adaptive iterator configuration, frozen.



# File 'lib/karafka/pro/routing/features/adaptive_iterator/topic.rb', line 52

def to_h
  super.merge(
    adaptive_iterator: adaptive_iterator.to_h
  ).freeze
end
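
The `super.merge(...).freeze` pattern can be sketched in isolation as follows. BaseTopic, AdaptiveIteratorSketch and adaptive_iterator_settings are hypothetical names, and wiring the extension in with prepend is an assumption made so that super resolves to the base to_h; this is not a description of how Karafka loads the feature.

```ruby
# Hypothetical base class standing in for a routing topic.
class BaseTopic
  def to_h
    { name: "events" }
  end
end

# Hypothetical extension mirroring the to_h shown above.
module AdaptiveIteratorSketch
  # Stand-in for adaptive_iterator.to_h (assumed values).
  def adaptive_iterator_settings
    { active: true, safety_margin: 10 }
  end

  def to_h
    # Take the base hash, add the feature's settings, freeze the result.
    super.merge(adaptive_iterator: adaptive_iterator_settings).freeze
  end
end

# Prepending makes `super` inside the module reach BaseTopic#to_h.
BaseTopic.prepend(AdaptiveIteratorSketch)

result = BaseTopic.new.to_h

puts result.inspect
puts result.frozen?
puts result[:adaptive_iterator].frozen?
```

Note that Hash#freeze is shallow: in this sketch the outer hash rejects modification, while the nested settings hash is not frozen unless the code producing it freezes it as well.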