Class: Kafka::FFI::Producer

Inherits:
Client
Defined in:
lib/kafka/ffi/producer.rb

Instance Attribute Summary

Attributes inherited from OpaquePointer

#pointer

Class Method Summary

Instance Method Summary

Methods inherited from Client

#alter_configs, #brokers_add, #cluster_id, #config, #controller_id, #create_partitions, #create_topics, #default_topic_conf_dup, #delete_topics, #describe_configs, #destroy, from_native, #get_background_queue, #get_main_queue, #group_list, #initialize, #metadata, #name, #offsets_for_times, #outq_len, #pause_partitions, #poll, #query_watermark_offsets, #resume_partitions, #set_log_queue, #topic

Methods inherited from OpaquePointer

by_ref, from_native, inherited, #initialize, to_native

Constructor Details

This class inherits a constructor from Kafka::FFI::Client

Class Method Details

.new(config = nil) ⇒ Object



# File 'lib/kafka/ffi/producer.rb', line 8

def self.new(config = nil)
  super(:producer, config)
end

Instance Method Details

#flush(timeout: 1000) ⇒ Object

Wait until all outstanding produce requests are completed. Call this before destroying a producer so that all queued and in-flight requests finish before termination. The timeout is passed straight to rd_kafka_flush, which interprets it as milliseconds.

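Raises:

  • (Kafka::ResponseError)

    Raised when rd_kafka_flush returns anything other than :ok, for example when the timeout elapses before all messages are delivered.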



# File 'lib/kafka/ffi/producer.rb', line 114

def flush(timeout: 1000)
  err = ::Kafka::FFI.rd_kafka_flush(self, timeout)
  if err != :ok
    raise ::Kafka::ResponseError, err
  end

  nil
end
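
As a usage sketch for the advice above, the helper below flushes before destroying and always releases the producer. The names shutdown_producer and FakeProducer are hypothetical and not part of Kafka::FFI; the stand-in only mimics the #flush and #destroy calls documented on this page so the example runs without a broker. It also assumes the error raised by #flush descends from StandardError.

```ruby
# Hypothetical helper: flush outstanding messages, then always destroy.
# Returns true when the flush succeeded and false when it raised.
def shutdown_producer(producer, timeout: 5000)
  producer.flush(timeout: timeout)
  true
rescue StandardError => e
  # Assumption: the library's error is a StandardError subclass.
  warn "flush failed: #{e.message}"
  false
ensure
  # Runs on both paths so the native handle is not leaked.
  producer.destroy
end

# Stand-in for illustration only; not part of the Kafka::FFI API.
class FakeProducer
  attr_reader :calls

  def initialize(fail_flush: false)
    @fail_flush = fail_flush
    @calls = []
  end

  def flush(timeout: 1000)
    @calls << [:flush, timeout]
    raise "timed out" if @fail_flush

    nil
  end

  def destroy
    @calls << :destroy
  end
end

shutdown_producer(FakeProducer.new)
```

With a real producer the same helper would be called with the object returned by Kafka::FFI::Producer.new.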

#produce(topic, payload, key: nil, partition: nil, headers: nil, timestamp: nil, opaque: nil) ⇒ Object

Produce and send a single message to the Kafka cluster.

Parameters:

  • topic (Topic, String)

    Topic (or name of topic) to receive the message.

  • payload (String, nil)

    Content of the message.

  • key (String) (defaults to: nil)

    Message partitioning key

  • partition (nil, -1) (defaults to: nil)

    Use the configured partitioner to determine which partition to publish the message to.

  • partition (Integer) (defaults to: nil)

    Partition of the topic that should receive the message.

  • headers (Kafka::FFI::Message::Header) (defaults to: nil)
  • timestamp (Time) (defaults to: nil)

    Timestamp as Time

  • timestamp (Integer) (defaults to: nil)

    Timestamp as milliseconds since unix epoch

  • timestamp (nil) (defaults to: nil)

    Timestamp is assigned by librdkafka

  • opaque (Opaque) (defaults to: nil)

    Reference to an object owned by the application which will be available as Message#opaque in callbacks. The application MUST call #free on the Opaque once the final callback has been triggered to avoid leaking memory.

Raises:

  • (Kafka::QueueFullError)

    Number of messages queued for delivery has exceeded capacity. See the `queue.buffering.max.messages` config setting to adjust.



# File 'lib/kafka/ffi/producer.rb', line 40

def produce(topic, payload, key: nil, partition: nil, headers: nil, timestamp: nil, opaque: nil)
  args = [
    # Ensure librdkafka copies the payload into its own memory since the
    # string backing it could be garbage collected.
    :vtype, :msgflags, :int, Kafka::FFI::RD_KAFKA_MSG_F_COPY,
  ]

  if payload
    args.append(:vtype, :value, :buffer_in, payload, :size_t, payload.bytesize)
  end

  # The partitioning key is optional
  if key
    args.append(:vtype, :key, :buffer_in, key, :size_t, key.bytesize)
  end

  # Partition will default to being auto assigned by the configured
  # partitioning strategy.
  if partition
    args.append(:vtype, :partition, :int32, partition)
  end

  # Headers are optional and can be passed as either a reference to a
  # Header object or individual key/value pairs. This only supports the
  # Header object because supporting key + valu
  if headers
    args.append(:vtype, :headers, :pointer, headers.pointer)
  end

  case topic
  when Topic
    args.append(:vtype, :rkt, :pointer, topic.pointer)
  when String
    args.append(:vtype, :topic, :string, topic)
  else
    raise ArgumentError, "topic must be either a Topic or String"
  end

  if opaque
    args.append(:vtype, :opaque, :pointer, opaque.pointer)
  end

  if timestamp
    ts =
      case timestamp
      # nsec is in nanoseconds; divide by 1_000_000 to get milliseconds.
      when Time    then ((timestamp.to_i * 1000) + (timestamp.nsec / 1_000_000))
      when Integer then timestamp
      else
        raise ArgumentError, "timestamp must be nil, a Time, or an Integer"
      end

    args.append(:vtype, :timestamp, :int64, ts)
  end

  # Add the sentinel value to denote the end of the argument list.
  args.append(:vtype, :end)

  err = ::Kafka::FFI.rd_kafka_producev(self, *args)
  if err != :ok
    # The only documented error is RD_KAFKA_RESP_ERR__CONFLICT should both
    # HEADER and HEADERS keys be passed in. There is no way for HEADER to
    # be passed to producev based on the above implementation.
    raise ::Kafka::ResponseError, err
  end

  nil
end
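
The timestamp handed to librdkafka is expressed in milliseconds since the Unix epoch. A minimal standalone sketch of that conversion follows; to_epoch_ms is a hypothetical helper name used only for illustration and is not part of the library.

```ruby
# Hypothetical helper mirroring the timestamp handling in #produce.
# Returns milliseconds since the Unix epoch, or nil to let librdkafka
# assign the timestamp.
def to_epoch_ms(timestamp)
  case timestamp
  when nil     then nil
  # Whole seconds scaled to ms, plus the sub-second part (ns -> ms).
  when Time    then (timestamp.to_i * 1000) + (timestamp.nsec / 1_000_000)
  when Integer then timestamp
  else
    raise ArgumentError, "timestamp must be nil, a Time, or an Integer"
  end
end

# 250_000 microseconds past the second is 250 ms.
t = Time.at(1_600_000_000, 250_000)
puts to_epoch_ms(t) # prints 1600000000250
```

Integer division truncates, so any precision finer than a millisecond is dropped rather than rounded.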

#purge(queued: true, inflight: true, blocking: false) ⇒ Object

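Purge messages currently handled by the Producer. By default this purges all queued and inflight messages. With the default blocking: false, the RD_KAFKA_PURGE_F_NON_BLOCKING flag is not set, so the call waits for the purge to finish.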

Parameters:

  • queued (Boolean) (defaults to: true)

    Purge any queued messages

  • inflight (Boolean) (defaults to: true)

    Purge messages that are inflight

  • blocking (Boolean) (defaults to: false)

    When true, set RD_KAFKA_PURGE_F_NON_BLOCKING so the call does not wait for background thread queue purging to finish.

Raises:

  • (Kafka::ResponseError)

    An error occurred while purging state. This is unlikely, as the documented errors are not possible with this implementation.



# File 'lib/kafka/ffi/producer.rb', line 134

def purge(queued: true, inflight: true, blocking: false)
  mask = 0
  mask |= RD_KAFKA_PURGE_F_QUEUE if queued
  mask |= RD_KAFKA_PURGE_F_INFLIGHT if inflight
  mask |= RD_KAFKA_PURGE_F_NON_BLOCKING if blocking

  err = ::Kafka::FFI.rd_kafka_purge(self, mask)
  if err != :ok
    raise ::Kafka::ResponseError, err
  end

  nil
end
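
To make the effect of each keyword concrete, the sketch below rebuilds the purge bitmask on its own. The flag values are assumed to match librdkafka's rdkafka.h and should be checked against the installed version; purge_mask and the PURGE_F_* constants are hypothetical names for illustration. Note that in the source above, passing blocking: true is what sets the non-blocking flag.

```ruby
# Assumed values, taken to match librdkafka's rdkafka.h.
PURGE_F_QUEUE        = 0x1
PURGE_F_INFLIGHT     = 0x2
PURGE_F_NON_BLOCKING = 0x4

# Hypothetical helper that mirrors the mask construction in #purge.
def purge_mask(queued: true, inflight: true, blocking: false)
  mask = 0
  mask |= PURGE_F_QUEUE if queued
  mask |= PURGE_F_INFLIGHT if inflight
  # As in the source, `blocking: true` selects the NON_BLOCKING flag.
  mask |= PURGE_F_NON_BLOCKING if blocking
  mask
end

puts purge_mask                  # prints 3 (queue + inflight)
puts purge_mask(blocking: true)  # prints 7 (all three flags)
puts purge_mask(inflight: false) # prints 1 (queue only)
```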