Class: Kafka::AsyncProducer
- Inherits: Object
- Defined in: lib/kafka/async_producer.rb
Overview
A Kafka producer that does all its work in the background so that it does not block the calling thread. Calls to #deliver_messages are asynchronous and return immediately.
In addition, it's possible to define automatic delivery policies. These place an upper bound on the number of buffered messages and on the time between message deliveries.
- If delivery_threshold is set to a value n greater than zero, the producer will automatically deliver its messages once its buffer size reaches n.
- If delivery_interval is set to a value n greater than zero, the producer will automatically deliver its messages every n seconds.
By default, automatic delivery is disabled and you'll have to call #deliver_messages manually.
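The threshold policy can be modelled without a Kafka connection. The sketch below is a hypothetical, single-threaded simplification: the class name `ThresholdBuffer` and its `deliver` callback are illustrative and not part of ruby-kafka. Messages accumulate in a buffer that is flushed once its size reaches the threshold, and a threshold of zero disables automatic flushing, mirroring the default described above.

```ruby
# Hypothetical model of the delivery_threshold policy; not ruby-kafka's Worker.
class ThresholdBuffer
  attr_reader :buffer

  # threshold: flush once this many messages are buffered (0 disables auto-flush).
  # deliver:   callable that receives each batch being "delivered".
  def initialize(threshold:, deliver:)
    raise ArgumentError, "threshold must be >= 0" unless threshold >= 0

    @threshold = threshold
    @deliver = deliver
    @buffer = []
  end

  # Buffers one message and flushes automatically when the threshold is reached.
  def add(message)
    @buffer << message
    flush if @threshold > 0 && @buffer.size >= @threshold
  end

  # Hands a copy of the buffered batch to the callback and empties the buffer.
  def flush
    return if @buffer.empty?

    @deliver.call(@buffer.dup)
    @buffer.clear
  end
end

batches = []
buf = ThresholdBuffer.new(threshold: 3, deliver: ->(batch) { batches << batch })
%w[a b c d].each { |m| buf.add(m) }
p batches     # => [["a", "b", "c"]]
p buf.buffer  # => ["d"]
```

The interval policy is the time-based counterpart: instead of checking the buffer size, a timer periodically triggers the same flush.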
Buffer Overflow and Backpressure
The calling thread communicates with the background thread that does the actual work through a thread-safe queue. While the background thread is busy delivering messages, new messages are buffered in the queue. To keep the queue from growing without bound when the background thread gets stuck or can't keep up with the calling thread, there is a maximum number of messages that may be buffered. You can configure this value by setting max_queue_size.
If you produce messages faster than the background producer thread can deliver them to Kafka, you will eventually fill the producer's buffer. Once this happens, the background thread will stop popping messages off the queue until it can successfully deliver the buffered messages. The queue will therefore grow in size, potentially hitting the max_queue_size limit.
Once this happens, calls to #produce will raise a BufferOverflow error.
Depending on your use case you may want to slow down the rate of messages being produced or perhaps halt your application completely until the producer can deliver the buffered messages and clear the message queue.
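One way to slow down, sketched under assumptions: the helper below (`produce_with_backoff` is hypothetical, not part of ruby-kafka) rescues the overflow error, sleeps with exponential backoff and retries, re-raising once a fixed number of attempts is spent. Only `#produce` and `Kafka::BufferOverflow` come from this page; the default error class assumes `require "kafka"` has already been done, and the class is injectable so the sketch can be exercised without a Kafka cluster.

```ruby
# Hypothetical helper; only #produce and Kafka::BufferOverflow come from ruby-kafka.
# Retries a produce call with a growing delay while the async producer's queue is full.
def produce_with_backoff(producer, value, topic:, max_attempts: 5, base_delay: 0.5,
                         overflow_error: Kafka::BufferOverflow, **options)
  attempts = 0
  begin
    attempts += 1
    producer.produce(value, topic: topic, **options)
  rescue overflow_error
    # Give up and re-raise the overflow once the attempt budget is spent.
    raise if attempts >= max_attempts

    # Exponential backoff: base_delay, 2 * base_delay, 4 * base_delay, ...
    sleep(base_delay * (2**(attempts - 1)))
    retry
  end
end
```

Sleeping in the calling thread is the point of this design: it is the backpressure, trading latency in the producing code for a bounded queue. If dropping messages is acceptable instead, the rescue branch could log and return rather than retry.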
Example
```ruby
require "kafka"

# The broker address is a placeholder for your own cluster.
kafka = Kafka.new(seed_brokers: ["localhost:9092"])

producer = kafka.async_producer(
  # Deliver once 1,000 messages have been buffered:
  delivery_threshold: 1000,
  # Deliver messages every 30 seconds:
  delivery_interval: 30,
)

# There's no need to call #deliver_messages manually; it happens
# automatically in the background.
producer.produce("hello", topic: "greetings")

# Remember to shut down the producer when you're done with it.
producer.shutdown
```
Instance Method Summary
- #deliver_messages ⇒ nil: Asynchronously delivers the buffered messages.
- #initialize(sync_producer:, max_queue_size: 1000, delivery_threshold: 0, delivery_interval: 0, max_retries: -1, retry_backoff: 0, instrumenter:, logger:) ⇒ AsyncProducer (constructor): Initializes a new AsyncProducer.
- #produce(value, topic:, **options) ⇒ nil: Produces a message to the specified topic.
- #shutdown ⇒ nil: Shuts down the producer, releasing the network resources used.
Constructor Details
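#initialize(sync_producer:, max_queue_size: 1000, delivery_threshold: 0, delivery_interval: 0, max_retries: -1, retry_backoff: 0, instrumenter:, logger:) ⇒ AsyncProducer
Initializes a new AsyncProducer. Raises ArgumentError unless max_queue_size is greater than zero and both delivery_threshold and delivery_interval are zero or greater.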
```ruby
# File 'lib/kafka/async_producer.rb', line 73
def initialize(sync_producer:, max_queue_size: 1000, delivery_threshold: 0, delivery_interval: 0, max_retries: -1, retry_backoff: 0, instrumenter:, logger:)
  raise ArgumentError unless max_queue_size > 0
  raise ArgumentError unless delivery_threshold >= 0
  raise ArgumentError unless delivery_interval >= 0

  @queue = Queue.new
  @max_queue_size = max_queue_size
  @instrumenter = instrumenter
  @logger = TaggedLogger.new(logger)

  @worker = Worker.new(
    queue: @queue,
    producer: sync_producer,
    delivery_threshold: delivery_threshold,
    max_retries: max_retries,
    retry_backoff: retry_backoff,
    instrumenter: instrumenter,
    logger: logger
  )

  # The timer will no-op if the delivery interval is zero.
  @timer = Timer.new(queue: @queue, interval: delivery_interval)

  @thread_mutex = Mutex.new
end
```
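The Timer's own source is not shown on this page, so the following is only a hypothetical stand-in (`IntervalTimer` is an illustrative name) for how an interval-driven delivery could work: a background thread that sleeps for the interval and then pushes the same `[:deliver_messages, nil]` command that #deliver_messages enqueues, doing nothing when the interval is zero, as the comment in the constructor states.

```ruby
# Hypothetical stand-in for AsyncProducer's internal Timer, written for illustration.
class IntervalTimer
  def initialize(queue:, interval:)
    @queue = queue
    @interval = interval
  end

  # Starts a background thread that enqueues a delivery command every
  # @interval seconds. Returns nil (and starts no thread) when the interval is zero.
  def start
    return nil if @interval.zero?

    Thread.new do
      loop do
        sleep @interval
        @queue << [:deliver_messages, nil]
      end
    end
  end
end

queue = Queue.new
thread = IntervalTimer.new(queue: queue, interval: 0.05).start
sleep 0.3
thread.kill
```

Because the timer only enqueues a command, it never touches the network itself; the worker thread remains the single place where delivery happens.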
Instance Method Details
#deliver_messages ⇒ nil
Asynchronously delivers the buffered messages. This method will return immediately and the actual work will be done in the background.
```ruby
# File 'lib/kafka/async_producer.rb', line 133
def deliver_messages
  ensure_threads_running!

  @queue << [:deliver_messages, nil]

  nil
end
```
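The methods on this page talk to the background thread by pushing `[operation, payload]` pairs onto the queue. The toy worker below is a hypothetical, simplified consumer of those commands (`ToyWorker` is illustrative; the real Worker also applies delivery_threshold, retries and instrumentation, which are omitted). It shows why #deliver_messages can return immediately: it only enqueues a command, and the flush happens later on the worker thread.

```ruby
# Hypothetical, simplified worker loop for the [operation, payload] commands
# that AsyncProducer pushes onto its queue. Not ruby-kafka's Worker.
class ToyWorker
  attr_reader :delivered

  def initialize(queue)
    @queue = queue
    @pending = []
    @delivered = []
  end

  def run
    loop do
      operation, payload = @queue.pop
      case operation
      when :produce
        @pending << payload # buffer the message
      when :deliver_messages
        deliver             # flush the buffer
      when :shutdown
        deliver             # flush what is left, then stop
        break
      else
        raise ArgumentError, "unknown operation #{operation.inspect}"
      end
    end
  end

  private

  # Stand-in for a synchronous delivery to Kafka: records the batch.
  def deliver
    @delivered << @pending.dup unless @pending.empty?
    @pending.clear
  end
end

queue = Queue.new
worker = ToyWorker.new(queue)
thread = Thread.new { worker.run }

queue << [:produce, "hello"]
queue << [:produce, "world"]
queue << [:deliver_messages, nil] # returns at once; the worker flushes later
queue << [:produce, "bye"]
queue << [:shutdown, nil]
thread.join                       # like #shutdown: wait for the worker to finish

p worker.delivered # => [["hello", "world"], ["bye"]]
```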
#produce(value, topic:, **options) ⇒ nil
Produces a message to the specified topic.
```ruby
# File 'lib/kafka/async_producer.rb', line 105
def produce(value, topic:, **options)
  # We want to fail fast if `topic` isn't a String
  topic = topic.to_str

  ensure_threads_running!

  if @queue.size >= @max_queue_size
    buffer_overflow topic,
      "Cannot produce to #{topic}, max queue size (#{@max_queue_size} messages) reached"
  end

  args = [value, **options.merge(topic: topic)]
  @queue << [:produce, args]

  @instrumenter.instrument("enqueue_message.async_producer", {
    topic: topic,
    queue_size: @queue.size,
    max_queue_size: @max_queue_size,
  })

  nil
end
```
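The two fail-fast checks in #produce can be isolated in a small sketch. `BoundedEnqueuer` and `QueueFullError` are hypothetical names used only for illustration: the topic is coerced with `to_str` (so a Symbol is rejected with NoMethodError), and the queue size is checked before anything is enqueued. As in the method above, the size check and the push are separate steps, so with several producing threads the limit is approximate rather than strict.

```ruby
# Hypothetical sketch of the fail-fast checks in #produce; the class and
# error names are illustrative, not ruby-kafka's.
class QueueFullError < StandardError; end

class BoundedEnqueuer
  def initialize(queue:, max_queue_size:)
    raise ArgumentError unless max_queue_size > 0

    @queue = queue
    @max_queue_size = max_queue_size
  end

  def produce(value, topic:, **options)
    # Raises NoMethodError for topics that aren't string-like (e.g. Symbols).
    topic = topic.to_str

    if @queue.size >= @max_queue_size
      raise QueueFullError,
        "Cannot produce to #{topic}, max queue size (#{@max_queue_size} messages) reached"
    end

    @queue << [:produce, [value, options.merge(topic: topic)]]
    nil
  end
end

queue = Queue.new
enqueuer = BoundedEnqueuer.new(queue: queue, max_queue_size: 2)
enqueuer.produce("a", topic: "greetings")
enqueuer.produce("b", topic: "greetings")
begin
  enqueuer.produce("c", topic: "greetings")
rescue QueueFullError => e
  puts e.message # prints "Cannot produce to greetings, max queue size (2 messages) reached"
end
```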
#shutdown ⇒ nil
Shuts down the producer, releasing the network resources used. This method will block until the buffered messages have been delivered.
```ruby
# File 'lib/kafka/async_producer.rb', line 146
def shutdown
  ensure_threads_running!

  @timer_thread && @timer_thread.exit
  @queue << [:shutdown, nil]
  @worker_thread && @worker_thread.join

  nil
end
```
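Because #shutdown is what flushes the remaining messages and stops the threads, it is worth guaranteeing that it runs. The wrapper below is a hypothetical convenience (`with_async_producer` is not part of ruby-kafka); it relies only on `kafka.async_producer(**options)` and `#shutdown`, and uses `ensure` so the producer is shut down even when the block raises.

```ruby
# Hypothetical convenience wrapper (not part of ruby-kafka): yields an async
# producer and guarantees #shutdown runs, even if the block raises.
def with_async_producer(kafka, **options)
  producer = kafka.async_producer(**options)
  begin
    yield producer
  ensure
    # Blocks until buffered messages have been delivered, then stops the threads.
    producer.shutdown
  end
end

# Usage, assuming `kafka` is a client created with Kafka.new:
#
#   with_async_producer(kafka, delivery_interval: 10) do |producer|
#     producer.produce("hello", topic: "greetings")
#   end
```

Note that the block's return value is passed through, while an exception from the block propagates only after shutdown has finished.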