Class: Rdkafka::Producer
- Inherits:
-
Object
- Object
- Rdkafka::Producer
- Defined in:
- lib/rdkafka/producer.rb,
lib/rdkafka/producer/delivery_handle.rb,
lib/rdkafka/producer/delivery_report.rb
Overview
Defined Under Namespace
Classes: DeliveryHandle, DeliveryReport
Instance Attribute Summary collapse
-
#delivery_callback ⇒ nil
writeonly
Set a callback that will be called every time a message is successfully produced.
Instance Method Summary collapse
-
#close ⇒ Object
Close this producer and wait for the internal poll queue to empty.
- #closed_producer_check(method) ⇒ Object
-
#partition_count(topic) ⇒ Object
Partition count for a given topic.
-
#produce(topic:, payload: nil, key: nil, partition: nil, partition_key: nil, timestamp: nil, headers: nil) ⇒ DeliveryHandle
Produces a message to a Kafka topic.
Instance Attribute Details
#delivery_callback=(callback) ⇒ nil
Set a callback that will be called every time a message is successfully produced. The callback is called with a DeliveryReport
37 38 39 40 |
# File 'lib/rdkafka/producer.rb', line 37 def delivery_callback=(callback) raise TypeError.new("Callback has to be a proc or lambda") unless callback.is_a? Proc @delivery_callback = callback end |
Instance Method Details
#close ⇒ Object
Close this producer and wait for the internal poll queue to empty.
43 44 45 46 47 48 49 50 51 52 |
# File 'lib/rdkafka/producer.rb', line 43 def close return unless @native_kafka # Indicate to polling thread that we're closing @closing = true # Wait for the polling thread to finish up @polling_thread.join Rdkafka::Bindings.rd_kafka_destroy(@native_kafka) @native_kafka = nil end |
#closed_producer_check(method) ⇒ Object
174 175 176 |
# File 'lib/rdkafka/producer.rb', line 174 def closed_producer_check(method) raise Rdkafka::ClosedProducerError.new(method) if @native_kafka.nil? end |
#partition_count(topic) ⇒ Object
Partition count for a given topic. NOTE: If 'allow.auto.create.topics' is set to true in the broker, the topic will be auto-created after returning nil.
61 62 63 64 |
# File 'lib/rdkafka/producer.rb', line 61 def partition_count(topic) closed_producer_check(__method__) Rdkafka::Metadata.new(@native_kafka, topic).topics&.first[:partition_count] end |
#produce(topic:, payload: nil, key: nil, partition: nil, partition_key: nil, timestamp: nil, headers: nil) ⇒ DeliveryHandle
Produces a message to a Kafka topic. The message is added to rdkafka's queue, call wait on the returned delivery handle to make sure it is delivered.
When no partition is specified the underlying Kafka library picks a partition based on the key. If no key is specified, a random partition will be used. When a timestamp is provided this is used instead of the auto-generated timestamp.
81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 |
# File 'lib/rdkafka/producer.rb', line 81 def produce(topic:, payload: nil, key: nil, partition: nil, partition_key: nil, timestamp: nil, headers: nil) closed_producer_check(__method__) # Start by checking and converting the input # Get payload length payload_size = if payload.nil? 0 else payload.bytesize end # Get key length key_size = if key.nil? 0 else key.bytesize end if partition_key partition_count = partition_count(topic) # If the topic is not present, set to -1 partition = Rdkafka::Bindings.partitioner(partition_key, partition_count) if partition_count end # If partition is nil, use -1 to let librdafka set the partition randomly or # based on the key when present. partition ||= -1 # If timestamp is nil use 0 and let Kafka set one. If an integer or time # use it. = if .nil? 0 elsif .is_a?(Integer) elsif .is_a?(Time) (.to_i * 1000) + (.usec / 1000) else raise TypeError.new("Timestamp has to be nil, an Integer or a Time") end delivery_handle = DeliveryHandle.new delivery_handle[:pending] = true delivery_handle[:response] = -1 delivery_handle[:partition] = -1 delivery_handle[:offset] = -1 DeliveryHandle.register(delivery_handle) args = [ :int, Rdkafka::Bindings::RD_KAFKA_VTYPE_TOPIC, :string, topic, :int, Rdkafka::Bindings::RD_KAFKA_VTYPE_MSGFLAGS, :int, Rdkafka::Bindings::RD_KAFKA_MSG_F_COPY, :int, Rdkafka::Bindings::RD_KAFKA_VTYPE_VALUE, :buffer_in, payload, :size_t, payload_size, :int, Rdkafka::Bindings::RD_KAFKA_VTYPE_KEY, :buffer_in, key, :size_t, key_size, :int, Rdkafka::Bindings::RD_KAFKA_VTYPE_PARTITION, :int32, partition, :int, Rdkafka::Bindings::RD_KAFKA_VTYPE_TIMESTAMP, :int64, , :int, Rdkafka::Bindings::RD_KAFKA_VTYPE_OPAQUE, :pointer, delivery_handle, ] if headers headers.each do |key0, value0| key = key0.to_s value = value0.to_s args += [ :int, Rdkafka::Bindings::RD_KAFKA_VTYPE_HEADER, :string, key, :pointer, value, :size_t, value.bytes.size ] end end args += [:int, Rdkafka::Bindings::RD_KAFKA_VTYPE_END] # Produce the message response = Rdkafka::Bindings.rd_kafka_producev( @native_kafka, *args ) # Raise error if the produce call was not successful if response != 0 DeliveryHandle.remove(delivery_handle.to_ptr.address) raise RdkafkaError.new(response) end delivery_handle end |