Class: Karafka::Pro::Processing::VirtualPartitions::Distributors::Balanced
- Inherits:
- Base
- Inheritance chain:
- Karafka::Pro::Processing::VirtualPartitions::Distributors::Balanced < Base < Object
- Defined in:
- lib/karafka/pro/processing/virtual_partitions/distributors/balanced.rb
Overview
Balanced distributor that groups messages by partition key and assigns the larger groups to virtual partitions first, while preserving message order (by offset) within each group.
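The "larger groups first" idea can be illustrated with a minimal sketch that works on group sizes only. The keys, sizes, and the `max_partitions` value below are hypothetical and chosen for illustration; they are not taken from Karafka.

```ruby
# Hypothetical group sizes per partition key (illustrative values only).
group_sizes = { "a" => 5, "b" => 3, "c" => 2, "d" => 1 }
max_partitions = 2

loads = Array.new(max_partitions, 0) # messages assigned to each slot so far
placement = {}                       # partition key => slot index

# Visit the largest groups first and give each one to the slot that
# currently holds the fewest messages.
group_sizes.sort_by { |_key, size| -size }.each do |key, size|
  slot = loads.each_with_index.min_by { |load, _| load }[1]
  placement[key] = slot
  loads[slot] += size
end

puts placement.inspect
puts loads.inspect
```

With these sizes, the two slots end up holding 6 and 5 messages. Because a group is never split, all messages that share a key stay in the same slot.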
Instance Method Summary
- #call(messages) ⇒ Hash{Integer => Array<Karafka::Messages::Message>}
Distributes messages to virtual partitions ensuring balanced load across workers by grouping messages by partition key and assigning larger groups first.
Methods inherited from Base
Constructor Details
This class inherits a constructor from Karafka::Pro::Processing::VirtualPartitions::Distributors::Base
Instance Method Details
#call(messages) ⇒ Hash{Integer => Array<Karafka::Messages::Message>}
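Distributes messages to virtual partitions ensuring balanced load across workers by grouping messages by partition key and assigning larger groups first. Returns a hash that maps each non-empty virtual partition index to its messages, sorted by offset.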
# File 'lib/karafka/pro/processing/virtual_partitions/distributors/balanced.rb', line 36

    def call(messages)
      # Group messages by partition key
      key_groupings = messages.group_by { |msg| config.partitioner.call(msg) }

      worker_loads = Array.new(config.max_partitions, 0)
      worker_assignments = Array.new(config.max_partitions) { [] }

      # Sort keys by workload in descending order
      sorted_keys = key_groupings.keys.sort_by { |key| -key_groupings[key].size }

      # Assign each key to the worker with the least current load
      sorted_keys.each do |key|
        # Find worker with minimum current load
        min_load_worker = worker_loads.each_with_index.min_by { |load, _| load }[1]
        messages = key_groupings[key]

        # Assign this key to that worker
        worker_assignments[min_load_worker] += messages
        worker_loads[min_load_worker] += messages.size
      end

      # Combine messages for each worker and sort by offset
      worker_assignments
        .each_with_index
        .reject { |messages, _| messages.empty? }
        .map! { |messages, index| [index, messages.sort_by!(&:offset)] }
        .to_h
    end
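To try the same steps outside Karafka, the following standalone sketch mirrors the method with stand-in objects. `Message`, `Config`, and `distribute` are hypothetical names introduced only for this example; the real class reads `config` from its `Base` parent and receives `Karafka::Messages::Message` objects.

```ruby
# Hypothetical stand-ins that carry only the fields this sketch needs.
Message = Struct.new(:key, :offset)
Config = Struct.new(:partitioner, :max_partitions)

def distribute(messages, config)
  # Group messages by the key the partitioner returns
  groups = messages.group_by { |msg| config.partitioner.call(msg) }
  loads = Array.new(config.max_partitions, 0)
  slots = Array.new(config.max_partitions) { [] }

  # Largest groups first, each to the least-loaded slot
  groups.keys.sort_by { |key| -groups[key].size }.each do |key|
    target = loads.each_with_index.min_by { |load, _| load }[1]
    slots[target].concat(groups[key])
    loads[target] += groups[key].size
  end

  # Drop empty slots and restore offset order inside each slot
  slots
    .each_with_index
    .reject { |batch, _| batch.empty? }
    .to_h { |batch, index| [index, batch.sort_by(&:offset)] }
end

config = Config.new(->(msg) { msg.key }, 2)
messages = [
  Message.new("a", 0), Message.new("b", 1), Message.new("a", 2),
  Message.new("a", 3), Message.new("c", 4), Message.new("b", 5)
]

result = distribute(messages, config)
result.each { |index, batch| puts "#{index}: #{batch.map(&:offset).inspect}" }
```

Here key "a" (three messages) takes slot 0, and keys "b" and "c" share slot 1. The final sort interleaves the "b" and "c" messages by offset, so the order within each key is preserved even when a slot holds several keys.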