Class: Kafka::Producer::DeliveryReport

Inherits:
Object
  • Object
show all
Defined in:
lib/kafka/producer/delivery_report.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(&block) ⇒ DeliveryReport

Initializes a new DeliveryReport

Parameters:

  • block (Proc)

    Callback to call with the DeliveryReport when it is received from the cluster.



37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
# File 'lib/kafka/producer/delivery_report.rb', line 37

def initialize(&block)
  @mutex = Mutex.new
  @waiter = ConditionVariable.new

  @error = nil
  @topic = nil
  @offset = nil
  @latency = nil
  @partition = nil
  @callback = block

  # Will be set to true by a call to #done. Fast out for any callers to
  # #wait that may come in after done has already been called.
  @done = false
end

Instance Attribute Details

#errornil, Kafka::ResponseError (readonly)

Returns:

  • (nil)

    Delivery was successful or report has not been received yet.

  • (Kafka::ResponseError)

    Error delivering the Message.



8
9
10
# File 'lib/kafka/producer/delivery_report.rb', line 8

def error
  @error
end

#latencynil, Integer (readonly)

Note:

Latency is in microseconds (μs) while most other timestamps are in milliseconds.

Returns the number of microseconds since the message was enqueued for delivery until the message was confirmed by the cluster or permanently failed.

Returns:

  • (nil)

    Latency was not available

  • (Integer)

    Time since message was produced in microseconds



31
32
33
# File 'lib/kafka/producer/delivery_report.rb', line 31

def latency
  @latency
end

#offsetnil, Integer (readonly)

Returns:

  • (nil)

    Report has not been received yet

  • (Integer)

    Offset for the message on partition.



16
17
18
# File 'lib/kafka/producer/delivery_report.rb', line 16

def offset
  @offset
end

#partitionnil, Integer (readonly)

Returns:

  • (nil)

    Report has not been received yet

  • (Integer)

    Partition the message was delivered to.



20
21
22
# File 'lib/kafka/producer/delivery_report.rb', line 20

def partition
  @partition
end

#topicnil, String (readonly)

Returns:

  • (nil)

    Report has not been received yet

  • (String)

    Name of the topic Message was delivered to.



12
13
14
# File 'lib/kafka/producer/delivery_report.rb', line 12

def topic
  @topic
end

Instance Method Details

#done(message) ⇒ Object

Set the response based on the message and notify anyone waiting on the result.

Parameters:



85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
# File 'lib/kafka/producer/delivery_report.rb', line 85

def done(message)
  @mutex.synchronize do
    @error = message.error

    @offset = message.offset
    @topic = message.topic
    @partition = message.partition
    @latency = message.latency

    @done = true
    @waiter.broadcast

    remove_instance_variable(:@mutex)
    remove_instance_variable(:@waiter)
  end

  if @callback
    @callback.call(self)
  end

  nil
end

#error?Boolean

Returns if the delivery errored

Returns:

  • (Boolean)

    True when the delivery failed with an error.

See Also:

  • #see


67
68
69
# File 'lib/kafka/producer/delivery_report.rb', line 67

def error?
  received? && !successful?
end

#received?Boolean

Returns true when the report has been received back from the kafka cluster.

Returns:

  • (Boolean)

    True when the server has reported back on the delivery.



58
59
60
# File 'lib/kafka/producer/delivery_report.rb', line 58

def received?
  @done
end

#successful?Boolean

Returns if the delivery was successful

Returns:

  • (Boolean)

    True when the report was delivered to the cluster successfully.



75
76
77
# File 'lib/kafka/producer/delivery_report.rb', line 75

def successful?
  received? && error.nil?
end

#wait(timeout: 5000) ⇒ Object

Wait for a report to be received for the delivery from the cluster.

Parameters:

  • timeout (Integer) (defaults to: 5000)

    Maximum time to wait in milliseconds.

Raises:



114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
# File 'lib/kafka/producer/delivery_report.rb', line 114

def wait(timeout: 5000)
  # Fast out since the delivery report has already been reported back from
  # the cluster.
  if @done
    return
  end

  @mutex.synchronize do
    # Convert from milliseconds to seconds to match Ruby's API. Takes
    # milliseconds to be consistent with librdkafka APIs.
    if timeout
      timeout /= 1000.0
    end

    @waiter.wait(@mutex, timeout)

    # No report was received for the message before we timed out waiting.
    if !@done
      raise ::Kafka::ResponseError, ::Kafka::FFI::RD_KAFKA_RESP_ERR__TIMED_OUT
    end
  end

  nil
end