Class: Kafka::Protocol::ListOffsetResponse
- Inherits:
-
Object
- Object
- Kafka::Protocol::ListOffsetResponse
- Defined in:
- lib/kafka/protocol/list_offset_response.rb
Overview
A response to a list offset request.
API Specification
OffsetResponse => [TopicName [PartitionOffsets]]
ThrottleTimeMS => int32
PartitionOffsets => Partition ErrorCode Timestamp Offset
Partition => int32
ErrorCode => int16
Timestamp => int64
Offset => int64
Defined Under Namespace
Classes: PartitionOffsetInfo, TopicOffsetInfo
Instance Attribute Summary collapse
-
#topics ⇒ Object
readonly
Returns the value of attribute topics.
Class Method Summary collapse
Instance Method Summary collapse
-
#initialize(topics:) ⇒ ListOffsetResponse
constructor
A new instance of ListOffsetResponse.
- #offset_for(topic, partition) ⇒ Object
Constructor Details
#initialize(topics:) ⇒ ListOffsetResponse
Returns a new instance of ListOffsetResponse.
41 42 43 |
# File 'lib/kafka/protocol/list_offset_response.rb', line 41 def initialize(topics:) @topics = topics end |
Instance Attribute Details
#topics ⇒ Object (readonly)
Returns the value of attribute topics.
39 40 41 |
# File 'lib/kafka/protocol/list_offset_response.rb', line 39 def topics @topics end |
Class Method Details
.decode(decoder) ⇒ Object
65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 |
# File 'lib/kafka/protocol/list_offset_response.rb', line 65 def self.decode(decoder) _throttle_time_ms = decoder.int32 topics = decoder.array do name = decoder.string partition_offsets = decoder.array do PartitionOffsetInfo.new( partition: decoder.int32, error_code: decoder.int16, timestamp: decoder.int64, offset: decoder.int64 ) end TopicOffsetInfo.new( name: name, partition_offsets: partition_offsets ) end new(topics: topics) end |
Instance Method Details
#offset_for(topic, partition) ⇒ Object
45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 |
# File 'lib/kafka/protocol/list_offset_response.rb', line 45 def offset_for(topic, partition) topic_info = @topics.find {|t| t.name == topic } if topic_info.nil? raise UnknownTopicOrPartition, "Unknown topic #{topic}" end partition_info = topic_info .partition_offsets .find {|p| p.partition == partition } if partition_info.nil? raise UnknownTopicOrPartition, "Unknown partition #{topic}/#{partition}" end Protocol.handle_error(partition_info.error_code) partition_info.offset end |