Method: Rdkafka::Producer#purge
Defined in: lib/rdkafka/producer.rb
#purge ⇒ Object
Purges the outgoing queue and releases all resources.
Useful when closing the producer while messages are still outgoing to unstable clusters, or
when waiting is no longer possible for any other reason. This purges both the queue and all
in-flight requests, and updates the delivery handle statuses so that they can be materialized
into purge_queue errors.
# File 'lib/rdkafka/producer.rb', line 197

def purge
  closed_producer_check(__method__)

  code = nil

  @native_kafka.with_inner do |inner|
    code = Bindings.rd_kafka_purge(
      inner,
      Bindings::RD_KAFKA_PURGE_F_QUEUE | Bindings::RD_KAFKA_PURGE_F_INFLIGHT
    )
  end

  code.zero? || raise(Rdkafka::RdkafkaError.new(code))

  # Wait for the purge to affect everything
  sleep(0.001) until flush(100)

  true
end
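As a usage illustration, the following is a minimal sketch of a shutdown routine that tries a bounded flush first and falls back to purge only when the flush does not finish in time. The helper name shutdown_producer and its flush_timeout_ms parameter are hypothetical and not part of rdkafka-ruby. The sketch assumes only that the producer responds to flush(timeout_ms) with a truthy value once nothing is left to send (as the source above relies on), and to purge and close.

```ruby
# Hypothetical helper, not part of rdkafka-ruby.
#
# Assumes `producer` responds to:
#   #flush(timeout_ms) -> truthy when nothing is left to deliver
#   #purge             -> drops queued and in-flight messages
#   #close             -> releases the producer
def shutdown_producer(producer, flush_timeout_ms: 5_000)
  # Give pending messages a bounded chance to be delivered.
  flushed = producer.flush(flush_timeout_ms)

  # Waiting cannot go on anymore: drop whatever is still pending.
  producer.purge unless flushed

  flushed ? :flushed : :purged
ensure
  # Always release the producer, even if flush or purge raised.
  producer.close
end
```

With a real producer this could be called as shutdown_producer(producer, flush_timeout_ms: 2_000). Delivery handles for messages that were dropped would then be expected to surface a purge_queue error when waited on, as described above.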