Class: Karafka::Pro::Processing::Partitioner
Inherits: Karafka::Processing::Partitioner
- Object
- Karafka::Processing::Partitioner
- Karafka::Pro::Processing::Partitioner
Defined in: lib/karafka/pro/processing/partitioner.rb
Overview
Pro partitioner that can distribute work based on the virtual partitioner settings
Instance Method Summary
Methods inherited from Karafka::Processing::Partitioner
Constructor Details
This class inherits a constructor from Karafka::Processing::Partitioner
Instance Method Details
#call(topic, messages) {|group, karafka| ... } ⇒ Object
# File 'lib/karafka/pro/processing/partitioner.rb', line 23

def call(topic, messages)
  ktopic = @subscription_group.topics.find(topic)

  # We only partition work if we have a virtual partitioner and more than one thread to
  # process the data. With one thread it is not worth partitioning the work as the work
  # itself will be assigned to one thread (pointless work)
  if ktopic.virtual_partitions? && ktopic.virtual_partitions.max_partitions > 1
    # We need to reduce it to the max concurrency, so the group_id is not a direct effect
    # of the end user action. Otherwise the persistence layer for consumers would cache
    # it forever and it would cause memory leaks
    #
    # This also needs to be consistent because the aggregation here needs to warrant, that
    # the same partitioned message will always be assigned to the same virtual partition.
    # Otherwise in case of a window aggregation with VP spanning across several polls, the
    # data could not be complete.
    groupings = messages.group_by do |msg|
      key = ktopic.virtual_partitions.partitioner.call(msg).to_s.sum

      key % ktopic.virtual_partitions.max_partitions
    end

    groupings.each do |key, messages_group|
      yield(key, messages_group)
    end
  else
    # When no virtual partitioner, works as regular one
    yield(0, messages)
  end
end
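The grouping step in #call can be tried outside Karafka. The following is a minimal sketch, not the library's API: the Message struct, the group_into_virtual_partitions helper and the sample keys are all hypothetical and exist only for illustration. It shows how the checksum from String#sum, taken modulo max_partitions, keeps the number of groups bounded and sends messages with the same partitioner result to the same group.

```ruby
# Hypothetical stand-in for a Karafka message; only the fields used below.
Message = Struct.new(:key, :payload)

# Hypothetical helper mirroring the grouping step of #call.
# partitioner:    any callable that maps a message to a grouping value
# max_partitions: upper bound on the number of virtual partitions
def group_into_virtual_partitions(messages, partitioner, max_partitions)
  # With a single virtual partition there is nothing to split.
  return { 0 => messages } if max_partitions <= 1

  messages.group_by do |msg|
    # String#sum returns a basic checksum of the string's bytes, so the
    # same partitioner result always maps to the same group id.
    partitioner.call(msg).to_s.sum % max_partitions
  end
end

messages = [
  Message.new('user-1', 'a'),
  Message.new('user-2', 'b'),
  Message.new('user-1', 'c')
]

groups = group_into_virtual_partitions(messages, ->(msg) { msg.key }, 3)

groups.each do |group_id, batch|
  puts "#{group_id}: #{batch.map(&:payload).join(',')}"
end
```

Because the group id is reduced modulo max_partitions, distinct partitioner results can share a group. The approach only guarantees that equal results never end up in different groups.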