Module: Karafka::Patches::Rdkafka::Bindings
- Includes:
- Rdkafka::Bindings
- Defined in:
- lib/karafka/patches/rdkafka/bindings.rb
Overview
Binding patches that slightly change how rdkafka operates in certain places
Constant Summary collapse
- RB =
Alias internally
::Rdkafka::Bindings
- RebalanceCallback =
This patch changes few things:
-
it commits offsets (if any) upon partition revocation, so less jobs need to be reprocessed if they are assigned to a different process
-
reports callback errors into the errors instrumentation instead of the logger
-
catches only StandardError instead of Exception as we fully control the directly executed callbacks
-
FFI::Function.new( :void, %i[pointer int pointer pointer] ) do |client_ptr, code, partitions_ptr, opaque_ptr| # Patch reference pr = ::Karafka::Patches::Rdkafka::Bindings if RB.rd_kafka_rebalance_protocol(client_ptr) == 'COOPERATIVE' pr.on_cooperative_rebalance(client_ptr, code, partitions_ptr) else pr.on_eager_rebalance(client_ptr, code, partitions_ptr) end opaque = ::Rdkafka::Config.opaques[opaque_ptr.to_i] return unless opaque tpl = ::Rdkafka::Consumer::TopicPartitionList.from_native_tpl(partitions_ptr).freeze pr.trigger_callbacks(code, opaque, tpl) end
Class Method Summary collapse
-
.on_cooperative_rebalance(client_ptr, code, partitions_ptr) ⇒ Object
Handle assignments on cooperative rebalance.
-
.on_eager_rebalance(client_ptr, code, partitions_ptr) ⇒ Object
Handle assignments on a eager rebalance.
-
.trigger_callbacks(code, opaque, tpl) ⇒ Object
Trigger Karafka callbacks.
Class Method Details
.on_cooperative_rebalance(client_ptr, code, partitions_ptr) ⇒ Object
Handle assignments on cooperative rebalance
21 22 23 24 25 26 27 28 29 30 31 |
# File 'lib/karafka/patches/rdkafka/bindings.rb', line 21 def on_cooperative_rebalance(client_ptr, code, partitions_ptr) case code when RB::RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS RB.rd_kafka_incremental_assign(client_ptr, partitions_ptr) when RB::RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS RB.rd_kafka_commit(client_ptr, nil, false) RB.rd_kafka_incremental_unassign(client_ptr, partitions_ptr) else RB.rd_kafka_assign(client_ptr, FFI::Pointer::NULL) end end |
.on_eager_rebalance(client_ptr, code, partitions_ptr) ⇒ Object
Handle assignments on a eager rebalance
38 39 40 41 42 43 44 45 46 47 48 |
# File 'lib/karafka/patches/rdkafka/bindings.rb', line 38 def on_eager_rebalance(client_ptr, code, partitions_ptr) case code when RB::RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS RB.rd_kafka_assign(client_ptr, partitions_ptr) when RB::RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS RB.rd_kafka_commit(client_ptr, nil, false) RB.rd_kafka_assign(client_ptr, FFI::Pointer::NULL) else RB.rd_kafka_assign(client_ptr, FFI::Pointer::NULL) end end |
.trigger_callbacks(code, opaque, tpl) ⇒ Object
Trigger Karafka callbacks
55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 |
# File 'lib/karafka/patches/rdkafka/bindings.rb', line 55 def trigger_callbacks(code, opaque, tpl) Karafka.monitor.instrument( 'connection.client.rebalance_callback', caller: self, code: code, tpl: tpl ) do case code when RB::RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS opaque.call_on_partitions_assigned(tpl) when RB::RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS opaque.call_on_partitions_revoked(tpl) end end rescue StandardError => e Karafka.monitor.instrument( 'error.occurred', caller: self, error: e, type: 'connection.client.rebalance_callback.error' ) end |