Class: Kafka::Producer::DeliveryReport
- Inherits:
-
Object
- Object
- Kafka::Producer::DeliveryReport
- Defined in:
- lib/kafka/producer/delivery_report.rb
Instance Attribute Summary collapse
- #error ⇒ nil, Kafka::ResponseError readonly
-
#latency ⇒ nil, Integer
readonly
Returns the number of microseconds since the message was enqueued for delivery until the message was confirmed by the cluster or permanently failed.
- #offset ⇒ nil, Integer readonly
- #partition ⇒ nil, Integer readonly
- #topic ⇒ nil, String readonly
Instance Method Summary collapse
-
#done(message) ⇒ Object
Set the response based on the message and notify anyone waiting on the result.
-
#error? ⇒ Boolean
Returns if the delivery errored.
-
#initialize(&block) ⇒ DeliveryReport
constructor
Initializes a new DeliveryReport.
-
#received? ⇒ Boolean
Returns true when the report has been received back from the kafka cluster.
-
#successful? ⇒ Boolean
Returns if the delivery was successful.
-
#wait(timeout: 5000) ⇒ Object
Wait for a report to be received for the delivery from the cluster.
Constructor Details
#initialize(&block) ⇒ DeliveryReport
Initializes a new DeliveryReport
37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 |
# File 'lib/kafka/producer/delivery_report.rb', line 37 def initialize(&block) @mutex = Mutex.new @waiter = ConditionVariable.new @error = nil @topic = nil @offset = nil @latency = nil @partition = nil @callback = block # Will be set to true by a call to #done. Fast out for any callers to # #wait that may come in after done has already been called. @done = false end |
Instance Attribute Details
#error ⇒ nil, Kafka::ResponseError (readonly)
8 9 10 |
# File 'lib/kafka/producer/delivery_report.rb', line 8 def error @error end |
#latency ⇒ nil, Integer (readonly)
Latency is in microseconds (μs) while most other timestamps are in milliseconds.
Returns the number of microseconds since the message was enqueued for delivery until the message was confirmed by the cluster or permanently failed.
31 32 33 |
# File 'lib/kafka/producer/delivery_report.rb', line 31 def latency @latency end |
#offset ⇒ nil, Integer (readonly)
16 17 18 |
# File 'lib/kafka/producer/delivery_report.rb', line 16 def offset @offset end |
#partition ⇒ nil, Integer (readonly)
20 21 22 |
# File 'lib/kafka/producer/delivery_report.rb', line 20 def partition @partition end |
#topic ⇒ nil, String (readonly)
12 13 14 |
# File 'lib/kafka/producer/delivery_report.rb', line 12 def topic @topic end |
Instance Method Details
#done(message) ⇒ Object
Set the response based on the message and notify anyone waiting on the result.
85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 |
# File 'lib/kafka/producer/delivery_report.rb', line 85 def done() @mutex.synchronize do @error = .error @offset = .offset @topic = .topic @partition = .partition @latency = .latency @done = true @waiter.broadcast remove_instance_variable(:@mutex) remove_instance_variable(:@waiter) end if @callback @callback.call(self) end nil end |
#error? ⇒ Boolean
Returns if the delivery errored
67 68 69 |
# File 'lib/kafka/producer/delivery_report.rb', line 67 def error? received? && !successful? end |
#received? ⇒ Boolean
Returns true when the report has been received back from the kafka cluster.
58 59 60 |
# File 'lib/kafka/producer/delivery_report.rb', line 58 def received? @done end |
#successful? ⇒ Boolean
Returns if the delivery was successful
75 76 77 |
# File 'lib/kafka/producer/delivery_report.rb', line 75 def successful? received? && error.nil? end |
#wait(timeout: 5000) ⇒ Object
Wait for a report to be received for the delivery from the cluster.
114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 |
# File 'lib/kafka/producer/delivery_report.rb', line 114 def wait(timeout: 5000) # Fast out since the delivery report has already been reported back from # the cluster. if @done return end @mutex.synchronize do # Convert from milliseconds to seconds to match Ruby's API. Takes # milliseconds to be consistent with librdkafka APIs. if timeout timeout /= 1000.0 end @waiter.wait(@mutex, timeout) # No report was received for the message before we timed out waiting. if !@done raise ::Kafka::ResponseError, ::Kafka::FFI::RD_KAFKA_RESP_ERR__TIMED_OUT end end nil end |