Module: Karafka::Pro::Routing::Features::VirtualPartitions::Topic

Defined in:
lib/karafka/pro/routing/features/virtual_partitions/topic.rb

Overview

Topic extensions for managing the virtual partitions feature.

Instance Method Details

#to_h ⇒ Hash

Returns the topic with all its native configuration options plus the virtual partitions namespace settings.

Returns:

  • (Hash)

    topic with all its native configuration options plus the virtual partitions namespace settings



# File 'lib/karafka/pro/routing/features/virtual_partitions/topic.rb', line 57

def to_h
  super.merge(
    virtual_partitions: virtual_partitions.to_h
  ).freeze
end
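
The merge-and-freeze pattern above can be tried in isolation. The following is a minimal sketch, not Karafka code: `SketchBaseTopic`, `SketchVirtualPartitionsTopic`, `SketchTopicWithVp`, and the hash contents are hypothetical stand-ins chosen only to show how `super.merge(...).freeze` extends a parent `to_h` with one extra namespace.

```ruby
# Hypothetical stand-in for a base topic exposing its native options.
class SketchBaseTopic
  def to_h
    { name: 'orders', max_messages: 100 }
  end
end

# Hypothetical stand-in for the feature module: it adds one namespace
# to whatever the parent returns and freezes the resulting hash.
module SketchVirtualPartitionsTopic
  def to_h
    super.merge(
      virtual_partitions: { active: true, max_partitions: 5 }
    ).freeze
  end
end

class SketchTopicWithVp < SketchBaseTopic
  include SketchVirtualPartitionsTopic
end

topic_hash = SketchTopicWithVp.new.to_h
```

Note that `freeze` is shallow in Ruby: the outer hash rejects new keys, while nested values are only immutable if they were frozen themselves.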

#virtual_partitions(max_partitions: Karafka::App.config.concurrency, partitioner: nil, offset_metadata_strategy: :current, reducer: nil) ⇒ VirtualPartitions

Sets the virtual partitions details during the routing configuration and allows retrieving them afterwards.

Parameters:

  • max_partitions (Integer) (defaults to: Karafka::App.config.concurrency)

    Maximum number of virtual partitions that a single distribution flow can produce. When set higher than the Karafka concurrency (number of worker threads), there will be more units of work than workers. When set lower, spare workers remain available to process other things in parallel.

  • partitioner (nil, #call) (defaults to: nil)

    nil or callable partitioner

  • offset_metadata_strategy (Symbol) (defaults to: :current)

    How the offset metadata should be matched. `:exact` uses the metadata stored with the matching offset, while `:current` uses the most recently reported metadata.

  • reducer (nil, #call) (defaults to: nil)

    Reducer for the virtual partitions key. A custom reducer can improve parallelization when the default reducer does not distribute keys well enough.

Returns:

  • (VirtualPartitions)

    the virtual partitions configuration, set during the routing configuration and retrievable afterwards



# File 'lib/karafka/pro/routing/features/virtual_partitions/topic.rb', line 33

def virtual_partitions(
  max_partitions: Karafka::App.config.concurrency,
  partitioner: nil,
  offset_metadata_strategy: :current,
  reducer: nil
)
  @virtual_partitions ||= Config.new(
    active: !partitioner.nil?,
    max_partitions: max_partitions,
    partitioner: partitioner,
    offset_metadata_strategy: offset_metadata_strategy,
    # If no reducer provided, we use this one. It just runs a modulo on the sum of
    # a stringified version, providing fairly good distribution.
    reducer: reducer || ->(virtual_key) { virtual_key.to_s.sum % max_partitions }
  )
end
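
To illustrate how the default reducer spreads keys, here is a small standalone sketch. The lambda body is copied from the source above; the value `5` for `max_partitions`, the sample keys, and the CRC32-based `custom_reducer` are illustrative assumptions, not part of Karafka.

```ruby
require 'zlib'

# Stand-in value for the max_partitions argument (assumption for this sketch).
max_partitions = 5

# Same expression as the default reducer in the source above:
# String#sum adds up the bytes, and the modulo picks a group.
default_reducer = ->(virtual_key) { virtual_key.to_s.sum % max_partitions }

# Hypothetical virtual keys, for example values returned by a partitioner.
keys = %w[user-1 user-2 user-3 user-4]
assignments = keys.to_h { |key| [key, default_reducer.call(key)] }
# => {"user-1"=>1, "user-2"=>2, "user-3"=>3, "user-4"=>4}

# Keys made of the same bytes in a different order share a byte sum,
# so the default reducer always places them in the same group.
same_bucket = default_reducer.call('ab') == default_reducer.call('ba')

# A hypothetical custom reducer: any callable that maps a key to an Integer
# below max_partitions. CRC32 is order-sensitive, unlike a plain byte sum.
custom_reducer = ->(virtual_key) { Zlib.crc32(virtual_key.to_s) % max_partitions }
custom_group = custom_reducer.call('user-1')
```

A custom reducer like this would be passed through the `reducer:` argument; whether it actually improves distribution depends on the shape of the keys in use.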

#virtual_partitions? ⇒ Boolean

Returns whether virtual partitions are enabled for the given topic.

Returns:

  • (Boolean)

    true if virtual partitions are enabled for the given topic



# File 'lib/karafka/pro/routing/features/virtual_partitions/topic.rb', line 51

def virtual_partitions?
  virtual_partitions.active?
end
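
The interplay between `virtual_partitions` and `virtual_partitions?` can be sketched without Karafka. In the example below, `SketchConfig` and `SketchTopic` are hypothetical stand-ins (the real `Config` class is not shown in this document), and the default of `5` replaces `Karafka::App.config.concurrency`. The sketch shows two consequences of the `||=` memoization and the `active: !partitioner.nil?` rule: the first call fixes the settings, and a topic without a partitioner reports the feature as disabled.

```ruby
# Hypothetical stand-in for the Config object used in the source above.
SketchConfig = Struct.new(:active, :max_partitions, :partitioner, keyword_init: true) do
  def active?
    active
  end
end

# Hypothetical topic that mirrors the two methods documented above.
class SketchTopic
  def virtual_partitions(max_partitions: 5, partitioner: nil)
    # Memoized: only the first call builds the config, later calls return it.
    @virtual_partitions ||= SketchConfig.new(
      active: !partitioner.nil?,
      max_partitions: max_partitions,
      partitioner: partitioner
    )
  end

  def virtual_partitions?
    virtual_partitions.active?
  end
end

# First call with a partitioner sets the details.
configured = SketchTopic.new
configured.virtual_partitions(
  partitioner: ->(message) { message[:user_id] },
  max_partitions: 3
)

# Later calls only retrieve; new arguments are ignored because of ||=.
retrieved_max = configured.virtual_partitions(max_partitions: 10).max_partitions

# A topic that never received a partitioner stays inactive.
untouched = SketchTopic.new
```

Here `configured.virtual_partitions?` is true and `retrieved_max` stays at 3, while `untouched.virtual_partitions?` is false.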