Class: Kafka::ProduceOperation
- Inherits: Object
  - Object
  - Kafka::ProduceOperation
- Defined in: lib/kafka/produce_operation.rb
Overview
A produce operation attempts to send all messages in a buffer to the Kafka cluster. Since topics and partitions are spread among all brokers in a cluster, this usually involves sending requests to several or all of the brokers.
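To illustrate why a single operation can fan out to several brokers, here is a minimal sketch that groups buffered messages by the broker leading each topic partition. The `LEADERS` table, the message hashes, and `group_by_leader` are hypothetical stand-ins for cluster metadata and are not part of the library.

```ruby
# Hypothetical partition-leader table: [topic, partition] => broker id.
LEADERS = {
  ["events", 0] => 1,
  ["events", 1] => 2,
  ["logs", 0]   => 1,
}.freeze

# Groups messages by the broker that leads their topic partition, so that
# one request per broker can carry every message destined for it.
def group_by_leader(messages, leaders)
  messages.group_by { |m| leaders.fetch([m[:topic], m[:partition]]) }
end

buffered = [
  { topic: "events", partition: 0, value: "a" },
  { topic: "events", partition: 1, value: "b" },
  { topic: "logs",   partition: 0, value: "c" },
]

group_by_leader(buffered, LEADERS).each do |broker_id, msgs|
  puts "broker #{broker_id}: #{msgs.map { |m| m[:value] }.join(', ')}"
end
```

In this sketch three messages across two topics result in two requests, one per leading broker.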
## Instrumentation
When executing the operation, an `ack_message.producer.kafka` notification will be emitted for each message that was successfully appended to a topic partition. The following keys will be found in the payload:
- `:topic`: the topic that was written to.
- `:partition`: the partition that the message set was appended to.
- `:offset`: the offset of the message in the partition.
- `:key`: the message key.
- `:value`: the message value.
- `:delay`: the time between when the message was produced and when it was acknowledged.
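As a rough illustration of how a subscriber might consume this payload, the sketch below formats one acknowledgement as a log line. `describe_ack` is a hypothetical helper, the sample payload is made up, and treating `:delay` as a number of seconds is an assumption that the text above does not confirm.

```ruby
# Hypothetical formatter for an `ack_message.producer.kafka` payload.
# Assumes :delay is a number of seconds (not stated by the documentation).
def describe_ack(payload)
  format(
    "%s/%d@%d key=%s acked after %.3fs",
    payload.fetch(:topic),
    payload.fetch(:partition),
    payload.fetch(:offset),
    payload[:key].inspect, # keys may be nil, so inspect keeps that visible
    payload.fetch(:delay)
  )
end

# Made-up payload containing the documented keys.
sample = {
  topic: "events",
  partition: 3,
  offset: 42,
  key: "user-1",
  value: "signup",
  delay: 0.25,
}

puts describe_ack(sample)
```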
In addition to these notifications, a `send_messages.producer.kafka` notification will be emitted after the operation completes, regardless of whether it succeeds. This notification will have the following keys:
- `:message_count`: the total number of messages that the operation tried to send. Note that not all messages may get delivered.
- `:sent_message_count`: the number of messages that were successfully sent.
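Because both counts are reported even when the operation fails, a subscriber can derive how many messages remain undelivered. The following is a minimal sketch; `delivery_summary` is a hypothetical helper and the numbers are illustrative.

```ruby
# Hypothetical helper: derives delivery statistics from a
# `send_messages.producer.kafka` payload.
def delivery_summary(payload)
  total = payload.fetch(:message_count)
  sent  = payload.fetch(:sent_message_count)
  { total: total, sent: sent, undelivered: total - sent }
end

summary = delivery_summary({ message_count: 10, sent_message_count: 7 })
puts "#{summary[:sent]}/#{summary[:total]} sent, #{summary[:undelivered]} undelivered"
```

If the notifications are delivered through `ActiveSupport::Notifications`, a handler like this could be called from a block registered with `ActiveSupport::Notifications.subscribe`.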
Instance Method Summary
- #execute ⇒ Object
- #initialize(cluster:, transaction_manager:, buffer:, compressor:, required_acks:, ack_timeout:, logger:, instrumenter:) ⇒ ProduceOperation (constructor). A new instance of ProduceOperation.
Constructor Details
#initialize(cluster:, transaction_manager:, buffer:, compressor:, required_acks:, ack_timeout:, logger:, instrumenter:) ⇒ ProduceOperation
Returns a new instance of ProduceOperation.
    # File 'lib/kafka/produce_operation.rb', line 33

    def initialize(cluster:, transaction_manager:, buffer:, compressor:, required_acks:, ack_timeout:, logger:, instrumenter:)
      @cluster = cluster
      @transaction_manager = transaction_manager
      @buffer = buffer
      @required_acks = required_acks
      @ack_timeout = ack_timeout
      @compressor = compressor
      @logger = TaggedLogger.new(logger)
      @instrumenter = instrumenter
    end
Instance Method Details
#execute ⇒ Object
    # File 'lib/kafka/produce_operation.rb', line 44

    def execute
      # Idempotent and transactional production require acks from all replicas (-1).
      if (@transaction_manager.idempotent? || @transaction_manager.transactional?) && @required_acks != -1
        raise 'You must set required_acks option to :all to use idempotent / transactional production'
      end

      if @transaction_manager.transactional? && !@transaction_manager.in_transaction?
        raise "Produce operation can only be executed in a pending transaction"
      end

      @instrumenter.instrument("send_messages.producer") do |notification|
        message_count = @buffer.size

        notification[:message_count] = message_count

        begin
          if @transaction_manager.idempotent? || @transaction_manager.transactional?
            @transaction_manager.init_producer_id
          end

          # Private helper that performs the actual delivery.
          send_buffered_messages
        ensure
          # Messages still in the buffer were not delivered.
          notification[:sent_message_count] = message_count - @buffer.size
        end
      end
    end
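The `ensure` clause is what makes `:sent_message_count` available even when delivery fails. The sketch below reproduces only that bookkeeping with stand-in objects: `RecordingInstrumenter`, `SketchOperation`, and the `deliverable` parameter are hypothetical, and a plain array plays the role of the message buffer. It is not the library's implementation.

```ruby
# Stand-in instrumenter: yields a payload hash and records it afterwards,
# whether or not the block raises.
class RecordingInstrumenter
  attr_reader :events

  def initialize
    @events = []
  end

  def instrument(name)
    payload = {}
    yield payload
  ensure
    @events << [name, payload]
  end
end

# Simplified stand-in for the operation. `deliverable` is a hypothetical
# knob for how many buffered messages the fake cluster accepts.
class SketchOperation
  def initialize(buffer:, instrumenter:, deliverable:)
    @buffer = buffer
    @instrumenter = instrumenter
    @deliverable = deliverable
  end

  def execute
    @instrumenter.instrument("send_messages.producer") do |notification|
      message_count = @buffer.size
      notification[:message_count] = message_count

      begin
        @buffer.shift(@deliverable) # pretend these were acknowledged
        raise "delivery failed" unless @buffer.empty?
      ensure
        # Runs on success and on failure, so the count is always reported.
        notification[:sent_message_count] = message_count - @buffer.size
      end
    end
  end
end

instrumenter = RecordingInstrumenter.new
operation = SketchOperation.new(buffer: %w[a b c], instrumenter: instrumenter, deliverable: 2)

begin
  operation.execute
rescue RuntimeError => e
  puts "error: #{e.message}"
end

p instrumenter.events.last
```

Here two of three messages leave the buffer before the simulated failure, and the recorded payload still carries both counts.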