Class: Karafka::Pro::Iterator
- Inherits: Object
  - Object
    - Karafka::Pro::Iterator
- Defined in: lib/karafka/pro/iterator.rb, lib/karafka/pro/iterator/expander.rb, lib/karafka/pro/iterator/tpl_builder.rb
Overview
The topic iterator lets you iterate over topic/partition data and look up the information you need.
It supports stopping early once the requested data is found, and it can also read through to the end. In addition, it lets you signal that a given message should be the last one taken from its partition while iteration continues over the other partitions.
It does not create a consumer group and does not perform any offset management.
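The lookup pattern described above can be sketched as follows. This is a minimal illustration written against the documented contract only (`#each` yields the message and the iterator, and `#stop_current_partition` stops the yielded partition). `FakeMessage` and `FakeIterator` are hypothetical in-memory stand-ins so the sketch runs without a Kafka cluster; they are not part of Karafka.

```ruby
require 'set'

# Returns the first message of each partition for which the block is truthy.
# `iterator` is anything that behaves like Karafka::Pro::Iterator.
def first_match_per_partition(iterator)
  matches = {}

  iterator.each do |message, iter|
    next if message.nil? # only relevant when nils are yielded
    next unless yield(message)

    matches[[message.topic, message.partition]] = message
    # This partition has served its purpose, so stop reading from it
    iter.stop_current_partition
  end

  matches
end

# Hypothetical stand-ins (assumptions for illustration, not Karafka classes)
FakeMessage = Struct.new(:topic, :partition, :offset, :payload)

class FakeIterator
  def initialize(messages)
    @messages = messages
    @stopped = Set.new
  end

  def each
    @messages.each do |message|
      next if @stopped.include?([message.topic, message.partition])

      @current = message
      yield(message, self)
    end
    @stopped.clear
  end

  def stop_current_partition
    stop_partition(@current.topic, @current.partition)
  end

  def stop_partition(name, partition)
    @stopped << [name, partition]
  end
end

messages = [
  FakeMessage.new('events', 0, 0, 'a'),
  FakeMessage.new('events', 1, 0, 'target'),
  FakeMessage.new('events', 0, 1, 'target'),
  FakeMessage.new('events', 1, 1, 'target'),
  FakeMessage.new('events', 0, 2, 'target')
]

found = first_match_per_partition(FakeIterator.new(messages)) do |message|
  message.payload == 'target'
end
```

With Karafka Pro loaded, the same helper could presumably be handed a real `Karafka::Pro::Iterator` instance in place of the stand-in.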
Defined Under Namespace
Classes: Expander, TplBuilder
Instance Method Summary
- #each ⇒ Object
  Iterates over the requested topic partitions and yields each result together with the iterator itself. The Iterator instance is yielded so that `stop_partition` can be called to stop iterating over part of the data.
- #initialize(topics, settings: { 'auto.offset.reset': 'beginning' }, yield_nil: false, max_wait_time: 200) ⇒ Iterator (constructor)
  A simple API for iterating over topic/partition data without having to subscribe and deal with rebalances.
- #stop_current_partition ⇒ Object
  Stops the partition we’re currently yielded into.
- #stop_partition(name, partition) ⇒ Object
  Stops processing of a given partition. The partition must be provided explicitly because, in a multi-partition iteration, we may want to stop a different partition than the one that is currently yielded.
Constructor Details
#initialize(topics, settings: { 'auto.offset.reset': 'beginning' }, yield_nil: false, max_wait_time: 200) ⇒ Iterator
Note: This API also needs to operate within the `max.poll.interval.ms` limit on each iteration.
Note: For a never-ending iterator, set `enable.partition.eof` to `false` so that polling does not stop when the end is reached (that is, the end as of a given moment).
A simple API for iterating over topic/partition data without having to subscribe and deal with rebalances. It supports multi-partition streaming and is optimized for data lookups: iteration over any partition can be stopped explicitly while the iteration is in progress.
# File 'lib/karafka/pro/iterator.rb', line 49

def initialize(
  topics,
  settings: { 'auto.offset.reset': 'beginning' },
  yield_nil: false,
  max_wait_time: 200
)
  @topics_with_partitions = Expander.new.call(topics)

  @routing_topics = @topics_with_partitions.map do |name, _|
    [name, ::Karafka::Routing::Router.find_or_initialize_by_name(name)]
  end.to_h

  @total_partitions = @topics_with_partitions.map(&:last).sum(&:count)

  @stopped_partitions = 0

  @settings = settings
  @yield_nil = yield_nil
  @max_wait_time = max_wait_time
end
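As a rough sketch of how the constructor keywords fit together, the helper below assembles the keyword arguments for a finite and for a never-ending iterator. The keyword names and defaults come from the signature above and `enable.partition.eof` from the constructor notes; the helper name `iterator_options` and the choice to enable `yield_nil` for the never-ending case are assumptions made for illustration.

```ruby
# Builds keyword arguments for Karafka::Pro::Iterator.new.
# `iterator_options` is a hypothetical helper, not part of Karafka.
def iterator_options(never_ending: false, max_wait_time: 200)
  settings = { 'auto.offset.reset': 'beginning' }
  # A never-ending iterator must not stop at the partition end
  settings[:'enable.partition.eof'] = false if never_ending

  {
    settings: settings,
    # Assumption: yield nil on empty polls so a long-running block regains control
    yield_nil: never_ending,
    max_wait_time: max_wait_time
  }
end

finite = iterator_options
endless = iterator_options(never_ending: true, max_wait_time: 500)

# With Karafka Pro loaded, this could be passed along as:
#   Karafka::Pro::Iterator.new(topics, **endless)
```

Keeping the options in a plain hash makes it easy to check the resulting configuration without connecting to a cluster.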
Instance Method Details
#each ⇒ Object
Iterates over the requested topic partitions and yields each result together with the iterator itself. The Iterator instance is yielded so that `stop_partition` can be called to stop iterating over part of the data. This is useful when we are looking for some information across all partitions: once it has been found in a given partition, that partition's data is no longer needed and would only consume resources.
# File 'lib/karafka/pro/iterator.rb', line 75

def each
  Admin.with_consumer(@settings) do |consumer|
    tpl = TplBuilder.new(consumer, @topics_with_partitions).call
    consumer.assign(tpl)

    # We need this for self-referenced APIs like pausing
    @current_consumer = consumer

    # Stream data until we reach the end of all the partitions or until the end user
    # indicates that they are done
    until done?
      message = poll

      # Skip nils if not explicitly required
      next if message.nil? && !@yield_nil

      if message
        @current_message = build_message(message)

        yield(@current_message, self)
      else
        yield(nil, self)
      end
    end

    @current_message = nil
    @current_consumer = nil
  end

  # Reset so we can use the same iterator again if needed
  @stopped_partitions = 0
end
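Because `#each` may yield `nil` when the iterator was built with `yield_nil: true`, a block should be prepared for both cases. The sketch below shows one way to handle that. `ReplayIterator` is a hypothetical stand-in that replays a fixed list of poll results and mirrors the "skip nils if not explicitly required" rule from the listing; it is not a Karafka class.

```ruby
# Collects messages and counts empty polls. Written against the yielding
# contract of #each: the block receives (message_or_nil, iterator).
def collect_with_idle_count(iterator)
  collected = []
  idle_polls = 0

  iterator.each do |message, _iter|
    if message.nil?
      # Only reachable when nils are yielded (yield_nil: true)
      idle_polls += 1
      next
    end

    collected << message
  end

  [collected, idle_polls]
end

# Hypothetical stand-in replaying a fixed list of poll results
class ReplayIterator
  def initialize(polls, yield_nil: false)
    @polls = polls
    @yield_nil = yield_nil
  end

  def each
    @polls.each do |message|
      # Skip nils unless they were explicitly requested
      next if message.nil? && !@yield_nil

      yield(message, self)
    end
  end
end

polls = ['m1', nil, 'm2', nil, nil]
with_nils = collect_with_idle_count(ReplayIterator.new(polls, yield_nil: true))
without_nils = collect_with_idle_count(ReplayIterator.new(polls))
```

Note that in the listing `@current_message` is only assigned when a message is present, so calling `stop_current_partition` from a `nil` yield is best avoided.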
#stop_current_partition ⇒ Object
Stops the partition we’re currently yielded into.
# File 'lib/karafka/pro/iterator.rb', line 109

def stop_current_partition
  stop_partition(
    @current_message.topic,
    @current_message.partition
  )
end
#stop_partition(name, partition) ⇒ Object
Stops processing of a given partition. The partition must be provided explicitly because, in a multi-partition iteration, we may want to stop a different partition than the one that is currently yielded.
The partition is paused indefinitely and is no longer worked with.
# File 'lib/karafka/pro/iterator.rb', line 125

def stop_partition(name, partition)
  @stopped_partitions += 1

  @current_consumer.pause(
    Rdkafka::Consumer::TopicPartitionList.new(
      name => [Partition.new(partition, 0)]
    )
  )
end
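The listing increments `@stopped_partitions` on every call, regardless of whether the partition was already stopped. How that counter is consumed is not shown here, so as a cautious sketch a caller might make sure each partition is stopped at most once. `OnceOnlyStopper` and `RecordingIterator` below are hypothetical names used only for illustration.

```ruby
require 'set'

# Forwards stop_partition to the wrapped iterator at most once per
# topic partition. Hypothetical helper, not part of Karafka.
class OnceOnlyStopper
  def initialize(iterator)
    @iterator = iterator
    @stopped = Set.new
  end

  # Returns true when the stop was forwarded, false when it was a repeat
  def stop(name, partition)
    # Set#add? returns nil when the element is already present
    return false unless @stopped.add?([name, partition])

    @iterator.stop_partition(name, partition)
    true
  end
end

# Hypothetical recorder standing in for Karafka::Pro::Iterator
class RecordingIterator
  attr_reader :calls

  def initialize
    @calls = []
  end

  def stop_partition(name, partition)
    @calls << [name, partition]
  end
end

recorder = RecordingIterator.new
stopper = OnceOnlyStopper.new(recorder)

results = [
  stopper.stop('events', 0),
  stopper.stop('events', 1),
  stopper.stop('events', 0) # repeat, not forwarded
]
```

Inside an `#each` block, such a wrapper would be created once around the yielded iterator and reused for the whole iteration.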