Method: Rdkafka::Producer#purge

Defined in:
lib/rdkafka/producer.rb

#purgeObject

Purges the outgoing queue and releases all resources.

Useful when closing the producer with outgoing messages to unstable clusters or when for any other reasons waiting cannot go on anymore. This purges both the queue and all the inflight requests + updates the delivery handles statuses so they can be materialized into purge_queue errors.

[View source] [View on GitHub]

197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
# File 'lib/rdkafka/producer.rb', line 197

def purge
  closed_producer_check(__method__)

  code = nil

  @native_kafka.with_inner do |inner|
    code = Bindings.rd_kafka_purge(
      inner,
      Bindings::RD_KAFKA_PURGE_F_QUEUE | Bindings::RD_KAFKA_PURGE_F_INFLIGHT
    )
  end

  code.zero? || raise(Rdkafka::RdkafkaError.new(code))

  # Wait for the purge to affect everything
  sleep(0.001) until flush(100)

  true
end