Module: Karafka::Pro::BaseConsumer

Defined in:
lib/karafka/pro/base_consumer.rb

Overview

Extra methods that are always available in the base consumer when running in Pro mode.

We do not define these methods as part of the strategy flows, because strategies are injected on singletons and their methods are often used in only one of the strategy variants.

Methods defined here are supposed to be always available or are expected to be redefined.

Instance Method Summary

Instance Method Details

#errors_tracker ⇒ Karafka::Pro::Processing::Coordinators::ErrorsTracker

Note:

This will always contain only details of errors that occurred during `#consume`, because only those are retryable.

Note:

This may contain more than one error because:

  • it can collect various errors that might have happened during virtual partitions execution

  • errors can pile up during retries; until a clean run, they are collected up to a limit of the last 100. We do not store more because, without such a limit, a consumer stuck in an endless error loop would leak memory.
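
The bounded collection described in the note can be illustrated with a minimal sketch. `BoundedErrors` below is a hypothetical stand-in written for this example, not Karafka's `ErrorsTracker`; only the limit of 100 and the reset after a clean run come from the note above.

```ruby
# Hypothetical stand-in, NOT Karafka's ErrorsTracker: keeps only the newest
# LIMIT errors so an endless failure loop cannot grow memory without bound.
class BoundedErrors
  include Enumerable

  LIMIT = 100

  def initialize(limit: LIMIT)
    @limit = limit
    @errors = []
  end

  # Appends an error, dropping the oldest one first when the limit is reached
  def <<(error)
    @errors.shift if @errors.size >= @limit
    @errors << error
    self
  end

  # Meant to be called after a clean run so no stale errors are carried over
  def clear
    @errors.clear
  end

  def each(&block)
    @errors.each(&block)
  end

  def size
    @errors.size
  end
end

tracker = BoundedErrors.new
150.times { |i| tracker << StandardError.new("failure #{i}") }

puts tracker.size          # 100
puts tracker.first.message # failure 50
```

Dropping the oldest entries first keeps the most recent failures, which are usually the ones worth inspecting on the next retry.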

Returns the tracker for errors that occurred during processing. Errors are kept until the next successful processing.

Returns:

  • (Karafka::Pro::Processing::Coordinators::ErrorsTracker)

# File 'lib/karafka/pro/base_consumer.rb', line 35

def errors_tracker
  coordinator.errors_tracker
end
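
As a rough illustration of how this delegation might be used, the sketch below summarizes collected errors by class. It runs without Karafka: `FakeCoordinator`, `SketchConsumer`, and `error_summary` are hypothetical names introduced for this example, and a plain Array stands in for the tracker on the assumption that the real tracker can be iterated.

```ruby
# Hypothetical stand-ins so the sketch runs without Karafka; only the
# delegation in #errors_tracker mirrors the source shown above.
FakeCoordinator = Struct.new(:errors_tracker)

class SketchConsumer
  attr_reader :coordinator

  def initialize(coordinator)
    @coordinator = coordinator
  end

  # Same delegation as in lib/karafka/pro/base_consumer.rb
  def errors_tracker
    coordinator.errors_tracker
  end

  # Hypothetical helper: counts collected errors per error class name
  def error_summary
    errors_tracker.map { |error| error.class.name }.tally
  end
end

# An Array stands in for the tracker (assumption: the tracker is iterable)
errors = [
  ArgumentError.new('bad payload'),
  IOError.new('db down'),
  IOError.new('db down')
]

consumer = SketchConsumer.new(FakeCoordinator.new(errors))
p consumer.error_summary
```

A summary like this could inform a retry decision, for example backing off longer when every collected error is of the same class.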

#subscription_groups_coordinator ⇒ Karafka::Pro::Processing::SubscriptionGroupsCoordinator

Note:

Since this stops polling, it can cause the `max.poll.interval.ms` limit to be exceeded.

Note:

This is a low-level API used for cross-topic coordination and some advanced features. Use it at your own risk.

Returns a coordinator that allows pausing and resuming polling of the given subscription group's jobs queue in order to postpone further work.

Returns:

  • (Karafka::Pro::Processing::SubscriptionGroupsCoordinator)

# File 'lib/karafka/pro/base_consumer.rb', line 47

def subscription_groups_coordinator
  Processing::SubscriptionGroupsCoordinator.instance
end
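
The pause and resume flow described above can be sketched with a stand-in singleton. `SketchGroupsCoordinator`, its `pause`, `resume`, and `paused?` methods, the `with_polling_paused` helper, and the `'orders_group'` id are all assumptions made for this example; they do not reproduce the signatures of the real `SubscriptionGroupsCoordinator`.

```ruby
require 'singleton'

# Hypothetical stand-in, NOT the Karafka class: only records which
# subscription groups are currently paused.
class SketchGroupsCoordinator
  include Singleton

  def initialize
    @paused = {}
  end

  def pause(group_id)
    @paused[group_id] = true
  end

  def resume(group_id)
    @paused.delete(group_id)
  end

  def paused?(group_id)
    @paused.key?(group_id)
  end
end

# Hypothetical helper: pauses polling for the block and always resumes,
# even when the block raises, so a group is never left paused by accident.
def with_polling_paused(coordinator, group_id)
  coordinator.pause(group_id)
  yield
ensure
  coordinator.resume(group_id)
end

coordinator = SketchGroupsCoordinator.instance
paused_inside = nil

with_polling_paused(coordinator, 'orders_group') do
  # Work that must finish before more jobs are polled would go here.
  # It should stay well below max.poll.interval.ms.
  paused_inside = coordinator.paused?('orders_group')
end

puts paused_inside                       # true
puts coordinator.paused?('orders_group') # false
```

Wrapping the pause in `ensure` reflects the risk named in the note: a pause that is never lifted stops polling for good and will eventually exceed `max.poll.interval.ms`.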