Class: Kafka::FetchedOffsetResolver
- Inherits: Object
  - Object
  - Kafka::FetchedOffsetResolver
- Defined in: lib/kafka/fetched_offset_resolver.rb
Instance Method Summary
- #initialize(logger:) ⇒ FetchedOffsetResolver (constructor)
  A new instance of FetchedOffsetResolver.
- #resolve!(broker, topics) ⇒ Object
Constructor Details
#initialize(logger:) ⇒ FetchedOffsetResolver
Returns a new instance of FetchedOffsetResolver.
    # File 'lib/kafka/fetched_offset_resolver.rb', line 5
    def initialize(logger:)
      @logger = TaggedLogger.new(logger)
    end
Instance Method Details
#resolve!(broker, topics) ⇒ Object
    # File 'lib/kafka/fetched_offset_resolver.rb', line 9
    def resolve!(broker, topics)
      # Only partitions that still need an offset lookup are sent to the broker.
      pending_topics = filter_pending_topics(topics)
      return topics if pending_topics.empty?

      response = broker.list_offsets(topics: pending_topics)

      pending_topics.each do |topic, partitions|
        partitions.each do |options|
          partition = options.fetch(:partition)
          resolved_offset = response.offset_for(topic, partition)

          @logger.debug "Offset for #{topic}/#{partition} is #{resolved_offset.inspect}"

          # Fall back to offset 0 when the response carries no offset.
          topics[topic][partition][:fetch_offset] = resolved_offset || 0
        end
      end
    end
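The listing above implies a data shape that is easy to miss: `topics` maps topic name to partition number to an options hash with a `:fetch_offset` key, and `resolve!` rewrites that key in place. The following is a minimal standalone sketch of that flow, not the library's own code. `StubBroker`, `StubOffsetResponse` and `SketchOffsetResolver` are hypothetical names, and the body of `filter_pending_topics` is an assumption (the page does not show it): here it treats any negative `:fetch_offset`, such as the Kafka sentinels -1 for latest and -2 for earliest, as pending.

```ruby
require "logger"

# Hypothetical stand-in for the list_offsets response; only offset_for is modeled.
class StubOffsetResponse
  def initialize(offsets)
    @offsets = offsets
  end

  # Returns nil when the topic/partition pair is unknown.
  def offset_for(topic, partition)
    @offsets.dig(topic, partition)
  end
end

# Hypothetical stand-in for a broker; records what it was asked to resolve.
class StubBroker
  attr_reader :requested

  def initialize(offsets)
    @offsets = offsets
  end

  def list_offsets(topics:)
    @requested = topics
    StubOffsetResponse.new(@offsets)
  end
end

# Sketch that mirrors the resolve! listing, using a plain Logger.
class SketchOffsetResolver
  def initialize(logger:)
    @logger = logger
  end

  def resolve!(broker, topics)
    pending_topics = filter_pending_topics(topics)
    return topics if pending_topics.empty?

    response = broker.list_offsets(topics: pending_topics)

    pending_topics.each do |topic, partitions|
      partitions.each do |options|
        partition = options.fetch(:partition)
        resolved_offset = response.offset_for(topic, partition)
        @logger.debug "Offset for #{topic}/#{partition} is #{resolved_offset.inspect}"
        topics[topic][partition][:fetch_offset] = resolved_offset || 0
      end
    end
  end

  private

  # ASSUMPTION: a negative :fetch_offset marks a partition as pending.
  # The :time key is likewise an assumption about the request shape.
  def filter_pending_topics(topics)
    pending = {}
    topics.each do |topic, partitions|
      partitions.each do |partition, options|
        offset = options.fetch(:fetch_offset)
        next if offset >= 0

        (pending[topic] ||= []) << { partition: partition, time: offset }
      end
    end
    pending
  end
end

topics = {
  "events" => {
    0 => { fetch_offset: -1 },  # pending, the stub broker knows this one
    1 => { fetch_offset: 42 },  # already concrete, left untouched
    2 => { fetch_offset: -2 }   # pending, the stub broker has no answer
  }
}

broker = StubBroker.new("events" => { 0 => 1500 })
resolver = SketchOffsetResolver.new(logger: Logger.new(nil))
result = resolver.resolve!(broker, topics)
```

In this sketch partition 0 ends up at 1500, partition 1 stays at 42, and partition 2 falls back to 0 because the stub response returns `nil` for it. Note that the two branches return different objects: the early return hands back `topics`, while the main path returns the value of `pending_topics.each`, which is the pending hash. Callers are therefore better served by reading the mutated `topics` than by relying on the return value.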