Module: Karafka::Patches::Rdkafka::Bindings

Includes:
Rdkafka::Bindings
Defined in:
lib/karafka/patches/rdkafka/bindings.rb

Overview

Binding patches that slightly change how rdkafka operates in certain places

Constant Summary collapse

RB =

Alias internally

::Rdkafka::Bindings
RebalanceCallback =

This patch changes few things:

  • it commits offsets (if any) upon partition revocation, so less jobs need to be reprocessed if they are assigned to a different process

  • reports callback errors into the errors instrumentation instead of the logger

  • catches only StandardError instead of Exception as we fully control the directly executed callbacks

FFI::Function.new(
  :void, %i[pointer int pointer pointer]
) do |client_ptr, code, partitions_ptr, opaque_ptr|
  # Patch reference
  pr = ::Karafka::Patches::Rdkafka::Bindings

  if RB.rd_kafka_rebalance_protocol(client_ptr) == 'COOPERATIVE'
    pr.on_cooperative_rebalance(client_ptr, code, partitions_ptr)
  else
    pr.on_eager_rebalance(client_ptr, code, partitions_ptr)
  end

  opaque = ::Rdkafka::Config.opaques[opaque_ptr.to_i]
  return unless opaque

  tpl = ::Rdkafka::Consumer::TopicPartitionList.from_native_tpl(partitions_ptr).freeze

  pr.trigger_callbacks(code, opaque, tpl)
end

Class Method Summary collapse

Class Method Details

.on_cooperative_rebalance(client_ptr, code, partitions_ptr) ⇒ Object

Handle assignments on cooperative rebalance

Parameters:

  • client_ptr (FFI::Pointer)
  • code (Integer)
  • partitions_ptr (FFI::Pointer)


21
22
23
24
25
26
27
28
29
30
31
# File 'lib/karafka/patches/rdkafka/bindings.rb', line 21

def on_cooperative_rebalance(client_ptr, code, partitions_ptr)
  case code
  when RB::RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS
    RB.rd_kafka_incremental_assign(client_ptr, partitions_ptr)
  when RB::RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS
    RB.rd_kafka_commit(client_ptr, nil, false)
    RB.rd_kafka_incremental_unassign(client_ptr, partitions_ptr)
  else
    RB.rd_kafka_assign(client_ptr, FFI::Pointer::NULL)
  end
end

.on_eager_rebalance(client_ptr, code, partitions_ptr) ⇒ Object

Handle assignments on a eager rebalance

Parameters:

  • client_ptr (FFI::Pointer)
  • code (Integer)
  • partitions_ptr (FFI::Pointer)


38
39
40
41
42
43
44
45
46
47
48
# File 'lib/karafka/patches/rdkafka/bindings.rb', line 38

def on_eager_rebalance(client_ptr, code, partitions_ptr)
  case code
  when RB::RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS
    RB.rd_kafka_assign(client_ptr, partitions_ptr)
  when RB::RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS
    RB.rd_kafka_commit(client_ptr, nil, false)
    RB.rd_kafka_assign(client_ptr, FFI::Pointer::NULL)
  else
    RB.rd_kafka_assign(client_ptr, FFI::Pointer::NULL)
  end
end

.trigger_callbacks(code, opaque, tpl) ⇒ Object

Trigger Karafka callbacks

Parameters:

  • code (Integer)
  • opaque (Rdkafka::Opaque)
  • tpl (Rdkafka::Consumer::TopicPartitionList)


55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
# File 'lib/karafka/patches/rdkafka/bindings.rb', line 55

def trigger_callbacks(code, opaque, tpl)
  Karafka.monitor.instrument(
    'connection.client.rebalance_callback',
    caller: self,
    code: code,
    tpl: tpl
  ) do
    case code
    when RB::RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS
      opaque.call_on_partitions_assigned(tpl)
    when RB::RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS
      opaque.call_on_partitions_revoked(tpl)
    end
  end
rescue StandardError => e
  Karafka.monitor.instrument(
    'error.occurred',
    caller: self,
    error: e,
    type: 'connection.client.rebalance_callback.error'
  )
end