Class: Karafka::Processing::InlineInsights::Tracker

Inherits:
Object
Extended by:
Forwardable
Includes:
Core::Helpers::Time, Singleton
Defined in:
lib/karafka/processing/inline_insights/tracker.rb

Overview

Note:

We keep a 5-minute cache for revoked partitions to compensate for cases where, when using LRJ, a lost partition's data would no longer be present even though we would still be in the processing phase. Since those metrics are published with each `poll`, regular processing is not subject to this issue. For LRJ we keep the reference. The only case where this could switch midway is when an LRJ keeps running for an extended period of time after the involuntary revocation. Having a time-based cache instead of explicit tracking simplifies the design, as we do not have to deal with state tracking, especially since we would also have to track operations running in a revoked state.
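
For illustration, a minimal sketch of what such time-based eviction could look like. This is an assumption, not the actual implementation: the `evict` method is referenced from `#add` below, and the constant name and value here are hypothetical (5 minutes, in milliseconds, to match `monotonic_now`).

# Hypothetical sketch of time-based eviction, assuming a 5-minute TTL
TTL_MS = 5 * 60 * 1_000

def evict
  # Drop accumulator entries whose metrics are older than the TTL window
  @accu.delete_if { |_, (timestamp, _details)| monotonic_now - timestamp > TTL_MS }
end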

Note:

This tracker keeps in-memory data about all topics and partitions that it encounters, because with routing patterns we may start getting statistics prior to registering a given topic via dynamic routing expansions. In such a case we would not have insights even though they were actually available for us to use.

Note:

Memory usage is negligible as long as we can evict expired data. A single metrics set for a single partition contains around 4KB of data. This means that with an assignment of 1000 partitions, we use around 4MB of space to track those metrics.

Object used to track statistics coming from librdkafka in a way that is accessible to the consumers.

We use a single tracker because we do not need state management here, as our consumer group clients, identified by the statistics `name` value, are unique. On top of that, having one per process as a singleton allows us to easily use the tracker also from other places, like the filtering API.
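
For illustration, a hedged sketch of how other components could reach this tracker. Since the class includes `Singleton`, `.instance` is available; the `topic` and `partition` variables below are assumed to come from the caller's context.

# Hypothetical access pattern from outside the consumer flow
tracker = Karafka::Processing::InlineInsights::Tracker.instance
statistics = tracker.find(topic, partition)
statistics.empty? # => true until librdkafka emits metrics for this partition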

Instance Method Summary

Constructor Details

#initialize ⇒ Tracker

Returns a new instance of Tracker.



# File 'lib/karafka/processing/inline_insights/tracker.rb', line 54

def initialize
  # Accumulator mapping key => [monotonic timestamp, partition details]
  @accu = {}
  # Guards concurrent writes coming from statistics callbacks
  @mutex = Mutex.new
end

Instance Method Details

#add(consumer_group_id, statistics) ⇒ Object

Adds each partition's statistics into the internal accumulator. A single statistics set may contain data from multiple topics and their partitions, because a single client can operate on multiple topics and partitions.

We iterate over those topics and partitions and store only the per-partition data.

Parameters:

  • consumer_group_id (String)

    id of the consumer group for which statistics were emitted.

  • statistics (Hash)

    librdkafka enriched statistics



# File 'lib/karafka/processing/inline_insights/tracker.rb', line 68

def add(consumer_group_id, statistics)
  @mutex.synchronize do
    statistics.fetch('topics', EMPTY_HASH).each do |topic_name, t_details|
      t_details.fetch('partitions', EMPTY_HASH).each do |partition_id, p_details|
        # Skip partitions that do not qualify for tracking
        next unless track?(partition_id, p_details)

        key = "#{consumer_group_id}_#{topic_name}_#{partition_id}"
        # Store a monotonic timestamp alongside the details for eviction
        @accu[key] = [monotonic_now, p_details]
      end
    end

    # Remove data that exceeded the revocation cache window
    evict
  end
end
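
For reference, a hedged sketch of the statistics shape this method consumes. The nesting mirrors librdkafka's statistics JSON (`topics` → topic name → `partitions` → partition id), while the topic name, group id and metric values below are purely illustrative.

statistics = {
  'topics' => {
    'orders' => {
      'partitions' => {
        '0' => { 'consumer_lag' => 10, 'fetch_state' => 'active' }
      }
    }
  }
}

# Stores the partition details under the "example_group_orders_0" key
tracker.add('example_group', statistics)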

#clear ⇒ Object

Clears the tracker



# File 'lib/karafka/processing/inline_insights/tracker.rb', line 98

def clear
  @mutex.synchronize { @accu.clear }
end

#find(topic, partition) ⇒ Hash

Note:

We do not enclose this in a mutex, mainly because the only race condition that could happen here is a miss, which we need to support anyway due to how librdkafka ships metrics, as well as the potential removal of data on a heavily revoked LRJ.

Finds statistics about the requested consumer group topic partition

Parameters:

  • topic (Karafka::Routing::Topic)

    topic for which we want to get the insights

  • partition (Integer)

    partition for which we want to get the insights

Returns:

  • (Hash)

hash with the given topic partition statistics or an empty hash if not present



# File 'lib/karafka/processing/inline_insights/tracker.rb', line 92

def find(topic, partition)
  key = "#{topic.consumer_group.id}_#{topic.name}_#{partition}"
  # Accumulator values are [timestamp, details]; `.last` extracts the
  # details, falling back to an empty hash when the key is absent
  @accu.fetch(key, EMPTY_ARRAY).last || EMPTY_HASH
end
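
A hedged usage sketch, assuming `topic` and `partition` are available as in a Karafka consumer and that the returned hash carries librdkafka's per-partition keys such as `consumer_lag`.

insights = Karafka::Processing::InlineInsights::Tracker.instance.find(topic, partition)

if insights.empty?
  # Metrics not delivered yet, or evicted after an involuntary revocation
else
  insights['consumer_lag'] # lag as reported by librdkafka
end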