Module: Rdkafka::Callbacks

Defined in:
lib/rdkafka/callbacks.rb

Defined Under Namespace

Classes: DescribeConfigsResult, GroupResult, IncrementalAlterConfigsResult

Constant Summary collapse

@@mutex =
Mutex.new
@@current_pid =
nil

Class Method Summary collapse

Class Method Details

.ensure_ffi_runningObject

Defines or recreates after fork callbacks that require FFI thread so the callback thread is always correctly initialized

[View source] [View on GitHub]

384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
# File 'lib/rdkafka/callbacks.rb', line 384

def ensure_ffi_running
  @@mutex.synchronize do
    return if @@current_pid == ::Process.pid

    if const_defined?(:BackgroundEventCallbackFunction, false)
      send(:remove_const, :BackgroundEventCallbackFunction)
      send(:remove_const, :DeliveryCallbackFunction)
    end

    # FFI Function used for Create Topic and Delete Topic callbacks
    background_event_callback_function = FFI::Function.new(
        :void, [:pointer, :pointer, :pointer]
    ) do |client_ptr, event_ptr, opaque_ptr|
      BackgroundEventCallback.call(client_ptr, event_ptr, opaque_ptr)
    end

    # FFI Function used for Message Delivery callbacks
    delivery_callback_function = FFI::Function.new(
        :void, [:pointer, :pointer, :pointer]
    ) do |client_ptr, message_ptr, opaque_ptr|
      DeliveryCallback.call(client_ptr, message_ptr, opaque_ptr)
    end

    const_set(:BackgroundEventCallbackFunction, background_event_callback_function)
    const_set(:DeliveryCallbackFunction, delivery_callback_function)

    @@current_pid = ::Process.pid
  end
end