Class: Karafka::Pro::Processing::Coordinators::VirtualOffsetManager
- Inherits:
-
Object
- Object
- Karafka::Pro::Processing::Coordinators::VirtualOffsetManager
- Defined in:
- lib/karafka/pro/processing/coordinators/virtual_offset_manager.rb
Overview
We still use the regular coordinator “real” offset management as we want to have them as separated as possible because the real seek offset management is also used for pausing, filtering and others and should not be impacted by the virtual one
This manager is not thread-safe by itself. It should operate from coordinator locked locations.
Manager that keeps track of our offsets with the virtualization layer that are local to given partition assignment. It allows for easier offset management for virtual virtual partition cases as it provides us ability to mark as consumed and move the real offset behind as expected.
Instance Attribute Summary collapse
-
#groups ⇒ Object
readonly
Returns the value of attribute groups.
Instance Method Summary collapse
-
#clear ⇒ Object
Clears the manager for a next collective operation.
-
#initialize(topic, partition, offset_metadata_strategy) ⇒ VirtualOffsetManager
constructor
A new instance of VirtualOffsetManager.
-
#mark(message, offset_metadata) ⇒ Object
Marks given message as marked (virtually consumed).
-
#mark_until(message, offset_metadata) ⇒ Object
Mark all from all groups including the ‘message`.
-
#markable ⇒ Array<Messages::Seek, String>
Markable message for real offset marking and its associated metadata.
-
#markable? ⇒ Boolean
Is there a real offset we can mark as consumed.
-
#marked ⇒ Array<Integer>
Offsets of messages already marked as consumed virtually.
-
#register(offsets_group) ⇒ Object
Registers an offset group coming from one virtual consumer.
Constructor Details
#initialize(topic, partition, offset_metadata_strategy) ⇒ VirtualOffsetManager
We need topic and partition because we use a seek message (virtual) for real offset management. We could keep real message reference but this can be memory consuming and not worth it.
Returns a new instance of VirtualOffsetManager.
41 42 43 44 45 46 47 48 49 50 |
# File 'lib/karafka/pro/processing/coordinators/virtual_offset_manager.rb', line 41 def initialize(topic, partition, ) @topic = topic @partition = partition @groups = [] @marked = {} @offsets_metadata = {} @real_offset = -1 @offset_metadata_strategy = @current_offset_metadata = nil end |
Instance Attribute Details
#groups ⇒ Object (readonly)
Returns the value of attribute groups.
30 31 32 |
# File 'lib/karafka/pro/processing/coordinators/virtual_offset_manager.rb', line 30 def groups @groups end |
Instance Method Details
#clear ⇒ Object
Clears the manager for a next collective operation
53 54 55 56 57 58 59 |
# File 'lib/karafka/pro/processing/coordinators/virtual_offset_manager.rb', line 53 def clear @groups.clear @offsets_metadata.clear @current_offset_metadata = nil @marked.clear @real_offset = -1 end |
#mark(message, offset_metadata) ⇒ Object
Marks given message as marked (virtually consumed). We mark given message offset and other earlier offsets from the same group as done and we can refresh our real offset representation based on that as it might have changed to a newer real offset.
78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 |
# File 'lib/karafka/pro/processing/coordinators/virtual_offset_manager.rb', line 78 def mark(, ) offset = .offset # Store metadata when we materialize the most stable offset @offsets_metadata[offset] = @current_offset_metadata = group = @groups.find { |reg_group| reg_group.include?(offset) } # This case can happen when someone uses MoM and wants to mark message from a previous # batch as consumed. We can add it, since the real offset refresh will point to it unless group group = [offset] @groups << group end position = group.index(offset) # Mark all previous messages from the same group also as virtually consumed group[0..position].each do |markable_offset| # Set previous messages metadata offset as the offset of higher one for overwrites # unless a different metadata were set explicitely @offsets_metadata[markable_offset] ||= @marked[markable_offset] = true end # Recompute the real offset representation materialize_real_offset end |
#mark_until(message, offset_metadata) ⇒ Object
Mark all from all groups including the ‘message`. Useful when operating in a collapsed state for marking
112 113 114 115 116 117 118 119 120 121 122 123 124 125 |
# File 'lib/karafka/pro/processing/coordinators/virtual_offset_manager.rb', line 112 def mark_until(, ) mark(, ) @groups.each do |group| group.each do |offset| next if offset > .offset @offsets_metadata[offset] = @marked[offset] = true end end materialize_real_offset end |
#markable ⇒ Array<Messages::Seek, String>
Returns markable message for real offset marking and its associated metadata.
140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 |
# File 'lib/karafka/pro/processing/coordinators/virtual_offset_manager.rb', line 140 def markable raise Errors::InvalidRealOffsetUsageError unless markable? = case @offset_metadata_strategy when :exact @offsets_metadata.fetch(@real_offset) when :current @current_offset_metadata else raise Errors::UnsupportedCaseError, @offset_metadata_strategy end [ Messages::Seek.new( @topic, @partition, @real_offset ), ] end |
#markable? ⇒ Boolean
Is there a real offset we can mark as consumed
134 135 136 |
# File 'lib/karafka/pro/processing/coordinators/virtual_offset_manager.rb', line 134 def markable? !@real_offset.negative? end |
#marked ⇒ Array<Integer>
Returns Offsets of messages already marked as consumed virtually.
128 129 130 |
# File 'lib/karafka/pro/processing/coordinators/virtual_offset_manager.rb', line 128 def marked @marked.select { |_, status| status }.map(&:first).sort end |
#register(offsets_group) ⇒ Object
Registers an offset group coming from one virtual consumer. In order to move the real underlying offset accordingly, we need to make sure to track the virtual consumers offsets groups independently and only materialize the end result.
66 67 68 69 70 |
# File 'lib/karafka/pro/processing/coordinators/virtual_offset_manager.rb', line 66 def register(offsets_group) @groups << offsets_group offsets_group.each { |offset| @marked[offset] = false } end |