Module: Karafka::Patches::Rdkafka::Bindings

Includes:
Rdkafka::Bindings
Defined in:
lib/karafka/patches/rdkafka/bindings.rb

Overview

Binding patches that slightly change how rdkafka operates in certain places

Constant Summary collapse

RB =

Alias internally

::Rdkafka::Bindings
RebalanceCallback =

This patch changes few things:

  • it commits offsets (if any) upon partition revocation, so less jobs need to be reprocessed if they are assigned to a different process

  • reports callback errors into the errors instrumentation instead of the logger

  • catches only StandardError instead of Exception as we fully control the directly executed callbacks

FFI::Function.new(
  :void, %i[pointer int pointer pointer]
) do |client_ptr, code, partitions_ptr, opaque_ptr|
  # Patch reference
  pr = ::Karafka::Patches::Rdkafka::Bindings

  if RB.rd_kafka_rebalance_protocol(client_ptr) == 'COOPERATIVE'
    pr.on_cooperative_rebalance(client_ptr, code, partitions_ptr)
  else
    pr.on_eager_rebalance(client_ptr, code, partitions_ptr)
  end

  opaque = ::Rdkafka::Config.opaques[opaque_ptr.to_i]
  return unless opaque

  tpl = ::Rdkafka::Consumer::TopicPartitionList.from_native_tpl(partitions_ptr).freeze
  consumer = ::Rdkafka::Consumer.new(client_ptr)

  pr.trigger_callbacks(code, opaque, consumer, tpl)
end

Class Method Summary collapse

Class Method Details

.on_cooperative_rebalance(client_ptr, code, partitions_ptr) ⇒ Object

Handle assignments on cooperative rebalance

Parameters:

  • client_ptr (FFI::Pointer)
  • code (Integer)
  • partitions_ptr (FFI::Pointer)


19
20
21
22
23
24
25
26
27
28
29
# File 'lib/karafka/patches/rdkafka/bindings.rb', line 19

def on_cooperative_rebalance(client_ptr, code, partitions_ptr)
  case code
  when RB::RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS
    RB.rd_kafka_incremental_assign(client_ptr, partitions_ptr)
  when RB::RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS
    RB.rd_kafka_commit(client_ptr, nil, false)
    RB.rd_kafka_incremental_unassign(client_ptr, partitions_ptr)
  else
    RB.rd_kafka_assign(client_ptr, FFI::Pointer::NULL)
  end
end

.on_eager_rebalance(client_ptr, code, partitions_ptr) ⇒ Object

Handle assignments on a eager rebalance

Parameters:

  • client_ptr (FFI::Pointer)
  • code (Integer)
  • partitions_ptr (FFI::Pointer)


36
37
38
39
40
41
42
43
44
45
46
# File 'lib/karafka/patches/rdkafka/bindings.rb', line 36

def on_eager_rebalance(client_ptr, code, partitions_ptr)
  case code
  when RB::RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS
    RB.rd_kafka_assign(client_ptr, partitions_ptr)
  when RB::RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS
    RB.rd_kafka_commit(client_ptr, nil, false)
    RB.rd_kafka_assign(client_ptr, FFI::Pointer::NULL)
  else
    RB.rd_kafka_assign(client_ptr, FFI::Pointer::NULL)
  end
end

.trigger_callbacks(code, opaque, consumer, tpl) ⇒ Object

Trigger Karafka callbacks

Parameters:

  • code (Integer)
  • opaque (Rdkafka::Opaque)
  • consumer (Rdkafka::Consumer)
  • tpl (Rdkafka::Consumer::TopicPartitionList)


54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
# File 'lib/karafka/patches/rdkafka/bindings.rb', line 54

def trigger_callbacks(code, opaque, consumer, tpl)
  case code
  when RB::RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS
    opaque.call_on_partitions_assigned(consumer, tpl)
  when RB::RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS
    opaque.call_on_partitions_revoked(consumer, tpl)
  end
rescue StandardError => e
  Karafka.monitor.instrument(
    'error.occurred',
    caller: self,
    error: e,
    type: 'connection.client.rebalance_callback.error'
  )
end