Class: Karafka::Processing::InlineInsights::Tracker
- Inherits:
- Object
- Extended by:
- Forwardable
- Includes:
- Core::Helpers::Time, Singleton
- Defined in:
- lib/karafka/processing/inline_insights/tracker.rb
Overview
We keep a 5-minute cache for revoked partitions. This compensates for cases where, with LRJ (Long-Running Jobs), the data of a lost partition would no longer be present even though we are still in the processing phase. Since these metrics are published with each `poll`, regular processing is not affected by this issue. For LRJ we keep the reference. The only case where the data could change midway is when LRJ keeps running for an extended period after an involuntary revocation. Using a time-based cache instead of explicit tracking simplifies the design: we do not have to deal with state tracking, which would otherwise also have to cover operations running in a revoked state.
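The time-based cache idea described above can be sketched roughly as follows. This is a minimal illustration and not Karafka's implementation: the `TtlCache` class, its method names, and the explicit `now_ms` parameters are hypothetical and exist only to make the eviction rule easy to follow.

```ruby
# Hypothetical illustration, not Karafka's code: a tiny time-based cache.
class TtlCache
  def initialize(ttl_ms)
    @ttl_ms = ttl_ms
    @data = {}
  end

  # Stores the value together with the time of the write
  def write(key, value, now_ms = monotonic_ms)
    @data[key] = [now_ms, value]
  end

  # Drops every entry whose last write is older than the TTL
  def evict(now_ms = monotonic_ms)
    @data.delete_if { |_key, (stored_at, _value)| now_ms - stored_at > @ttl_ms }
  end

  # Returns the stored value or nil when the key is unknown or already evicted
  def read(key)
    entry = @data[key]
    entry ? entry.last : nil
  end

  private

  # Monotonic clock, so wall-clock adjustments do not affect expiry
  def monotonic_ms
    Process.clock_gettime(Process::CLOCK_MONOTONIC, :float_millisecond)
  end
end

five_minutes_ms = 5 * 60 * 1_000
cache = TtlCache.new(five_minutes_ms)
cache.write('example_group_orders_0', { 'consumer_lag' => 10 }, 0)

cache.evict(4 * 60 * 1_000) # 4 minutes later: entry is kept
cache.evict(6 * 60 * 1_000) # 6 minutes later: entry is dropped
```

Because expiry depends only on the time of the last write, nothing needs to know whether a partition is currently assigned, revoked, or still being processed.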
This tracker keeps in memory data about all topics and partitions it encounters because, with routing patterns, we may start receiving statistics before a given topic is registered via dynamic routing expansion. Otherwise we would lack insights in cases where they were actually available for us to use.
Memory usage is negligible as long as we can evict expired data. A single metrics set for a single partition contains around 4KB of data. This means that with an assignment of 1000 partitions, we use around 4MB of space for tracking those metrics.
Object used to track statistics coming from librdkafka in a way that makes them accessible to consumers.
We use a single tracker because we do not need state management here: our consumer group clients, identified by the statistics name value, are unique. On top of that, a per-process singleton makes the tracker easy to use from other places as well, such as the filtering API.
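A per-process singleton that is convenient to call from anywhere is commonly built in Ruby by combining `Singleton` with class-level delegation via `Forwardable`. The sketch below shows that general pattern only. `ExampleTracker` and its simplified methods are hypothetical, and the list of delegated methods is an assumption, since this page states only that the class includes `Singleton` and is extended by `Forwardable`.

```ruby
require 'singleton'
require 'forwardable'

# Hypothetical class illustrating the pattern, not the real Tracker
class ExampleTracker
  include Singleton

  class << self
    extend Forwardable

    # Class-level calls are forwarded to the single shared instance
    def_delegators :instance, :add, :find, :clear
  end

  def initialize
    @accu = {}
  end

  def add(key, value)
    @accu[key] = value
  end

  def find(key)
    @accu.fetch(key, {})
  end

  def clear
    @accu.clear
  end
end

# Any part of the process can reach the same state without passing a reference
ExampleTracker.add('example_group_orders_0', { 'consumer_lag' => 5 })
found = ExampleTracker.find('example_group_orders_0')
```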
Instance Method Summary
- #add(consumer_group_id, statistics) ⇒ Object
  Adds each partition statistics into internal accumulator.
- #clear ⇒ Object
  Clears the tracker.
- #find(topic, partition) ⇒ Hash
  Finds statistics about requested consumer group topic partition.
- #initialize ⇒ Tracker (constructor)
  A new instance of Tracker.
Constructor Details
#initialize ⇒ Tracker
Returns a new instance of Tracker.
    # File 'lib/karafka/processing/inline_insights/tracker.rb', line 54

    def initialize
      @accu = {}
      @mutex = Mutex.new
    end
Instance Method Details
#add(consumer_group_id, statistics) ⇒ Object
Adds each partition's statistics to the internal accumulator. A single statistics set may contain data from multiple topics and their partitions because a single client can operate on multiple topics and partitions.
We iterate over those topics and partitions and store only the per-partition data.
    # File 'lib/karafka/processing/inline_insights/tracker.rb', line 68

    def add(consumer_group_id, statistics)
      @mutex.synchronize do
        statistics.fetch('topics', EMPTY_HASH).each do |topic_name, t_details|
          t_details.fetch('partitions', EMPTY_HASH).each do |partition_id, p_details|
            next unless track?(partition_id, p_details)

            key = "#{consumer_group_id}_#{topic_name}_#{partition_id}"
            @accu[key] = [monotonic_now, p_details]
          end
        end

        evict
      end
    end
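To make the nesting and the resulting keys concrete, here is a small standalone sketch of the same traversal. The `flatten_statistics` helper, the group and topic names, and the sample values are made up for illustration. The sketch also leaves out the `track?` filter and the eviction step, because their logic is not shown on this page.

```ruby
# Hypothetical helper mirroring the traversal and key format used by #add
def flatten_statistics(consumer_group_id, statistics)
  result = {}

  statistics.fetch('topics', {}).each do |topic_name, t_details|
    t_details.fetch('partitions', {}).each do |partition_id, p_details|
      # Same key layout as in #add: group id, topic name, partition id
      result["#{consumer_group_id}_#{topic_name}_#{partition_id}"] = p_details
    end
  end

  result
end

# Made-up sample following the 'topics' => 'partitions' nesting
statistics = {
  'topics' => {
    'orders' => {
      'partitions' => {
        '0' => { 'consumer_lag' => 5 },
        '1' => { 'consumer_lag' => 0 }
      }
    },
    'payments' => {
      'partitions' => {
        '0' => { 'consumer_lag' => 12 }
      }
    }
  }
}

flattened = flatten_statistics('example_group', statistics)
flattened.keys
# => ["example_group_orders_0", "example_group_orders_1", "example_group_payments_0"]
```

A statistics set without a 'topics' key simply yields no entries, which matches the `fetch` defaults used in the method above.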
#clear ⇒ Object
Clears the tracker.
    # File 'lib/karafka/processing/inline_insights/tracker.rb', line 98

    def clear
      @mutex.synchronize { @accu.clear }
    end
#find(topic, partition) ⇒ Hash
Finds statistics about the requested consumer group topic partition.
Note: We do not wrap this in a mutex, mainly because the only race condition that could occur here is a miss, which we need to support anyway due to how librdkafka ships metrics and due to the potential removal of data for a heavily revoked LRJ.
    # File 'lib/karafka/processing/inline_insights/tracker.rb', line 92

    def find(topic, partition)
      key = "#{topic.consumer_group.id}_#{topic.name}_#{partition}"
      @accu.fetch(key, EMPTY_ARRAY).last || EMPTY_HASH
    end
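The lookup expression treats a miss as a normal outcome: an unknown key falls back to an empty array, `last` on that array is `nil`, and `nil` is replaced with an empty hash. The sketch below replays that expression on a plain hash. The local variables `empty_array` and `empty_hash` stand in for the `EMPTY_ARRAY` and `EMPTY_HASH` constants, and the stored entry is made up.

```ruby
# Stand-ins for the EMPTY_ARRAY and EMPTY_HASH constants used in #find
empty_array = [].freeze
empty_hash = {}.freeze

# Made-up accumulator content in the [timestamp, partition details] layout
accu = {
  'example_group_orders_0' => [1_000.0, { 'consumer_lag' => 5 }]
}

# Same expression as in #find, wrapped in a lambda for reuse
lookup = ->(key) { accu.fetch(key, empty_array).last || empty_hash }

hit = lookup.call('example_group_orders_0')  # the stored partition details
miss = lookup.call('example_group_orders_7') # the shared empty hash, never nil
```

Callers therefore always receive a Hash and can check `empty?` instead of guarding against `nil`.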