Class: Karafka::Pro::Iterator::Expander

Inherits:
Object
Defined in:
lib/karafka/pro/iterator/expander.rb

Overview

Topic information for iterating can be provided in several ways.

This mapper normalizes this data, resolves offsets, and maps time-based offsets to the appropriate ones.

The following formats are accepted:

  • 'topic1' - just a string with one topic name

  • ['topic1', 'topic2'] - just the names

  • { 'topic1' => -100 } - names with a negative lookup offset

  • { 'topic1' => { 0 => 5 } } - names with exact per-partition offsets

  • { 'topic1' => { 0 => -5 }, 'topic2' => { 1 => 5 } } - with per-partition negative offsets

  • { 'topic1' => 100 } - runs all partitions from offset 100

  • { 'topic1' => Time.now - 60 } - runs all partitions from the message from 60 seconds ago

  • { 'topic1' => { 1 => Time.now - 60 } } - runs partition 1 from the message from 60 seconds ago

  • { 'topic1' => { 1 => true } } - picks the first offset not consumed by this consumer group for partition 1

  • { 'topic1' => true } - picks the first offset not consumed by this consumer group for all partitions
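
As a rough illustration of what normalizing these formats could look like, the sketch below maps each accepted input shape onto a single topic => details hash. The helper name normalize_topics and the use of nil for "no details given" are assumptions made for this example; the class's own normalization step is not shown on this page and may differ.

```ruby
# Hypothetical helper for illustration only; not Karafka's implementation.
def normalize_topics(topics)
  case topics
  when String then { topics => nil }                   # single topic name
  when Array  then topics.to_h { |name| [name, nil] }  # list of topic names
  when Hash   then topics.dup                          # already topic => details
  else raise ArgumentError, "unsupported topics format: #{topics.class}"
  end
end

normalize_topics('topic1')              # => { 'topic1' => nil }
normalize_topics(%w[topic1 topic2])     # => { 'topic1' => nil, 'topic2' => nil }
normalize_topics({ 'topic1' => -100 })  # => { 'topic1' => -100 }
```

Reducing every input to one hash shape means the later expansion step only has to distinguish "details is a Hash" from "details is a single value or missing".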

Instance Method Summary

Instance Method Details

#call(topics) ⇒ Hash

Expands the topics we want to subscribe to with partition information when that information is not provided.

Parameters:

  • topics (Array, Hash, String)

    topics definitions

Returns:

  • (Hash)

    expanded and normalized requested topics and partitions data



# File 'lib/karafka/pro/iterator/expander.rb', line 40

def call(topics)
  expanded = Hash.new { |h, k| h[k] = {} }

  normalize_format(topics).map do |topic, details|
    if details.is_a?(Hash)
      details.each do |partition, offset|
        expanded[topic][partition] = offset
      end
    else
      partition_count(topic).times do |partition|
        # If no offsets are provided, we just start from zero
        expanded[topic][partition] = details || 0
      end
    end
  end

  expanded
end
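
To make the resulting shape concrete, here is a self-contained sketch of the same expansion logic that can be run without a Kafka cluster. The expand method and the partition_counts hash are hypothetical stand-ins: the input is assumed to be already normalized into a topic => details hash, and partition counts are hard-coded instead of being looked up.

```ruby
# Illustrative stand-in for #call. `partition_counts` is a hypothetical
# topic => partition count map used instead of a cluster lookup.
def expand(normalized, partition_counts)
  expanded = Hash.new { |h, k| h[k] = {} }

  normalized.each do |topic, details|
    if details.is_a?(Hash)
      # Per-partition values are copied through unchanged
      details.each { |partition, offset| expanded[topic][partition] = offset }
    else
      # One value (or none) is applied to every partition of the topic
      partition_counts.fetch(topic).times do |partition|
        expanded[topic][partition] = details || 0
      end
    end
  end

  expanded
end

counts = { 'topic1' => 2, 'topic2' => 3 }

expand({ 'topic1' => nil }, counts)         # => { 'topic1' => { 0 => 0, 1 => 0 } }
expand({ 'topic1' => -100 }, counts)        # => { 'topic1' => { 0 => -100, 1 => -100 } }
expand({ 'topic2' => { 1 => 5 } }, counts)  # => { 'topic2' => { 1 => 5 } }
expand({ 'topic1' => true }, counts)        # => { 'topic1' => { 0 => true, 1 => true } }
```

Note that when per-partition details are given, only the listed partitions appear in the result; the other partitions of that topic are not added.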