Class: Kafka::ProduceOperation

Inherits:
Object
Defined in:
lib/kafka/produce_operation.rb

Overview

A produce operation attempts to send all messages in a buffer to the Kafka cluster. Since topics and partitions are spread among all brokers in a cluster, this usually involves sending requests to several or all of the brokers.

Instrumentation

When executing the operation, an ack_message.producer.kafka notification will be emitted for each message that was successfully appended to a topic partition. The following keys will be found in the payload:

  • :topic — the topic that was written to.
  • :partition — the partition that the message set was appended to.
  • :offset — the offset of the message in the partition.
  • :key — the message key.
  • :value — the message value.
  • :delay — the time between when the message was produced and when it was acknowledged.
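
As a rough sketch of how these payload keys might be consumed, the method below formats one acknowledgement into a log line. The `format_ack` method and the sample payload are hypothetical; only the key names come from the list above. When ActiveSupport is loaded, a handler like this would typically be attached with `ActiveSupport::Notifications.subscribe`; that wiring is shown as a comment so the snippet runs without extra gems.

```ruby
# Hypothetical helper: turns an ack_message payload into a log line.
# Only the payload keys (:topic, :partition, :offset, :key, :delay)
# are taken from the documentation above.
def format_ack(payload)
  format(
    "acked %<topic>s/%<partition>d@%<offset>d key=%<key>s delay=%<delay>.3f",
    topic: payload.fetch(:topic),
    partition: payload.fetch(:partition),
    offset: payload.fetch(:offset),
    key: payload[:key].inspect, # keys may be nil, so print them via #inspect
    delay: payload.fetch(:delay)
  )
end

# Assumed wiring when ActiveSupport::Notifications is available:
#   ActiveSupport::Notifications.subscribe("ack_message.producer.kafka") do |*args|
#     event = ActiveSupport::Notifications::Event.new(*args)
#     puts format_ack(event.payload)
#   end

# Made-up sample payload, for illustration only.
sample = { topic: "greetings", partition: 0, offset: 42, key: nil, value: "hello", delay: 0.0123 }
puts format_ack(sample)
```

Using `fetch` for the required keys makes a missing key fail loudly instead of silently printing an empty field.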

In addition to these notifications, a send_messages.producer.kafka notification will be emitted after the operation completes, regardless of whether it succeeds. This notification will have the following keys:

  • :message_count – the total number of messages that the operation tried to send. Note that not all messages may get delivered.
  • :sent_message_count – the number of messages that were successfully sent.
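
Because not every message is guaranteed to be delivered, a subscriber may want the difference between the two counts. The following is a minimal sketch; `undelivered_count` is a hypothetical helper and the payload values are made up.

```ruby
# Hypothetical helper: number of messages the operation did not deliver,
# derived from the two documented send_messages payload keys.
def undelivered_count(payload)
  payload.fetch(:message_count) - payload.fetch(:sent_message_count)
end

# Made-up payload: 10 messages attempted, 7 successfully sent.
payload = { message_count: 10, sent_message_count: 7 }
puts "undelivered: #{undelivered_count(payload)}"
```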

Constructor Details

#initialize(cluster:, transaction_manager:, buffer:, compressor:, required_acks:, ack_timeout:, logger:, instrumenter:) ⇒ ProduceOperation

Returns a new instance of ProduceOperation.



# File 'lib/kafka/produce_operation.rb', line 33

def initialize(cluster:, transaction_manager:, buffer:, compressor:, required_acks:, ack_timeout:, logger:, instrumenter:)
  @cluster = cluster
  @transaction_manager = transaction_manager
  @buffer = buffer
  @required_acks = required_acks
  @ack_timeout = ack_timeout
  @compressor = compressor
  @logger = TaggedLogger.new(logger)
  @instrumenter = instrumenter
end

Instance Method Details

#execute ⇒ Object



# File 'lib/kafka/produce_operation.rb', line 44

def execute
  if (@transaction_manager.idempotent? || @transaction_manager.transactional?) && @required_acks != -1
    raise 'You must set required_acks option to :all to use idempotent / transactional production'
  end

  if @transaction_manager.transactional? && !@transaction_manager.in_transaction?
    raise "Produce operation can only be executed in a pending transaction"
  end

  @instrumenter.instrument("send_messages.producer") do |notification|
    message_count = @buffer.size

    notification[:message_count] = message_count

    begin
      if @transaction_manager.idempotent? || @transaction_manager.transactional?
        @transaction_manager.init_producer_id
      end
      send_buffered_messages
    ensure
      notification[:sent_message_count] = message_count - @buffer.size
    end
  end
end
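
The `ensure` block in `#execute` is what lets `:sent_message_count` be reported even when sending fails partway through. The sketch below isolates that pattern with stand-ins: `run_send` is a hypothetical method, the buffer is a plain Array, and the notification is a plain Hash. None of these are the library's own classes.

```ruby
# Hypothetical stand-in for the counting logic in #execute.
# buffer:       Array of pending messages (stand-in for the real buffer)
# notification: Hash that receives the payload keys
def run_send(buffer, notification)
  message_count = buffer.size
  notification[:message_count] = message_count

  begin
    yield buffer # the block removes delivered messages and may raise
  ensure
    # Runs on success and on failure alike, so the count is always recorded.
    notification[:sent_message_count] = message_count - buffer.size
  end
end

payload = {}
begin
  run_send(%w[a b c], payload) do |buf|
    buf.shift(2)                        # two messages delivered
    raise IOError, "broker unavailable" # made-up failure for the third
  end
rescue IOError
  # The error still propagates to the caller after the count is recorded.
end

p payload
```

Messages that remain in the buffer after the block exits are the ones counted as unsent.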