Class: Karafka::Pro::Iterator
- Inherits: Object
- Object
- Karafka::Pro::Iterator
- Defined in:
- lib/karafka/pro/iterator.rb,
lib/karafka/pro/iterator/expander.rb,
lib/karafka/pro/iterator/tpl_builder.rb
Overview
The topic Iterator allows you to iterate over topic/partition data and perform lookups for the information you need.
It supports early stopping upon finding the requested data and allows seeking until the end. It also allows signaling that a given message should be the last one from a certain partition, while iteration continues over messages in the other partitions.
It does not create a consumer group and performs no offset management until the first consumer offset marking happens, so it can be used for quick seeks as well as for iterative, repetitive data fetching from rake tasks, etc.
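A typical early-stop lookup can be sketched as follows. This is a hedged illustration, not part of the API: the helper name `find_first` is hypothetical, and it works with any object whose `each` yields `(message, iterator)` pairs, such as this Iterator.

```ruby
# Hedged sketch: scans an iterator yielding (message, iterator) pairs,
# such as Karafka::Pro::Iterator, and stops all polling once the first
# message matching the given block is found.
def find_first(iterator)
  found = nil

  iterator.each do |message, it|
    # With yield_nil enabled, nil can be yielded on empty polls
    next unless message
    next unless yield(message)

    found = message
    # `it.stop` ends iteration and flushes any stored offsets; a bare
    # `break` would skip that flush, which is why #stop is preferred
    it.stop
  end

  found
end

# Usage against Kafka (topic name is an assumption):
#   found = find_first(Karafka::Pro::Iterator.new(['events'])) do |message|
#     message.offset == 42
#   end
```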
Defined Under Namespace
Classes: Expander, TplBuilder
Instance Method Summary collapse
-
#each ⇒ Object
Iterates over requested topic partitions and yields the results together with the iterator itself. The iterator instance is yielded because one can run `stop_partition` to stop iterating over part of the data.
-
#initialize(topics, settings: { 'auto.offset.reset': 'beginning' }, yield_nil: false, max_wait_time: 200) ⇒ Iterator
constructor
A simple API allowing to iterate over topic/partition data, without having to subscribe and deal with rebalances.
-
#mark_as_consumed(message) ⇒ Object
Marks given message as consumed.
-
#mark_as_consumed!(message) ⇒ Object
Marks given message as consumed and commits offsets.
-
#stop ⇒ Object
Stops all the iterating.
-
#stop_current_partition ⇒ Object
Stops the partition we’re currently yielded into.
-
#stop_partition(name, partition) ⇒ Object
Stops processing of a given partition. We expect the partition to be provided explicitly to cover the scenario where, during a multi-partition iteration, we want to stop a different partition than the one that is currently yielded.
Constructor Details
#initialize(topics, settings: { 'auto.offset.reset': 'beginning' }, yield_nil: false, max_wait_time: 200) ⇒ Iterator
It is worth keeping in mind that this API also needs to operate within the `max.poll.interval.ms` limitations on each iteration.
For a never-ending iterator, set `enable.partition.eof` to `false` so polling does not stop upon reaching the end of the data available at a given moment.
A simple API allowing to iterate over topic/partition data, without having to subscribe and deal with rebalances. This API allows for multi-partition streaming and is optimized for data lookups. It allows for explicit stopping iteration over any partition during the iteration process, allowing for optimized lookups.
# File 'lib/karafka/pro/iterator.rb', line 46

def initialize(
  topics,
  settings: { 'auto.offset.reset': 'beginning' },
  yield_nil: false,
  max_wait_time: 200
)
  @topics_with_partitions = Expander.new.call(topics)

  @routing_topics = @topics_with_partitions.map do |name, _|
    [name, ::Karafka::Routing::Router.find_or_initialize_by_name(name)]
  end.to_h

  @total_partitions = @topics_with_partitions.map(&:last).sum(&:count)
  @stopped_partitions = 0

  @settings = settings
  @yield_nil = yield_nil
  @max_wait_time = max_wait_time
end
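The `topics` argument is normalized by `Expander`, so several input shapes are accepted. Two common ones are sketched below; the topic names are illustrative assumptions, and the full set of supported forms is defined by `Expander` and `TplBuilder`.

```ruby
# 1. A plain list of topic names: iterate all partitions, starting
#    according to the 'auto.offset.reset' setting ('beginning' by default)
topics_all = %w[events checkouts]

# 2. A hash with explicit partitions mapped to explicit start offsets
#    (here: partition 0 from offset 10, partition 1 from offset 0)
topics_explicit = { 'events' => { 0 => 10, 1 => 0 } }

# Either shape can be handed to the constructor (requires Kafka to run):
#   Karafka::Pro::Iterator.new(topics_explicit)
```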
Instance Method Details
#each ⇒ Object
Iterates over requested topic partitions and yields the results together with the iterator itself. The iterator instance is yielded because one can run `stop_partition` to stop iterating over part of the data. It is useful for scenarios where we are looking for some information in all the partitions, but once we find it, the given partition's data is no longer needed and would only eat up resources.
# File 'lib/karafka/pro/iterator.rb', line 72

def each
  Admin.with_consumer(@settings) do |consumer|
    tpl = TplBuilder.new(consumer, @topics_with_partitions).call
    consumer.assign(tpl)

    # We need this for self-referenced APIs like pausing
    @current_consumer = consumer

    # Stream data until we reach the end of all the partitions or until the end user
    # indicates that they are done
    until done?
      message = poll

      # Skip nils if not explicitly required
      next if message.nil? && !@yield_nil

      if message
        @current_message = message

        yield(message, self)
      else
        yield(nil, self)
      end
    end

    @current_consumer.commit_offsets(async: false) if @stored_offsets
    @current_message = nil
    @current_consumer = nil
  end

  # Reset so we can use the same iterator again if needed
  @stopped_partitions = 0
end
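The partition-release pattern described above can be sketched as a helper that collects the first matching message from each partition and releases every partition as soon as its match is found. The helper name is hypothetical; it relies only on the documented `each` and `stop_current_partition` behavior.

```ruby
# Hedged sketch: collect the first message matching the block from each
# partition, stopping a partition as soon as its match is found so that
# it no longer eats up resources while the other partitions continue.
def first_match_per_partition(iterator)
  matches = {}

  iterator.each do |message, it|
    next unless message
    next unless yield(message)

    matches[[message.topic, message.partition]] ||= message
    # This partition's data is no longer needed; others keep streaming
    it.stop_current_partition
  end

  matches
end
```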
#mark_as_consumed(message) ⇒ Object
Marks given message as consumed.
# File 'lib/karafka/pro/iterator.rb', line 143

def mark_as_consumed(message)
  @current_consumer.store_offset(message, nil)
  @stored_offsets = true
end
#mark_as_consumed!(message) ⇒ Object
Marks the given message as consumed and commits offsets.
# File 'lib/karafka/pro/iterator.rb', line 151

def mark_as_consumed!(message)
  mark_as_consumed(message)
  @current_consumer.commit_offsets(async: false)
end
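The split between storing (`mark_as_consumed`) and storing plus committing (`mark_as_consumed!`) suggests a checkpointing pattern: store cheaply on every message and commit synchronously only every N messages. The helper below is a hedged sketch of that idea; the name and interval are arbitrary, and (per the overview) offset management only kicks in once marking starts.

```ruby
# Hedged sketch: scan with periodic synchronous checkpoints so a restarted
# run can resume near where it left off. checkpoint_every is an arbitrary
# illustrative default, not an API parameter.
def scan_with_checkpoints(iterator, checkpoint_every: 1_000)
  count = 0

  iterator.each do |message, it|
    next unless message

    yield(message)
    count += 1

    if (count % checkpoint_every).zero?
      # Store and commit synchronously right away
      it.mark_as_consumed!(message)
    else
      # Store only; flushed when iteration finishes (or on #stop)
      it.mark_as_consumed(message)
    end
  end
end
```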
#stop ⇒ Object
`break` can also be used, but in such cases offsets stored asynchronously will not be flushed to Kafka. This is why `#stop` is the recommended way to end iteration.
Stops all the iterating
# File 'lib/karafka/pro/iterator.rb', line 136

def stop
  @stopped = true
end
#stop_current_partition ⇒ Object
Stops the partition we’re currently yielded into
# File 'lib/karafka/pro/iterator.rb', line 107

def stop_current_partition
  stop_partition(
    @current_message.topic,
    @current_message.partition
  )
end
#stop_partition(name, partition) ⇒ Object
Stops processing of a given partition. We expect the partition to be provided explicitly to cover the scenario where, during a multi-partition iteration, we want to stop a different partition than the one that is currently yielded.
We pause it forever and no longer work with it.
# File 'lib/karafka/pro/iterator.rb', line 123

def stop_partition(name, partition)
  @stopped_partitions += 1

  @current_consumer.pause(
    Rdkafka::Consumer::TopicPartitionList.new(
      name => [Rdkafka::Consumer::Partition.new(partition, 0)]
    )
  )
end
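Because `stop_partition` takes the topic and partition explicitly, a caller can stop any partition, not just the one currently yielded. A hedged sketch of one use: capping how many messages are read from each partition. The helper name and the cap are illustrative assumptions.

```ruby
# Hedged sketch: read at most `limit` messages per partition, stopping
# each partition by explicit (topic, partition) once it hits the cap,
# while the remaining partitions keep streaming.
def take_per_partition(iterator, limit)
  taken = Hash.new { |h, k| h[k] = [] }

  iterator.each do |message, it|
    next unless message

    key = [message.topic, message.partition]
    taken[key] << message

    it.stop_partition(message.topic, message.partition) if taken[key].size >= limit
  end

  taken
end
```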