Class: Rdkafka::NativeKafka
- Inherits:
-
Object
- Object
- Rdkafka::NativeKafka
- Defined in:
- lib/fluent/plugin/rdkafka_patch/0_16_0.rb,
lib/fluent/plugin/rdkafka_patch/0_14_0.rb
Instance Method Summary collapse
-
#close(timeout = nil, object_id = nil) ⇒ Object
return false if producer is forcefully closed, otherwise return true.
Instance Method Details
#close(timeout = nil, object_id = nil) ⇒ Object
return false if producer is forcefully closed, otherwise return true
3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 |
# File 'lib/fluent/plugin/rdkafka_patch/0_16_0.rb', line 3 def close(timeout=nil, object_id=nil) return true if closed? synchronize do # Indicate to the outside world that we are closing @closing = true thread_status = :unknown if @polling_thread # Indicate to polling thread that we're closing @polling_thread[:closing] = true # Wait for the polling thread to finish up, # this can be aborted in practice if this # code runs from a finalizer. thread_status = @polling_thread.join(timeout) end # Destroy the client after locking both mutexes @poll_mutex.lock # This check prevents a race condition, where we would enter the close in two threads # and after unlocking the primary one that hold the lock but finished, ours would be unlocked # and would continue to run, trying to destroy inner twice retun unless @inner Rdkafka::Bindings.rd_kafka_destroy(@inner) @inner = nil @opaque = nil !thread_status.nil? end end |