Class: Karafka::Pro::Iterator::Expander
- Inherits:
-
Object
- Object
- Karafka::Pro::Iterator::Expander
- Defined in:
- lib/karafka/pro/iterator/expander.rb
Overview
There are various ways you can provide topics information for iterating.
This mapper normalizes this data, resolves offsets and maps the time based offsets into appropriate once
Following formats are accepted:
-
‘topic1’ - just a string with one topic name
- ‘topic1’, ‘topic2’
-
just the names
-
-
{ ‘topic1’ => -100 } - names with negative lookup offset
-
{ ‘topic1’ => { 0 => 5 } } - names with exact partitions offsets
-
{ ‘topic1’ => { 0 => -5 }, ‘topic2’ => { 1 => 5 } } - with per partition negative offsets
-
{ ‘topic1’ => 100 } - means we run all partitions from the offset 100
-
{ ‘topic1’ => Time.now - 60 } - we run all partitions from the message from 60s ago
-
{ ‘topic1’ => { 1 => Time.now - 60 } } - partition1 from message 60s ago
-
{ ‘topic1’ => { 1 => true } } - will pick first offset not consumed on this CG for p 1
-
{ ‘topic1’ => true } - will pick first offset not consumed on this CG for all p
Instance Method Summary collapse
-
#call(topics) ⇒ Hash
Expands topics to which we want to subscribe with partitions information in case this info is not provided.
Instance Method Details
#call(topics) ⇒ Hash
Expands topics to which we want to subscribe with partitions information in case this info is not provided.
40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 |
# File 'lib/karafka/pro/iterator/expander.rb', line 40 def call(topics) = Hash.new { |h, k| h[k] = {} } normalize_format(topics).map do |topic, details| if details.is_a?(Hash) details.each do |partition, offset| [topic][partition] = offset end else partition_count(topic).times do |partition| # If no offsets are provided, we just start from zero [topic][partition] = details || 0 end end end end |