Class: Rdkafka::NativeKafka

Inherits:
Object
  • Object
show all
Defined in:
lib/fluent/plugin/rdkafka_patch/0_16_0.rb,
lib/fluent/plugin/rdkafka_patch/0_14_0.rb

Instance Method Summary collapse

Instance Method Details

#close(timeout = nil, object_id = nil) ⇒ Object

return false if producer is forcefully closed, otherwise return true



3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
# File 'lib/fluent/plugin/rdkafka_patch/0_16_0.rb', line 3

def close(timeout=nil, object_id=nil)
  return true if closed?

  synchronize do
    # Indicate to the outside world that we are closing
    @closing = true

    thread_status = :unknown
    if @polling_thread
      # Indicate to polling thread that we're closing
      @polling_thread[:closing] = true

      # Wait for the polling thread to finish up,
      # this can be aborted in practice if this
      # code runs from a finalizer.
      thread_status = @polling_thread.join(timeout)
    end

    # Destroy the client after locking both mutexes
    @poll_mutex.lock

    # This check prevents a race condition, where we would enter the close in two threads
    # and after unlocking the primary one that hold the lock but finished, ours would be unlocked
    # and would continue to run, trying to destroy inner twice
    retun unless @inner

    Rdkafka::Bindings.rd_kafka_destroy(@inner)
    @inner = nil
    @opaque = nil

    !thread_status.nil?
  end
end