Class: Karafka::Pro::Processing::Partitioner

Inherits:
Karafka::Processing::Partitioner
Defined in:
lib/karafka/pro/processing/partitioner.rb

Overview

Pro partitioner that can distribute work based on the virtual partitioner settings
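
For context, virtual partitions are enabled per topic in the routing. Below is a minimal sketch of such a setup; the topic name, consumer class, partitioner lambda and max_partitions value are illustrative assumptions, not part of this class:

class KarafkaApp < Karafka::App
  routes.draw do
    topic :orders_states do
      consumer OrdersStatesConsumer

      # Spread messages of this topic across up to 5 virtual partitions,
      # grouping them by the Kafka message key
      virtual_partitions(
        partitioner: ->(message) { message.key },
        max_partitions: 5
      )
    end
  end
end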

Instance Method Summary

Methods inherited from Karafka::Processing::Partitioner

#initialize

Constructor Details

This class inherits a constructor from Karafka::Processing::Partitioner

Instance Method Details

#call(topic, messages, coordinator) {|group, karafka| ... } ⇒ Object

Parameters:

topic (String) — topic name
messages (Array<Karafka::Messages::Message>) — Karafka messages to partition
coordinator — processing coordinator for the given topic partition

Yield Parameters:

group (Integer) — virtual partition group id
karafka (Array<Karafka::Messages::Message>) — messages assigned to the given group


# File 'lib/karafka/pro/processing/partitioner.rb', line 25

def call(topic, messages, coordinator)
  ktopic = @subscription_group.topics.find(topic)

  # We only partition the work if we have:
  # - a virtual partitioner
  # - more than one virtual partition allowed (max_partitions > 1)
  # - partitioning that has not been collapsed via the coordinator
  #
  # With a single partition it is not worth splitting the work, as all of it would end up
  # assigned to one thread anyway (pointless work)
  #
  # We collapse the partitioning on errors because we "regain" full ordering on a batch
  # that potentially contains the data that caused the error.
  #
  # This is useful because it allows us to run without the parallelization (which adds a
  # bit of uncertainty) and to use the DLQ and safely skip messages if needed.
  if ktopic.virtual_partitions? &&
     ktopic.virtual_partitions.max_partitions > 1 &&
     !coordinator.collapsed?
    # We need to reduce the partitioner result to at most max_partitions groups, so the
    # group id is not a direct effect of the end user action. Otherwise the persistence
    # layer for consumers would cache it forever and it would cause memory leaks
    #
    # This also needs to be consistent, because the aggregation here has to guarantee that
    # the same partitioned message will always be assigned to the same virtual partition.
    # Otherwise, in case of a window aggregation with a VP spanning across several polls,
    # the data might not be complete.
    groupings = messages.group_by do |msg|
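      # String#sum adds up the bytes of the stringified partitioner result, giving a
      # cheap, deterministic numeric key for the modulo reduction below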
      key = ktopic.virtual_partitions.partitioner.call(msg).to_s.sum

      key % ktopic.virtual_partitions.max_partitions
    end

    groupings.each do |key, messages_group|
      yield(key, messages_group)
    end
  else
    # Without a virtual partitioner, this works like the regular partitioner
    yield(0, messages)
  end
end
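
To illustrate the grouping step above in isolation, here is a minimal standalone sketch of the same key reduction; the Message struct, the sample keys and the max_partitions value are assumptions made for the example:

Message = Struct.new(:key, :payload)

messages = [
  Message.new('user-1', '{}'),
  Message.new('user-2', '{}'),
  Message.new('user-1', '{}')
]

max_partitions = 3
partitioner = ->(message) { message.key }

groupings = messages.group_by do |msg|
  # The byte sum of the partitioner result, reduced modulo max_partitions,
  # deterministically maps a key to one of the virtual partition groups
  partitioner.call(msg).to_s.sum % max_partitions
end

groupings.each do |group_id, messages_group|
  puts "virtual partition #{group_id}: #{messages_group.map(&:key)}"
end

Messages with the same key always land in the same group, which is what allows windowed aggregations to span several polls without losing data.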