Class: Karafka::Pro::Processing::Partitioner

Inherits:
Karafka::Processing::Partitioner
Defined in:
lib/karafka/pro/processing/partitioner.rb

Overview

Pro partitioner that distributes a topic's messages across virtual partitions according to the topic's virtual partitioner settings.
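At the core of #call is a small mapping: whatever the configured virtual partitioner returns for a message is turned into a string, checksummed with Ruby's `String#sum`, and reduced modulo `max_partitions`. The sketch below isolates that expression in a hypothetical helper, `virtual_partition_for` (not part of Karafka), to show that the resulting ids are bounded and deterministic.

```ruby
# Hypothetical helper (not part of Karafka): reduces an arbitrary partitioner
# key to a virtual partition id in 0...max_partitions, mirroring the
# expression used inside #call.
def virtual_partition_for(key, max_partitions)
  # String#sum is a simple checksum (the sum of the string's bytes, truncated
  # to 16 bits by default), so the same key always maps to the same id.
  key.to_s.sum % max_partitions
end

ids = %w[user-1 user-2 user-3 user-1].map { |key| virtual_partition_for(key, 4) }
p ids # => [1, 2, 3, 1]
```

Because `String#sum` only adds bytes, anagram keys such as "ab" and "ba" always land in the same virtual partition. The mapping trades distribution quality for a cheap computation that is stable across polls.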

Instance Method Summary

Methods inherited from Karafka::Processing::Partitioner

#initialize

Constructor Details

This class inherits a constructor from Karafka::Processing::Partitioner

Instance Method Details

#call(topic, messages) {|group, messages| ... } ⇒ Object

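Parameters:

topic (String) — name of the topic the messages were fetched from
messages (Array<Karafka::Messages::Message>) — messages to distribute

Yield Parameters:

group (Integer) — id of the virtual partition group; always 0 when virtual partitioning is not applied
messages (Array<Karafka::Messages::Message>) — messages assigned to that group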



# File 'lib/karafka/pro/processing/partitioner.rb', line 23

def call(topic, messages)
  ktopic = @subscription_group.topics.find(topic)

  # Only partition the work when virtual partitions are enabled and more than one
  # virtual partition is allowed. With a single partition, all messages would end up
  # in one group anyway, so partitioning would be wasted work.
  if ktopic.virtual_partitions? && ktopic.virtual_partitions.max_partitions > 1
    # Reduce the key modulo max_partitions so that the group id comes from a bounded
    # set rather than directly from end-user data. Otherwise the consumer persistence
    # layer would cache a consumer for every distinct key forever, leaking memory.
    #
    # The mapping must also be deterministic: the same partitioner key must always be
    # assigned to the same virtual partition. Otherwise, with window aggregation in a
    # virtual partition spanning several polls, the aggregated data could be incomplete.
    groupings = messages.group_by do |msg|
      key = ktopic.virtual_partitions.partitioner.call(msg).to_s.sum

      key % ktopic.virtual_partitions.max_partitions
    end

    groupings.each do |key, messages_group|
      yield(key, messages_group)
    end
  else
    # Without virtual partitioning, behave like the regular partitioner: one group (0)
    yield(0, messages)
  end
end
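The control flow of #call can be exercised outside Karafka with a few stand-ins. In the sketch below, `Message`, `VirtualPartitions`, `Topic`, and `partition` are hypothetical names, not Karafka classes or methods; they model only the attributes the method reads. The real method first looks up the routing topic with `@subscription_group.topics.find(topic)`, while this sketch receives the topic object directly.

```ruby
# Hypothetical stand-ins for Karafka's message and routing topic objects;
# only the attributes the partitioning logic reads are modeled.
Message = Struct.new(:key, :payload)
VirtualPartitions = Struct.new(:partitioner, :max_partitions)
Topic = Struct.new(:virtual_partitions) do
  # Assumption: virtual partitions count as enabled when a config is present
  def virtual_partitions?
    !virtual_partitions.nil?
  end
end

# Standalone re-creation of the #call control flow (a sketch, not Karafka code)
def partition(ktopic, messages)
  vps = ktopic.virtual_partitions

  if ktopic.virtual_partitions? && vps.max_partitions > 1
    messages
      .group_by { |msg| vps.partitioner.call(msg).to_s.sum % vps.max_partitions }
      .each { |group_id, group| yield(group_id, group) }
  else
    # No virtual partitioning (or a single partition): one group with id 0
    yield(0, messages)
  end
end

messages = [Message.new("a", 1), Message.new("b", 2), Message.new("a", 3)]
vp_topic = Topic.new(VirtualPartitions.new(->(msg) { msg.key }, 2))
plain_topic = Topic.new(nil)

result = {}
partition(vp_topic, messages) { |id, group| result[id] = group.map(&:payload) }
# "a".sum is 97 and "b".sum is 98, so with 2 partitions:
# result[1] == [1, 3] and result[0] == [2]

plain = {}
partition(plain_topic, messages) { |id, group| plain[id] = group.map(&:payload) }
# plain[0] == [1, 2, 3]
```

Both messages keyed "a" share a group, which is the property window aggregations across polls rely on.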