Method: Kafka::Producer#produce
Defined in: lib/kafka/producer.rb
#produce(value, key: nil, headers: {}, topic:, partition: nil, partition_key: nil, create_time: Time.now) ⇒ nil
Produces a message to the specified topic. Note that messages are buffered in the producer until #deliver_messages is called.
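The buffering behaviour described above can be sketched with a small stand-in class. This is not the library's implementation: BufferingSketch, its delivered list, and its BufferOverflow error are hypothetical names used only for illustration, and the "cluster" is just an in-memory array. It shows that produce only queues a message and returns nil, and that nothing leaves the buffer until deliver_messages is called.

```ruby
# Hypothetical stand-in for a producer; it models only the buffering behaviour.
class BufferingSketch
  BufferOverflow = Class.new(StandardError)

  attr_reader :delivered

  def initialize(max_buffer_size: 1000)
    @max_buffer_size = max_buffer_size
    @buffer = []     # messages waiting to be sent
    @delivered = []  # stands in for the Kafka cluster
  end

  def buffer_size
    @buffer.size
  end

  # Queues the message and returns nil; nothing is sent yet.
  def produce(value, topic:, key: nil)
    topic = topic.to_str # fail fast if topic is not a String

    if buffer_size >= @max_buffer_size
      raise BufferOverflow,
            "Cannot produce to #{topic}, max buffer size (#{@max_buffer_size} messages) reached"
    end

    @buffer << { value: value && value.to_s, key: key && key.to_s, topic: topic }
    nil
  end

  # Flushes everything buffered so far.
  def deliver_messages
    @delivered.concat(@buffer)
    @buffer.clear
    nil
  end
end

producer = BufferingSketch.new(max_buffer_size: 100)
producer.produce("hello", topic: "greetings")
producer.produce("world", topic: "greetings", key: "k1")
# Both messages are still buffered at this point.
producer.deliver_messages
# Now the buffer is empty and both messages have been handed off.
```

With the real producer the pattern is the same: call produce as often as needed, then call deliver_messages to send the batch.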
Partitioning
There are several options for specifying the partition that the message should be written to.
The simplest option is to not specify a message key, partition key, or partition number, in which case the message will be assigned a partition at random.
You can also specify the partition parameter yourself. This requires you to know which partitions are available, however. Often the best option is to specify the partition_key parameter: messages with the same partition key will always be assigned to the same partition, as long as the number of partitions doesn't change. You can also omit the partition key and specify a message key instead. The message key is part of the message payload, and so can carry semantic value; whether you want the message key to double as a partition key is up to you.
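The order of precedence described above can be sketched as a small function. This is an illustration under stated assumptions, not the library's partitioner: choose_partition is a hypothetical helper, and hashing the key with CRC32 modulo the partition count is an assumed scheme chosen because it is deterministic. The actual partitioner may hash differently.

```ruby
require "zlib"

# Hypothetical helper, not part of the library: picks a partition index
# in the range 0...partition_count for a single message.
def choose_partition(partition_count, partition: nil, partition_key: nil, key: nil)
  raise ArgumentError, "partition_count must be positive" unless partition_count.positive?

  # An explicit partition number takes precedence over any key.
  return Integer(partition) unless partition.nil?

  # Otherwise prefer the partition key, falling back to the message key.
  hash_key = partition_key || key

  # No key at all: pick a partition at random.
  return rand(partition_count) if hash_key.nil?

  # Assumption: CRC32 of the key, modulo the number of partitions.
  Zlib.crc32(hash_key.to_s) % partition_count
end

choose_partition(6, partition: 3)             # explicit partition
choose_partition(6, partition_key: "user-42") # same key, same partition every time
choose_partition(6, key: "user-42")           # message key doubles as partition key
choose_partition(6)                           # random partition
```

Because the result is taken modulo partition_count, the same key can map to a different partition once the number of partitions changes, which is why the guarantee only holds while the partition count stays fixed.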
# File 'lib/kafka/producer.rb', line 195

def produce(value, key: nil, headers: {}, topic:, partition: nil, partition_key: nil, create_time: Time.now)
  # We want to fail fast if `topic` isn't a String
  topic = topic.to_str

  message = @interceptors.call(PendingMessage.new(
    value: value && value.to_s,
    key: key && key.to_s,
    headers: headers,
    topic: topic,
    partition: partition && Integer(partition),
    partition_key: partition_key && partition_key.to_s,
    create_time: create_time
  ))

  if buffer_size >= @max_buffer_size
    buffer_overflow topic,
      "Cannot produce to #{topic}, max buffer size (#{@max_buffer_size} messages) reached"
  end

  if buffer_bytesize + message.bytesize >= @max_buffer_bytesize
    buffer_overflow topic,
      "Cannot produce to #{topic}, max buffer bytesize (#{@max_buffer_bytesize} bytes) reached"
  end

  # If the producer is in transactional mode, messages may only be
  # produced while a transaction is in progress
  if @transaction_manager.transactional? && !@transaction_manager.in_transaction?
    raise "Cannot produce to #{topic}: You must trigger begin_transaction before producing messages"
  end

  @target_topics.add(topic)
  @pending_message_queue.write(message)

  @instrumenter.instrument("produce_message.producer", {
    value: value,
    key: key,
    topic: topic,
    create_time: create_time,
    message_size: message.bytesize,
    buffer_size: buffer_size,
    max_buffer_size: @max_buffer_size,
  })

  nil
end