Module: Karafka::Processing::InlineInsights::Consumer

Defined in:
lib/karafka/processing/inline_insights/consumer.rb

Overview

Module that adds extra methods to the consumer, allowing it to fetch the insights.

Instance Method Summary

Instance Method Details

#insights ⇒ Hash Also known as: statistics, inline_insights

Note:

We cache insights on the consumer because in some scenarios they may no longer be available in the Tracker. For example, under involuntary revocation, incoming statistics may no longer include insights for the lost partition. Since we want to be consistent during single-batch operations, we ensure that once insights are available, they remain available throughout the whole processing.

Returns empty hash or hash with given partition insights if already present.

Returns:

  • (Hash)

    empty hash or hash with given partition insights if already present



# File 'lib/karafka/processing/inline_insights/consumer.rb', line 19

def insights
  insights = Tracker.find(topic, partition)

  # If we no longer have new insights but we still have them locally, we can use them
  return @insights if @insights && insights.empty?
  # If insights are still the same, we can use them
  return @insights if @insights.equal?(insights)

  # If we've received new insights that are not empty, we can cache them
  @insights = insights
end
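
The caching behaviour described in the note can be tried outside of Karafka with a small stand-alone sketch. Everything here is hypothetical and exists only for illustration: `FakeTracker` stands in for the real Tracker, `SketchConsumer` copies the method body shown above, and the `'consumer_lag'` key is an assumed example of what a partition insights hash might contain.

```ruby
# Hypothetical stand-in for the Tracker, used only in this sketch
class FakeTracker
  def initialize
    @data = {}
  end

  # Stores insights for a topic partition (pass {} to simulate lost statistics)
  def update(topic, partition, insights)
    @data[[topic, partition]] = insights
  end

  # Returns stored insights, or a new empty hash when none are known
  def find(topic, partition)
    @data.fetch([topic, partition], {})
  end
end

# Hypothetical consumer that mirrors the caching logic of #insights
class SketchConsumer
  attr_reader :topic, :partition

  def initialize(tracker, topic, partition)
    @tracker = tracker
    @topic = topic
    @partition = partition
  end

  def insights
    insights = @tracker.find(topic, partition)

    # Keep the cached copy when the tracker has nothing new to offer
    return @insights if @insights && insights.empty?
    # Same object as before, nothing to update
    return @insights if @insights.equal?(insights)

    @insights = insights
  end
end

tracker = FakeTracker.new
consumer = SketchConsumer.new(tracker, 'events', 0)

first = consumer.insights                              # nothing tracked yet
tracker.update('events', 0, { 'consumer_lag' => 5 })   # assumed example payload
second = consumer.insights                             # fresh insights get cached
tracker.update('events', 0, {})                        # simulate insights being lost
third = consumer.insights                              # cached copy is still returned
```

In this sketch the third call still returns the previously cached hash even though the tracker no longer holds data for the partition, which is the consistency guarantee the note refers to.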

#insights? ⇒ Boolean Also known as: statistics?, inline_insights?

Returns true if there are insights to work with, otherwise false.

Returns:

  • (Boolean)

    true if there are insights to work with, otherwise false



# File 'lib/karafka/processing/inline_insights/consumer.rb', line 32

def insights?
  !insights.empty?
end
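
A typical use of `insights?` is as a guard before reading from the insights hash. The following is a minimal sketch rather than actual Karafka code: `LagAwareSketch` and `current_lag` are hypothetical names, the class defines its own `insights` and `insights?` so it can run stand-alone, and the `'consumer_lag'` key is an assumed example of a value present in the hash.

```ruby
# Hypothetical consumer-like class; in a real consumer, #insights and
# #insights? would come from this module instead of being defined here
class LagAwareSketch
  def initialize(insights)
    @insights = insights
  end

  def insights
    @insights
  end

  def insights?
    !insights.empty?
  end

  # Returns the assumed 'consumer_lag' value when insights exist, otherwise nil
  def current_lag
    return nil unless insights?

    insights['consumer_lag']
  end
end

without_insights = LagAwareSketch.new({})
with_insights = LagAwareSketch.new({ 'consumer_lag' => 3 })
```

Guarding with `insights?` keeps the empty-hash case explicit instead of relying on a `nil` lookup result.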