Class: Kafka::Producer

Inherits:
Object
  • Object
Defined in:
lib/kafka/producer.rb,
lib/kafka/producer/delivery_report.rb

Defined Under Namespace

Classes: DeliveryReport

Constructor Details

#initialize(config) ⇒ Producer

Initialize a new Producer for the configured cluster.

Parameters:



# File 'lib/kafka/producer.rb', line 21

def initialize(config)
  config = config.dup

  # Configure callbacks
  config.on_delivery_report(&method(:on_delivery_report))

  @client = ::Kafka::FFI::Producer.new(config)

  # Periodically call poll on the client to ensure callbacks are fired.
  @poller = Poller.new(@client)
end
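
The Poller created by the constructor is not shown on this page. As a rough sketch of the idea only, a background thread can call poll on the client at a fixed interval so that delivery callbacks keep firing. SketchPoller, FakeClient, and the interval option are hypothetical stand-ins for illustration, not the gem's implementation:

```ruby
# Hypothetical stand-in for the client; it only counts calls to poll.
class FakeClient
  attr_reader :polls

  def initialize
    @polls = 0
    @lock = Mutex.new
  end

  def poll
    @lock.synchronize { @polls += 1 }
  end
end

# Hypothetical poller: calls client.poll on a background thread until stopped.
class SketchPoller
  def initialize(client, interval: 0.01)
    @client = client
    @interval = interval
    @running = true
    @thread = Thread.new do
      while @running
        @client.poll
        sleep(@interval)
      end
    end
  end

  # Signal the loop to exit and wait for the thread to finish.
  def stop
    @running = false
    @thread.join
  end
end

client = FakeClient.new
poller = SketchPoller.new(client)
sleep(0.05)
poller.stop
```

Joining the thread in stop means no further poll calls happen once stop returns, which matters when the client is about to be destroyed.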

Instance Attribute Details

#client ⇒ Kafka::FFI::Producer (readonly)

Returns the backing Kafka::FFI::Producer.



# File 'lib/kafka/producer.rb', line 16

def client
  @client
end

Instance Method Details

#close(timeout: 30000) ⇒ Object

Note:

Once #close is called, it is no longer safe to call any other method on the Producer.

Gracefully shut down the Producer, flushing any pending deliveries and then releasing any memory back to the system.

Parameters:

  • timeout (Integer) (defaults to: 30000)

    Maximum time to wait in milliseconds for messages to be flushed.



# File 'lib/kafka/producer.rb', line 97

def close(timeout: 30000)
  # @see https://github.com/edenhill/librdkafka/blob/master/INTRODUCTION.md#producer
  @poller.stop

  @client.flush(timeout: timeout)
  @client.poll

  # Client handles destroying cached Topics
  @client.destroy
end
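
Because no other method is safe to call after #close, one way to guarantee a single, final close is to wrap usage in an ensure block. A minimal sketch follows; StubProducer and with_producer are hypothetical names introduced only for illustration, and the stub stands in for Kafka::Producer so the example runs without a cluster:

```ruby
# Hypothetical stand-in that records calls instead of talking to Kafka.
class StubProducer
  attr_reader :calls

  def initialize
    @calls = []
  end

  def produce(topic, payload)
    @calls << [:produce, topic, payload]
  end

  def close(timeout: 30000)
    @calls << [:close, timeout]
  end
end

# Yield the producer and always close it, even when the block raises.
def with_producer(producer, timeout: 30000)
  yield producer
ensure
  producer.close(timeout: timeout)
end

producer = StubProducer.new
with_producer(producer, timeout: 5000) do |prod|
  prod.produce("events", "hello")
end
```

The timeout is passed in milliseconds, matching the unit documented for #close.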

#flush(timeout: 1000) ⇒ Object

Wait until all outstanding produce requests are completed.

Raises:



# File 'lib/kafka/producer.rb', line 85

def flush(timeout: 1000)
  @client.flush(timeout: timeout)
end

#produce(topic, payload, key: nil, partition: nil, headers: nil, timestamp: nil, &block) {|report| ... } ⇒ DeliveryReport

Produce and publish a message to the Kafka cluster.

Parameters:

  • topic (String)

    Topic to publish the message to

  • payload (String)

    Message’s payload

  • key (String) (defaults to: nil)

    Optional partitioning key Kafka can use to determine the correct partition.

  • partition (-1, nil) (defaults to: nil)

    Kafka will determine the partition automatically based on the `partitioner` config option.

  • partition (Integer) (defaults to: nil)

    Specific partition to publish the message to.

  • headers (Hash{String => String}) (defaults to: nil)

    Set of headers to attach to the message.

  • timestamp (nil) (defaults to: nil)

    Let Kafka assign the message timestamp automatically.

  • timestamp (Time) (defaults to: nil)

    Timestamp for the message

  • timestamp (Integer) (defaults to: nil)

    Timestamp as milliseconds since unix epoch

  • block (Proc)

    Optional asynchronous callback to be called when the delivery status of the message is reported back from librdkafka. The callback MUST avoid expensive or long-running processing, as that may cause issues inside librdkafka.

Yields:

  • (report)

    Called asynchronously when the report on the success or failure of the delivery is received from Kafka.

Yield Parameters:

Returns:

  • (DeliveryReport)

    Report of the success or failure of the delivery. Call #wait to block until the report is received.
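
The timestamp parameter above accepts nil, a Time, or an Integer number of milliseconds since the Unix epoch. How the gem converts between these forms is not shown on this page; the following is a sketch under that assumption, and timestamp_to_ms is a hypothetical helper name:

```ruby
# Hypothetical helper: normalize the accepted timestamp forms to
# milliseconds since the Unix epoch, or nil to let Kafka assign one.
def timestamp_to_ms(timestamp)
  case timestamp
  when nil     then nil
  when Integer then timestamp
  when Time    then (timestamp.to_r * 1000).to_i
  else
    raise ArgumentError, "unsupported timestamp: #{timestamp.inspect}"
  end
end

from_nil  = timestamp_to_ms(nil)
from_int  = timestamp_to_ms(1_600_000_000_123)
from_time = timestamp_to_ms(Time.at(1, 500, :millisecond))
```

Time#to_r is used rather than Time#to_f so that the sub-second part is converted without floating-point rounding.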



# File 'lib/kafka/producer.rb', line 59

def produce(topic, payload, key: nil, partition: nil, headers: nil, timestamp: nil, &block)
  report = DeliveryReport.new(&block)

  # Allocate a pointer to a small chunk of memory. We will use the pointer
  # (not the value it points to) as a key for looking up the DeliveryReport
  # in the callback.
  #
  # Using a MemoryPointer as a key also ensures we have a reference to the
  # Pointer so it doesn't get garbage collected away and so it can be freed
  # in the callback, since the raw FFI::Pointer disallows #free as FFI
  # doesn't believe we allocated it.
  opaque = Kafka::FFI::Opaque.new(report)

  @client.produce(topic, payload, key: key, partition: partition, headers: headers, timestamp: timestamp, opaque: opaque)

  report
rescue
  # opaque is still nil if the failure happened before it was allocated.
  opaque&.free

  raise
end
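
The method above returns a report immediately and relies on a later callback to look it up by an opaque key and complete it. DeliveryReport and Kafka::FFI::Opaque are not shown on this page, so the following is only a sketch of that pattern in plain Ruby. SketchReport, SketchRegistry, and their method names are hypothetical, and an object id stands in for the pointer used as the key:

```ruby
# Hypothetical report: stores the outcome and wakes any thread blocked in wait.
class SketchReport
  attr_reader :error, :offset

  def initialize(&block)
    @callback = block
    @mutex = Mutex.new
    @received = ConditionVariable.new
    @done = false
  end

  # Called once by the delivery callback with the outcome.
  def done(offset: nil, error: nil)
    @mutex.synchronize do
      @offset = offset
      @error = error
      @done = true
      @received.broadcast
    end
    @callback&.call(self)
  end

  # Block until done was called or the timeout (in seconds) elapses.
  # Returns true when the report was received in time.
  def wait(timeout: 5)
    deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + timeout
    @mutex.synchronize do
      until @done
        remaining = deadline - Process.clock_gettime(Process::CLOCK_MONOTONIC)
        return false if remaining <= 0
        @received.wait(@mutex, remaining)
      end
    end
    true
  end
end

# Hypothetical registry: maps an opaque key to its pending report so the
# callback can find it, then drops the key once the report is delivered.
class SketchRegistry
  def initialize
    @pending = {}
    @lock = Mutex.new
  end

  def register(report)
    key = report.object_id
    @lock.synchronize { @pending[key] = report }
    key
  end

  def deliver(key, **outcome)
    report = @lock.synchronize { @pending.delete(key) }
    report&.done(**outcome)
  end

  def size
    @lock.synchronize { @pending.size }
  end
end

seen = []
registry = SketchRegistry.new
report = SketchReport.new { |r| seen << r.offset }
key = registry.register(report)

# Simulate the client reporting the delivery from another thread.
worker = Thread.new do
  sleep(0.01)
  registry.deliver(key, offset: 42)
end
delivered = report.wait(timeout: 1)
worker.join
```

Holding the report in the registry until delivery keeps it reachable while the native side only knows the key, which is the same concern the comment in the method above raises about garbage collection.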