Class: Rdkafka::AbstractHandle
- Inherits:
-
FFI::Struct
- Object
- FFI::Struct
- Rdkafka::AbstractHandle
- Defined in:
- lib/rdkafka/abstract_handle.rb
Direct Known Subclasses
Rdkafka::Admin::CreateTopicHandle, Rdkafka::Admin::DeleteTopicHandle, Producer::DeliveryHandle
Defined Under Namespace
Classes: WaitTimeoutError
Constant Summary collapse
- REGISTRY =
Subclasses must define their own layout, and the layout must start with:
layout :pending, :bool, :response, :int
{}
Class Method Summary collapse
Instance Method Summary collapse
-
#create_result ⇒ Object
Operation-specific result.
-
#operation_name ⇒ String
The name of the operation (e.g. "delivery").
-
#pending? ⇒ Boolean
Whether the handle is still pending.
-
#raise_error ⇒ Object
Allow subclasses to override.
-
#wait(max_wait_timeout: 60, wait_timeout: 0.1) ⇒ Object
Wait for the operation to complete or raise an error if this takes longer than the timeout.
Class Method Details
.register(handle) ⇒ Object
18 19 20 21 |
# File 'lib/rdkafka/abstract_handle.rb', line 18 def self.register(handle) address = handle.to_ptr.address REGISTRY[address] = handle end |
.remove(address) ⇒ Object
23 24 25 |
# File 'lib/rdkafka/abstract_handle.rb', line 23 def self.remove(address) REGISTRY.delete(address) end |
Instance Method Details
#create_result ⇒ Object
Returns operation-specific result.
71 72 73 |
# File 'lib/rdkafka/abstract_handle.rb', line 71 def create_result raise "Must be implemented by subclass!" end |
#operation_name ⇒ String
Returns the name of the operation (e.g. "delivery").
66 67 68 |
# File 'lib/rdkafka/abstract_handle.rb', line 66 def operation_name raise "Must be implemented by subclass!" end |
#pending? ⇒ Boolean
Whether the handle is still pending.
30 31 32 |
# File 'lib/rdkafka/abstract_handle.rb', line 30 def pending? self[:pending] end |
#raise_error ⇒ Object
Allow subclasses to override
76 77 78 |
# File 'lib/rdkafka/abstract_handle.rb', line 76 def raise_error raise RdkafkaError.new(self[:response]) end |
#wait(max_wait_timeout: 60, wait_timeout: 0.1) ⇒ Object
Wait for the operation to complete or raise an error if this takes longer than the timeout. If there is a timeout this does not mean the operation failed, rdkafka might still be working on the operation. In this case it is possible to call wait again.
45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 |
# File 'lib/rdkafka/abstract_handle.rb', line 45 def wait(max_wait_timeout: 60, wait_timeout: 0.1) timeout = if max_wait_timeout CURRENT_TIME.call + max_wait_timeout else nil end loop do if pending? if timeout && timeout <= CURRENT_TIME.call raise WaitTimeoutError.new("Waiting for #{operation_name} timed out after #{max_wait_timeout} seconds") end sleep wait_timeout elsif self[:response] != 0 raise_error else return create_result end end end |