Class: Kafka::Producer
Inherits: Object
Defined in: lib/kafka/producer.rb
Overview
Allows sending messages to a Kafka cluster.
Typically you won't instantiate this class yourself, but rather have Client do it for you, e.g.
# Will instantiate Kafka::Client
kafka = Kafka.new(["kafka1:9092", "kafka2:9092"])
# Will instantiate Kafka::Producer
producer = kafka.producer
This is done in order to share a logger as well as a pool of broker connections across different producers. This also means that you don't need to pass the cluster and logger options to #producer. See #initialize for the list of other options you can pass in.
Buffering
The producer buffers pending messages until #deliver_messages is called. Note that there is a maximum buffer size (default is 1,000 messages) and writing messages after the buffer has reached this size will result in a BufferOverflow exception. Make sure to periodically call #deliver_messages or set max_buffer_size to an appropriate value.
Buffering messages and sending them in batches greatly improves performance, so try to avoid sending messages after every write. The tradeoff between throughput and message delays depends on your use case.
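As a sketch, a batching write loop might look like this (the events collection, topic name, and batch size are illustrative assumptions, not part of the API):

producer = kafka.producer(max_buffer_size: 10_000)

events.each_with_index do |event, index|
  producer.produce(event, topic: "events")

  # Flush in batches instead of after every message.
  producer.deliver_messages if (index + 1) % 100 == 0
end

# Deliver whatever remains in the buffer.
producer.deliver_messages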
Error Handling and Retries
The design of the error handling is based on having a MessageBuffer hold messages for all topics/partitions. Whenever we want to send messages to the cluster, we group the buffered messages by the broker they need to be sent to and fire off a request to each broker. A request can be a partial success, so we go through the response and inspect the error code for each partition that we wrote to. If the write to a given partition was successful, we clear the corresponding messages from the buffer -- otherwise, we log the error and keep the messages in the buffer.
After this, we check if the buffer is empty. If it is, we're all done. If it's not, we do another round of requests, this time with just the remaining messages. We do this for as long as max_retries permits.
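The retry behaviour is configured when the producer is built; for example (the values below are illustrative, not defaults):

# Retry failed partitions up to 5 times, backing off 2 seconds between rounds.
producer = kafka.producer(max_retries: 5, retry_backoff: 2)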
Compression
Depending on what kind of data you produce, enabling compression may yield improved bandwidth and space usage. Compression in Kafka is done on entire message sets rather than on individual messages. This improves the compression rate and generally means that compression works better the larger your buffers get, since the message sets will be larger by the time they're compressed.
Since many workloads have variations in throughput and distribution across partitions, it's possible to configure a threshold for when to enable compression by setting compression_threshold. Only if the defined number of messages are buffered for a partition will the messages be compressed.
Compression is enabled by passing the compression_codec parameter with the name of one of the algorithms allowed by Kafka:
- :snappy for Snappy compression.
- :gzip for gzip compression.
- :lz4 for LZ4 compression.
- :zstd for zstd compression.
By default, all message sets will be compressed if you specify a compression codec. To increase the compression threshold, set compression_threshold to an integer value higher than one.
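For example, a producer that Snappy-compresses a partition's message set only once at least 10 messages are buffered for it (the values are illustrative):

producer = kafka.producer(
  compression_codec: :snappy,
  compression_threshold: 10
)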
Instrumentation
Whenever #produce is called, the notification produce_message.producer.kafka will be emitted with the following payload:
- value – the message value.
- key – the message key.
- topic – the topic that was produced to.
- buffer_size – the buffer size after adding the message.
- max_buffer_size – the maximum allowed buffer size for the producer.
After #deliver_messages completes, the notification deliver_messages.producer.kafka will be emitted with the following payload:
- message_count – the total number of messages that the producer tried to deliver. Note that not all messages may get delivered.
- delivered_message_count – the number of messages that were successfully delivered.
- attempts – the number of attempts made to deliver the messages.
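These notifications are emitted through ActiveSupport::Notifications when it is available, so a subscriber might look like the following sketch (assuming active_support is loaded; the logging is illustrative):

require "active_support/notifications"

# Log delivery statistics after each deliver_messages call.
ActiveSupport::Notifications.subscribe("deliver_messages.producer.kafka") do |*args|
  event = ActiveSupport::Notifications::Event.new(*args)
  payload = event.payload

  puts "Delivered #{payload[:delivered_message_count]} of " \
       "#{payload[:message_count]} messages in #{payload[:attempts]} attempt(s)"
end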
Example
This is an example of an application which reads lines from stdin and writes them to Kafka:
require "kafka"
logger = Logger.new($stderr)
brokers = ENV.fetch("KAFKA_BROKERS").split(",")
# Make sure to create this topic in your Kafka cluster or configure the
# cluster to auto-create topics.
topic = "random-messages"
kafka = Kafka.new(brokers, client_id: "simple-producer", logger: logger)
producer = kafka.producer
begin
  $stdin.each_with_index do |line, index|
    producer.produce(line, topic: topic)

    # Send messages for every 10 lines.
    producer.deliver_messages if index % 10 == 0
  end
ensure
  # Make sure to send any remaining messages.
  producer.deliver_messages

  producer.shutdown
end
Defined Under Namespace
Classes: AbortTransaction
Instance Method Summary
- #abort_transaction ⇒ nil
  Aborts the pending transaction, marking all produced records as aborted.
- #begin_transaction ⇒ nil
  Marks the beginning of a transaction.
- #buffer_bytesize ⇒ Object
- #buffer_size ⇒ Integer
  Returns the number of messages currently held in the buffer.
- #clear_buffer ⇒ nil
  Deletes all buffered messages.
- #commit_transaction ⇒ nil
  Commits the pending transaction, marking all produced records as committed.
- #deliver_messages ⇒ nil
  Sends all buffered messages to the Kafka brokers.
- #init_transactions ⇒ nil
  Initializes the producer so that it is ready for future transactions.
- #initialize(cluster:, transaction_manager:, logger:, instrumenter:, compressor:, ack_timeout:, required_acks:, max_retries:, retry_backoff:, max_buffer_size:, max_buffer_bytesize:, partitioner:, interceptors: []) ⇒ Producer (constructor)
  A new instance of Producer.
- #produce(value, key: nil, headers: {}, topic:, partition: nil, partition_key: nil, create_time: Time.now) ⇒ nil
  Produces a message to the specified topic.
- #send_offsets_to_transaction(batch:, group_id:) ⇒ nil
  Sends the batch's last offset to the consumer group coordinator and marks this offset as part of the current transaction.
- #shutdown ⇒ nil
  Closes all connections to the brokers.
- #to_s ⇒ Object
- #transaction ⇒ nil
  Syntactic sugar to enable easier transaction usage.
Constructor Details
#initialize(cluster:, transaction_manager:, logger:, instrumenter:, compressor:, ack_timeout:, required_acks:, max_retries:, retry_backoff:, max_buffer_size:, max_buffer_bytesize:, partitioner:, interceptors: []) ⇒ Producer
Returns a new instance of Producer.
# File 'lib/kafka/producer.rb', line 133

def initialize(cluster:, transaction_manager:, logger:, instrumenter:, compressor:, ack_timeout:,
               required_acks:, max_retries:, retry_backoff:, max_buffer_size:,
               max_buffer_bytesize:, partitioner:, interceptors: [])
  @cluster = cluster
  @transaction_manager = transaction_manager
  @logger = TaggedLogger.new(logger)
  @instrumenter = instrumenter
  @required_acks = required_acks == :all ? -1 : required_acks
  @ack_timeout = ack_timeout
  @max_retries = max_retries
  @retry_backoff = retry_backoff
  @max_buffer_size = max_buffer_size
  @max_buffer_bytesize = max_buffer_bytesize
  @compressor = compressor
  @partitioner = partitioner
  @interceptors = Interceptors.new(interceptors: interceptors, logger: logger)

  # The set of topics that are produced to.
  @target_topics = Set.new

  # A buffer organized by topic/partition.
  @buffer = MessageBuffer.new

  # Messages added by `#produce` but not yet assigned a partition.
  @pending_message_queue = PendingMessageQueue.new
end
Instance Method Details
#abort_transaction ⇒ nil
This method aborts the pending transaction, marking all produced records as aborted. The records are discarded by the brokers, and consumers cannot read them unless they opt in to reading uncommitted messages.
This method can only be called while the current transaction is in the IN_TRANSACTION state.
# File 'lib/kafka/producer.rb', line 337

def abort_transaction
  @transaction_manager.abort_transaction
end
#begin_transaction ⇒ nil
Marks the beginning of a transaction. This method transitions the state of the transaction to IN_TRANSACTION.
Producing operations can only be executed while the transaction is in this state. The records are persisted by the Kafka brokers, but are not visible to consumers until #commit_transaction is triggered. If the transaction is not committed within the timeout period, it times out and is considered aborted.
# File 'lib/kafka/producer.rb', line 313

def begin_transaction
  @transaction_manager.begin_transaction
end
#buffer_bytesize ⇒ Object
# File 'lib/kafka/producer.rb', line 275

def buffer_bytesize
  @pending_message_queue.bytesize + @buffer.bytesize
end
#buffer_size ⇒ Integer
Returns the number of messages currently held in the buffer.
# File 'lib/kafka/producer.rb', line 271

def buffer_size
  @pending_message_queue.size + @buffer.size
end
#clear_buffer ⇒ nil
Deletes all buffered messages.
# File 'lib/kafka/producer.rb', line 282

def clear_buffer
  @buffer.clear
  @pending_message_queue.clear
end
#commit_transaction ⇒ nil
This method commits the pending transaction, marking all produced records as committed. After that, they are visible to consumers.
This method can only be called while the current transaction is in the IN_TRANSACTION state.
# File 'lib/kafka/producer.rb', line 324

def commit_transaction
  @transaction_manager.commit_transaction
end
#deliver_messages ⇒ nil
Sends all buffered messages to the Kafka brokers.
Depending on the value of required_acks used when initializing the producer, this call may block until the specified number of replicas have acknowledged the writes. The ack_timeout setting places an upper bound on the amount of time the call will block before failing.
# File 'lib/kafka/producer.rb', line 250

def deliver_messages
  # There's no need to do anything if the buffer is empty.
  return if buffer_size == 0

  @instrumenter.instrument("deliver_messages.producer") do |notification|
    message_count = buffer_size

    notification[:message_count] = message_count
    notification[:attempts] = 0

    begin
      deliver_messages_with_retries(notification)
    ensure
      notification[:delivered_message_count] = message_count - buffer_size
    end
  end
end
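If some messages still cannot be delivered after the configured retries, Kafka::DeliveryFailed is raised and the undelivered messages stay in the buffer. A minimal sketch of handling that case (the logging is illustrative):

begin
  producer.deliver_messages
rescue Kafka::DeliveryFailed => e
  # Undelivered messages remain in the buffer, so a later call to
  # deliver_messages will retry them; clear_buffer discards them instead.
  $stderr.puts "Delivery failed: #{e.message}"
end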
#init_transactions ⇒ nil
Initializes the producer so that it is ready for future transactions. This method should be triggered once, before any transactions are created.
# File 'lib/kafka/producer.rb', line 299

def init_transactions
  @transaction_manager.init_transactions
end
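Putting the transactional methods together, a manual lifecycle might look like this sketch (it assumes the producer was created as a transactional producer via Client#producer; the transactional options, topic, and transactional_id shown are illustrative):

producer = kafka.producer(transactional: true, transactional_id: "my-producer")

# Must be called once, before any transactions are started.
producer.init_transactions

producer.begin_transaction
producer.produce("event-1", topic: "events")
producer.produce("event-2", topic: "events")
producer.deliver_messages
producer.commit_transaction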
#produce(value, key: nil, headers: {}, topic:, partition: nil, partition_key: nil, create_time: Time.now) ⇒ nil
Produces a message to the specified topic. Note that messages are buffered in the producer until #deliver_messages is called.
Partitioning
There are several options for specifying the partition that the message should be written to.
The simplest option is to not specify a message key, partition key, or partition number, in which case the message will be assigned a partition at random.
You can also specify the partition parameter yourself. This requires you to know which partitions are available, however. Oftentimes the best option is to specify the partition_key parameter: messages with the same partition key will always be assigned to the same partition, as long as the number of partitions doesn't change. You can also omit the partition key and specify a message key instead. The message key is part of the message payload, and so can carry semantic value; whether you want to have the message key double as a partition key is up to you.
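For example (the topic and keys are illustrative):

# Random partition assignment:
producer.produce("hello", topic: "greetings")

# Explicit partition number:
producer.produce("hello", topic: "greetings", partition: 0)

# All messages with the same partition key go to the same partition:
producer.produce("hello", topic: "greetings", partition_key: "user-123")

# The message key is part of the payload and can double as the
# partitioning input:
producer.produce("hello", topic: "greetings", key: "user-123")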
# File 'lib/kafka/producer.rb', line 195

def produce(value, key: nil, headers: {}, topic:, partition: nil, partition_key: nil, create_time: Time.now)
  # We want to fail fast if `topic` isn't a String
  topic = topic.to_str

  message = @interceptors.call(PendingMessage.new(
    value: value && value.to_s,
    key: key && key.to_s,
    headers: headers,
    topic: topic,
    partition: partition && Integer(partition),
    partition_key: partition_key && partition_key.to_s,
    create_time: create_time
  ))

  if buffer_size >= @max_buffer_size
    buffer_overflow topic,
      "Cannot produce to #{topic}, max buffer size (#{@max_buffer_size} messages) reached"
  end

  if buffer_bytesize + message.bytesize >= @max_buffer_bytesize
    buffer_overflow topic,
      "Cannot produce to #{topic}, max buffer bytesize (#{@max_buffer_bytesize} bytes) reached"
  end

  # If the producer is in transactional mode, all message production
  # must happen while the producer is in a transaction.
  if @transaction_manager.transactional? && !@transaction_manager.in_transaction?
    raise "Cannot produce to #{topic}: You must trigger begin_transaction before producing messages"
  end

  @target_topics.add(topic)
  @pending_message_queue.write(message)

  @instrumenter.instrument("produce_message.producer", {
    value: value,
    key: key,
    topic: topic,
    create_time: create_time,
    message_size: message.bytesize,
    buffer_size: buffer_size,
    max_buffer_size: @max_buffer_size,
  })

  nil
end
#send_offsets_to_transaction(batch:, group_id:) ⇒ nil
Sends the batch's last offset to the consumer group coordinator, and also marks this offset as part of the current transaction. This offset will be considered committed only if the transaction is committed successfully.
This method should be used when you need to batch consumed and produced messages together, typically in a consume-transform-produce pattern. Thus, the specified group_id should be the same as the group_id config parameter of the consumer being used.
# File 'lib/kafka/producer.rb', line 351

def send_offsets_to_transaction(batch:, group_id:)
  @transaction_manager.send_offsets_to_txn(
    offsets: {
      batch.topic => {
        batch.partition => {
          offset: batch.last_offset + 1,
          leader_epoch: batch.leader_epoch
        }
      }
    },
    group_id: group_id
  )
end
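A consume-transform-produce sketch (the consumer, output topic, group id, and the transform helper are all illustrative assumptions):

consumer.each_batch do |batch|
  producer.transaction do
    batch.messages.each do |message|
      # `transform` is a hypothetical application-level helper.
      producer.produce(transform(message.value), topic: "output-topic")
    end
    producer.deliver_messages

    # Commit the consumed offsets atomically with the produced messages.
    producer.send_offsets_to_transaction(batch: batch, group_id: "my-group")
  end
end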
#shutdown ⇒ nil
Closes all connections to the brokers.
# File 'lib/kafka/producer.rb', line 290

def shutdown
  @transaction_manager.close
  @cluster.disconnect
end
#to_s ⇒ Object
# File 'lib/kafka/producer.rb', line 160

def to_s
  "Producer #{@target_topics.to_a.join(', ')}"
end
#transaction ⇒ nil
Syntactic sugar to enable easier transaction usage. It performs the following steps:
- Start the transaction (with Producer#begin_transaction)
- Yield the given block
- Commit the transaction (with Producer#commit_transaction)
If the block raises an exception, the transaction is automatically aborted before the exception bubbles up.
If the block raises the Kafka::Producer::AbortTransaction indicator exception, the transaction is aborted silently, without re-raising that exception.
# File 'lib/kafka/producer.rb', line 368

def transaction
  raise 'This method requires a block' unless block_given?
  begin_transaction
  yield
  commit_transaction
rescue Kafka::Producer::AbortTransaction
  abort_transaction
rescue
  abort_transaction
  raise
end
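A usage sketch (the topic and the something_went_wrong? predicate are illustrative):

producer.transaction do
  producer.produce("charge-created", topic: "billing-events")
  producer.deliver_messages

  # Raising the indicator exception aborts the transaction silently;
  # any other exception aborts it and is then re-raised.
  raise Kafka::Producer::AbortTransaction if something_went_wrong?
end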