Class: Karafka::Pro::Processing::VirtualOffsetManager
- Inherits: Object
- Defined in: lib/karafka/pro/processing/virtual_offset_manager.rb
Overview
Manager that tracks offsets through the virtualization layer, local to a given partition assignment. It simplifies offset management for virtual partitions: messages can be marked as consumed virtually, and the real offset is moved accordingly.
The regular coordinator "real" offset management remains in use and is kept as separate from this manager as possible. The real seek offset management is also used for pausing, filtering and other features, and should not be affected by the virtual one.
This manager is not thread-safe by itself. It should only be operated from coordinator-locked locations.
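To illustrate why the real offset can lag behind the virtually marked offsets, here is a minimal sketch. The private method that recomputes the real offset is not shown on this page, so the rule below is an assumption made for illustration only: the real offset is taken as the end of the unbroken run of marked offsets starting at the lowest tracked offset. The helper name `sketch_real_offset` is hypothetical and not part of Karafka.

```ruby
# Hypothetical helper, not part of Karafka. It derives a committable offset
# from a map of offset => virtually-consumed flag.
# ASSUMPTION: the real offset is the end of the unbroken run of marked offsets
# that starts at the lowest tracked offset; -1 means nothing is committable.
def sketch_real_offset(marked)
  real_offset = -1

  marked.keys.sort.each do |offset|
    break unless marked[offset]

    real_offset = offset
  end

  real_offset
end

# Two virtual consumers share one partition:
# consumer A received offsets [0, 2, 4], consumer B received [1, 3, 5].
marked = { 0 => true, 2 => true, 4 => true, 1 => false, 3 => false, 5 => false }

# A finished all of its messages, B none: only offset 0 is safe
after_a = sketch_real_offset(marked)

# B finishes offsets 1 and 3: the unbroken run now reaches offset 4
marked[1] = true
marked[3] = true
after_b = sketch_real_offset(marked)

puts after_a
puts after_b
```

Under this assumed rule, progress made by one virtual consumer never pushes the real offset past work that another virtual consumer has not finished yet.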
Instance Attribute Summary
-
#groups ⇒ Object
readonly
Returns the value of attribute groups.
Instance Method Summary
-
#clear ⇒ Object
Clears the manager for the next collective operation.
-
#initialize(topic, partition) ⇒ VirtualOffsetManager
constructor
A new instance of VirtualOffsetManager.
-
#mark(message) ⇒ Object
Marks the given message as virtually consumed.
-
#mark_until(message) ⇒ Object
Marks all offsets from all groups up to and including the `message`.
-
#markable ⇒ Messages::Seek
Markable message for real offset marking.
-
#markable? ⇒ Boolean
Whether there is a real offset that can be marked as consumed.
-
#marked ⇒ Array<Integer>
Offsets of messages already marked as consumed virtually.
-
#register(offsets_group) ⇒ Object
Registers an offset group coming from one virtual consumer.
Constructor Details
#initialize(topic, partition) ⇒ VirtualOffsetManager
The topic and partition are needed because a (virtual) seek message is used for real offset management. Keeping a reference to the real message would also work, but it can be memory consuming and is not worth it.
Returns a new instance of VirtualOffsetManager.
    # File 'lib/karafka/pro/processing/virtual_offset_manager.rb', line 37
    def initialize(topic, partition)
      @topic = topic
      @partition = partition
      @groups = []
      @marked = {}
      @real_offset = -1
    end
Instance Attribute Details
#groups ⇒ Object (readonly)
Returns the value of attribute groups.
    # File 'lib/karafka/pro/processing/virtual_offset_manager.rb', line 29
    def groups
      @groups
    end
Instance Method Details
#clear ⇒ Object
Clears the manager for the next collective operation.
    # File 'lib/karafka/pro/processing/virtual_offset_manager.rb', line 46
    def clear
      @groups.clear
      @marked = {}
      @real_offset = -1
    end
#mark(message) ⇒ Object
Marks the given message as virtually consumed. The offset of the given message and all earlier offsets from the same group are marked as done. The real offset representation is then refreshed, as it might have moved to a newer real offset.
    # File 'lib/karafka/pro/processing/virtual_offset_manager.rb', line 68
    def mark(message)
      offset = message.offset
      group = @groups.find { |reg_group| reg_group.include?(offset) }

      # This case can happen when someone uses MoM and wants to mark message from a previous
      # batch as consumed. We can add it, since the real offset refresh will point to it
      unless group
        group = [offset]
        @groups << group
      end

      position = group.index(offset)

      # Mark all previous messages from the same group also as virtually consumed
      group[0..position].each do |markable_offset|
        @marked[markable_offset] = true
      end

      # Recompute the real offset representation
      materialize_real_offset
    end
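The group lookup and prefix marking can be tried in isolation. The following is a sketch only: `sketch_mark` is a hypothetical standalone function, the `groups` and `marked` arguments stand in for the manager's `@groups` and `@marked`, and the real offset recomputation is left out because that method is not shown on this page.

```ruby
# Hypothetical standalone version of the marking step; `groups` and `marked`
# stand in for the manager's @groups and @marked. The real offset refresh is
# intentionally omitted.
def sketch_mark(groups, marked, offset)
  group = groups.find { |reg_group| reg_group.include?(offset) }

  # Offset that belongs to no registered group (for example one from a
  # previous batch under manual offset management): track it on its own
  unless group
    group = [offset]
    groups << group
  end

  position = group.index(offset)

  # Mark the offset and everything before it in the same group
  group[0..position].each { |markable_offset| marked[markable_offset] = true }

  marked
end

groups = [[10, 12, 14], [11, 13, 15]]
marked = groups.flatten.to_h { |offset| [offset, false] }

# Marks 10 and 12 (same group); 11 stays unmarked because it is in another group
sketch_mark(groups, marked, 12)

# 7 was never registered, so it ends up in its own single-element group
sketch_mark(groups, marked, 7)

done = marked.select { |_, status| status }.keys.sort
p groups.last
p done
```

Marking never crosses group boundaries here, which is what keeps each virtual consumer's progress independent.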
#mark_until(message) ⇒ Object
Marks all offsets from all groups up to and including the `message`. Useful for marking when operating in a collapsed state.
    # File 'lib/karafka/pro/processing/virtual_offset_manager.rb', line 94
    def mark_until(message)
      mark(message)

      @groups.each do |group|
        group.each do |offset|
          next if offset > message.offset

          @marked[offset] = true
        end
      end

      materialize_real_offset
    end
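In contrast to marking within a single group, this operation sweeps across every registered group. A minimal sketch of that sweep follows, assuming plain arrays and a hash in place of the manager's state; `sketch_mark_until` is a hypothetical name, and both the initial `mark` call and the real offset refresh are omitted for brevity.

```ruby
# Hypothetical standalone version of the cross-group sweep. The initial
# mark(message) call and the real offset refresh are left out.
def sketch_mark_until(groups, marked, until_offset)
  groups.each do |group|
    group.each do |offset|
      # Anything past the given offset stays untouched
      next if offset > until_offset

      marked[offset] = true
    end
  end

  marked
end

groups = [[10, 12, 14], [11, 13, 15]]
marked = groups.flatten.to_h { |offset| [offset, false] }

sketch_mark_until(groups, marked, 13)

done = marked.select { |_, status| status }.keys.sort
pending = marked.reject { |_, status| status }.keys.sort

p done
p pending
```

With the sample groups, everything up to 13 is marked regardless of which group it came from, and only 14 and 15 remain pending.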
#markable ⇒ Messages::Seek
Returns markable message for real offset marking.
    # File 'lib/karafka/pro/processing/virtual_offset_manager.rb', line 120
    def markable
      raise Errors::InvalidRealOffsetUsageError unless markable?

      Messages::Seek.new(
        @topic,
        @partition,
        @real_offset
      )
    end
#markable? ⇒ Boolean
Returns whether there is a real offset that can be marked as consumed.
    # File 'lib/karafka/pro/processing/virtual_offset_manager.rb', line 115
    def markable?
      !@real_offset.negative?
    end
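The guard and the lightweight seek message can be sketched together. Everything named `Sketch...` below is a hypothetical stand-in: `SketchSeek` imitates `Messages::Seek` with a plain `Struct`, and `SketchInvalidRealOffsetUsageError` imitates `Errors::InvalidRealOffsetUsageError`. The topic name and offsets are made-up sample values.

```ruby
# Hypothetical stand-ins, not the Karafka classes
SketchSeek = Struct.new(:topic, :partition, :offset)
SketchInvalidRealOffsetUsageError = Class.new(StandardError)

# Builds a small seek object carrying only topic, partition and offset,
# instead of holding on to a full message
def sketch_markable(topic, partition, real_offset)
  raise SketchInvalidRealOffsetUsageError if real_offset.negative?

  SketchSeek.new(topic, partition, real_offset)
end

seek = sketch_markable('events', 0, 42)
puts seek.offset

# A negative real offset means nothing can be marked yet
begin
  sketch_markable('events', 0, -1)
rescue SketchInvalidRealOffsetUsageError
  puts 'nothing to mark yet'
end
```

In calling code, checking `markable?` first avoids the exception path entirely.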
#marked ⇒ Array<Integer>
Returns offsets of messages already marked as consumed virtually.
    # File 'lib/karafka/pro/processing/virtual_offset_manager.rb', line 109
    def marked
      @marked.select { |_, status| status }.map(&:first).sort
    end
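As a small illustration of the filtering above, the same expression can be applied to a plain hash. The sample data is made up; the hash plays the role of `@marked`, mapping each offset to whether it was virtually consumed.

```ruby
# Sample state: offset => virtually consumed?
marked = { 4 => false, 2 => true, 0 => true, 1 => false }

# Keep only the consumed entries, take their keys, and sort them
consumed = marked.select { |_, status| status }.map(&:first).sort

# Equivalent form using Hash#keys on the filtered hash
consumed_via_keys = marked.select { |_, status| status }.keys.sort

p consumed
p consumed_via_keys
```

Sorting matters because the hash keeps insertion order, and groups are registered one virtual consumer at a time rather than in offset order.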
#register(offsets_group) ⇒ Object
Registers an offset group coming from one virtual consumer. To move the real underlying offset correctly, the offset group of each virtual consumer is tracked independently and only the end result is materialized.
    # File 'lib/karafka/pro/processing/virtual_offset_manager.rb', line 57
    def register(offsets_group)
      @groups << offsets_group

      offsets_group.each { |offset| @marked[offset] = false }
    end