Module: Karafka::Patches::Rdkafka::Bindings

Includes:
Rdkafka::Bindings
Defined in:
lib/karafka/patches/rdkafka/bindings.rb

Overview

Binding patches that slightly change how rdkafka operates in certain places

Constant Summary collapse

RB =

Alias internally

::Rdkafka::Bindings
RebalanceCallback =

This patch changes few things:

  • it commits offsets (if any) upon partition revocation, so less jobs need to be reprocessed if they are assigned to a different process

  • reports callback errors into the errors instrumentation instead of the logger

  • catches only StandardError instead of Exception as we fully control the directly executed callbacks

FFI::Function.new(
  :void, %i[pointer int pointer pointer]
) do |client_ptr, code, partitions_ptr, opaque_ptr|
  # Patch reference
  pr = ::Karafka::Patches::Rdkafka::Bindings
  tpl = ::Rdkafka::Consumer::TopicPartitionList.from_native_tpl(partitions_ptr).freeze
  opaque = ::Rdkafka::Config.opaques[opaque_ptr.to_i]

  if RB.rd_kafka_rebalance_protocol(client_ptr) == 'COOPERATIVE'
    pr.on_cooperative_rebalance(client_ptr, code, partitions_ptr, tpl, opaque)
  else
    pr.on_eager_rebalance(client_ptr, code, partitions_ptr, tpl, opaque)
  end
end

Class Method Summary collapse

Class Method Details

.on_cooperative_rebalance(client_ptr, code, partitions_ptr, tpl, opaque) ⇒ Object

Handle assignments on cooperative rebalance

Parameters:

  • client_ptr (FFI::Pointer)
  • code (Integer)
  • partitions_ptr (FFI::Pointer)
  • tpl (Rdkafka::Consumer::TopicPartitionList)
  • opaque (Rdkafka::Opaque)


23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
# File 'lib/karafka/patches/rdkafka/bindings.rb', line 23

def on_cooperative_rebalance(client_ptr, code, partitions_ptr, tpl, opaque)
  case code
  when RB::RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS
    opaque&.call_on_partitions_assign(tpl)
    RB.rd_kafka_incremental_assign(client_ptr, partitions_ptr)
    opaque&.call_on_partitions_assigned(tpl)
  when RB::RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS
    opaque&.call_on_partitions_revoke(tpl)
    RB.rd_kafka_commit(client_ptr, nil, false)
    RB.rd_kafka_incremental_unassign(client_ptr, partitions_ptr)
    opaque&.call_on_partitions_revoked(tpl)
  else
    opaque&.call_on_partitions_assign(tpl)
    RB.rd_kafka_assign(client_ptr, FFI::Pointer::NULL)
    opaque&.call_on_partitions_assigned(tpl)
  end
end

.on_eager_rebalance(client_ptr, code, partitions_ptr, tpl, opaque) ⇒ Object

Handle assignments on a eager rebalance

Parameters:

  • client_ptr (FFI::Pointer)
  • code (Integer)
  • partitions_ptr (FFI::Pointer)
  • tpl (Rdkafka::Consumer::TopicPartitionList)
  • opaque (Rdkafka::Opaque)


48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
# File 'lib/karafka/patches/rdkafka/bindings.rb', line 48

def on_eager_rebalance(client_ptr, code, partitions_ptr, tpl, opaque)
  case code
  when RB::RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS
    opaque&.call_on_partitions_assign(tpl)
    RB.rd_kafka_assign(client_ptr, partitions_ptr)
    opaque&.call_on_partitions_assigned(tpl)
  when RB::RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS
    opaque&.call_on_partitions_revoke(tpl)
    RB.rd_kafka_commit(client_ptr, nil, false)
    RB.rd_kafka_assign(client_ptr, FFI::Pointer::NULL)
    opaque&.call_on_partitions_revoked(tpl)
  else
    opaque&.call_on_partitions_assign(tpl)
    RB.rd_kafka_assign(client_ptr, FFI::Pointer::NULL)
    opaque&.call_on_partitions_assigned(tpl)
  end
end