Class: Kafka::Partitioner
- Inherits:
- Object
- Object
- Kafka::Partitioner
- Defined in:
- lib/kafka/partitioner.rb
Overview
Assigns partitions to messages.
Instance Method Summary
- #call(partition_count, message) ⇒ Integer
  Assigns a partition number based on a partition key.
- #initialize(hash_function: nil) ⇒ Partitioner (constructor)
  A new instance of Partitioner.
Constructor Details
#initialize(hash_function: nil) ⇒ Partitioner
Returns a new instance of Partitioner. If hash_function is nil, the :crc32 digest is used.
    # File 'lib/kafka/partitioner.rb', line 11

    def initialize(hash_function: nil)
      @digest = Digest.find_digest(hash_function || :crc32)
    end
Instance Method Details
#call(partition_count, message) ⇒ Integer
Assigns a partition number based on a partition key. If no explicit partition key is provided, the message key will be used instead.
If the key is nil, a random partition is selected. Otherwise, a digest of the key is used to deterministically find a partition. As long as the number of partitions doesn't change, the same key will always be assigned to the same partition. An ArgumentError is raised if partition_count is zero.
    # File 'lib/kafka/partitioner.rb', line 27

    def call(partition_count, message)
      raise ArgumentError if partition_count == 0

      # If no explicit partition key is specified we use the message key instead.
      key = message.partition_key || message.key

      if key.nil?
        rand(partition_count)
      else
        @digest.hash(key) % partition_count
      end
    end
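The assignment rule described for #call can be illustrated with a standalone sketch that does not require the library. It assumes that the default :crc32 digest behaves like Ruby's Zlib.crc32; the names SketchMessage and sketch_partition_for are hypothetical and exist only for this illustration.

```ruby
require "zlib"

# Hypothetical stand-in for a message: only the two fields the partitioner
# reads are modelled. Not part of the library.
SketchMessage = Struct.new(:key, :partition_key, keyword_init: true)

# Sketch of the assignment rule, assuming CRC32 (via Zlib) as the digest.
def sketch_partition_for(partition_count, message)
  raise ArgumentError, "partition_count must be non-zero" if partition_count == 0

  # An explicit partition key takes precedence over the message key.
  key = message.partition_key || message.key

  # No key at all: pick any partition at random.
  return rand(partition_count) if key.nil?

  # Otherwise the digest of the key decides, so the result is deterministic.
  Zlib.crc32(key) % partition_count
end

by_key = SketchMessage.new(key: "user-42")
first  = sketch_partition_for(6, by_key)
second = sketch_partition_for(6, by_key)
# first == second, because the key and the partition count are unchanged.

by_partition_key = SketchMessage.new(key: "user-42", partition_key: "tenant-7")
grouped = sketch_partition_for(6, by_partition_key)
# grouped is derived from "tenant-7", not from "user-42".

keyless = sketch_partition_for(6, SketchMessage.new(key: nil))
# keyless is a random integer in 0...6.
```

Because the result is the digest modulo partition_count, changing the number of partitions can move an existing key to a different partition, which is why the guarantee only holds while the partition count stays fixed.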