Module: Karafka::Pro::Routing::Features::DirectAssignments::Topic

Defined in:
lib/karafka/pro/routing/features/direct_assignments/topic.rb

Overview

Topic extensions for direct assignments

Instance Method Summary

Instance Method Details

#direct_assignments(*partitions_or_all) ⇒ Object Also known as: assign

Allows for direct assignment of this topic's partitions, instead of relying on automatic assignment.

Examples:

Assign all available partitions

direct_assignments(true)

Assign partitions 2, 3 and 5

direct_assignments(2, 3, 5)

Assign partitions from 0 to 3

direct_assignments(0..3)

Parameters:

  • partitions_or_all (true, Array<Integer>)

    Tells Karafka to use direct assignments instead of automatic assignment for this topic. Pass the partition numbers you are interested in (a single Range is also accepted), or `true` to assign all partitions.



# File 'lib/karafka/pro/routing/features/direct_assignments/topic.rb', line 34

def direct_assignments(*partitions_or_all)
  @direct_assignments ||= if partitions_or_all == [true]
    Config.new(
      active: true,
      partitions: true
    )
  elsif partitions_or_all.size == 1 && partitions_or_all.first.is_a?(Range)
    partitions_or_all = partitions_or_all.first.to_a

    Config.new(
      active: true,
      partitions: partitions_or_all.map { |partition| [partition, true] }.to_h
    )
  else
    Config.new(
      active: !partitions_or_all.empty?,
      partitions: partitions_or_all.map { |partition| [partition, true] }.to_h
    )
  end
end
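
The branching above can be tried outside Karafka with a minimal sketch. `AssignmentConfig` and `normalize_assignments` are hypothetical names introduced only for this illustration: the real `Config` class is defined elsewhere in Karafka Pro and is not shown on this page, and the sketch omits the `@direct_assignments ||=` memoization.

```ruby
# Hypothetical stand-in for the feature's Config object (assumption: it
# exposes `active` and `partitions`, as the keyword arguments above suggest).
AssignmentConfig = Struct.new(:active, :partitions, keyword_init: true)

# Mirrors the three branches of #direct_assignments, without memoization.
def normalize_assignments(*partitions_or_all)
  if partitions_or_all == [true]
    # `true` alone means: all partitions
    AssignmentConfig.new(active: true, partitions: true)
  elsif partitions_or_all.size == 1 && partitions_or_all.first.is_a?(Range)
    # A single Range is expanded into individual partition ids
    ids = partitions_or_all.first.to_a
    AssignmentConfig.new(active: true, partitions: ids.to_h { |id| [id, true] })
  else
    # Explicit ids; an empty argument list yields an inactive config
    AssignmentConfig.new(
      active: !partitions_or_all.empty?,
      partitions: partitions_or_all.to_h { |id| [id, true] }
    )
  end
end

all_partitions = normalize_assignments(true)
selected = normalize_assignments(2, 3, 5)
ranged = normalize_assignments(0..3)
inactive = normalize_assignments
```

In the actual method the result is memoized with `||=`, so the first call on a given topic decides the configuration and later calls return that same object regardless of their arguments.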

#to_h ⇒ Hash

Returns topic with all its native configuration options plus direct assignments.

Returns:

  • (Hash)

    topic with all its native configuration options plus direct assignments



# File 'lib/karafka/pro/routing/features/direct_assignments/topic.rb', line 59

def to_h
  super.merge(
    direct_assignments: direct_assignments.to_h
  ).freeze
end
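
The `super.merge(...).freeze` pattern can be illustrated with a small, self-contained sketch. `BaseTopic` and `DirectAssignmentsSketch` are hypothetical stand-ins, and wiring the extension in with `prepend` is an assumption made for the example, not something this page confirms.

```ruby
# Hypothetical base class standing in for the core routing topic.
class BaseTopic
  def to_h
    { name: 'events' }
  end
end

# Hypothetical extension shaped like the module documented here.
module DirectAssignmentsSketch
  # Returns a plain Hash in place of the real config object.
  def direct_assignments
    { active: true, partitions: { 0 => true } }
  end

  # Adds the feature's settings to the base hash and freezes the result.
  def to_h
    super.merge(direct_assignments: direct_assignments.to_h).freeze
  end
end

# Assumption: the extension is prepended so that `super` reaches the base #to_h.
class BaseTopic
  prepend DirectAssignmentsSketch
end

topic_hash = BaseTopic.new.to_h
```

Note that `freeze` is shallow in Ruby: the returned top-level Hash cannot be modified, but nested objects are only immutable if they were frozen themselves.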