Class: Karafka::Pro::Iterator

Inherits: Object
Defined in:
lib/karafka/pro/iterator.rb,
lib/karafka/pro/iterator/expander.rb,
lib/karafka/pro/iterator/tpl_builder.rb

Overview

Topic iterator allows you to iterate over topic/partition data and perform lookups for information that you need.

It supports early stops upon finding the requested data and allows for seeking till the end. It also allows for signaling that a given message should be the last one from a certain partition while iteration continues over messages in other partitions.

It does not create a consumer group and has no offset management until the first consumer offset marking happens, so it can be used for quick seeks as well as iterative, repetitive data fetching from rake tasks, etc.
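
For example, a minimal lookup over a single topic could look as follows. This is a hedged sketch: the 'events' topic name and the offset cutoff are illustrative assumptions, not part of the API.

# A minimal sketch, assuming a hypothetical 'events' topic
iterator = Karafka::Pro::Iterator.new(['events'])

iterator.each do |message, internal_iterator|
  puts "#{message.topic}/#{message.partition} @ #{message.offset}"

  # Stop the whole iteration early once we have what we were looking for
  internal_iterator.stop if message.offset >= 100
end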

Defined Under Namespace

Classes: Expander, TplBuilder

Instance Method Summary

Constructor Details

#initialize(topics, settings: { 'auto.offset.reset': 'beginning' }, yield_nil: false, max_wait_time: 200) ⇒ Iterator

Note:

It is worth keeping in mind that this API also needs to operate within the `max.poll.interval.ms` limitations on each iteration

Note:

In the case of a never-ending iterator, you need to set `enable.partition.eof` to `false` so that polling does not stop upon reaching the end of the data available at a given moment

A simple API that allows you to iterate over topic/partition data without having to subscribe and deal with rebalances. It supports multi-partition streaming and is optimized for data lookups, allowing you to explicitly stop the iteration over any partition during the iteration process.

Parameters:

  • topics (Array<String>, Hash)

a list of topic names if we want to iterate over multiple topics and all of their partitions, or a hash where keys are topic names and values are hashes mapping partitions to their initial offsets.

  • settings (Hash) (defaults to: { 'auto.offset.reset': 'beginning' })

extra settings for the consumer. Please keep in mind that if overwritten, you may want to include `auto.offset.reset` to match your case.

  • yield_nil (Boolean) (defaults to: false)

whether we should also yield `nil` values when a poll returns nothing. Useful in particular for long-living iterators.

  • max_wait_time (Integer) (defaults to: 200)

max wait in ms when the iterator did not receive any messages



# File 'lib/karafka/pro/iterator.rb', line 46

def initialize(
  topics,
  settings: { 'auto.offset.reset': 'beginning' },
  yield_nil: false,
  max_wait_time: 200
)
  @topics_with_partitions = Expander.new.call(topics)

  @routing_topics = @topics_with_partitions.map do |name, _|
    [name, ::Karafka::Routing::Router.find_or_initialize_by_name(name)]
  end.to_h

  @total_partitions = @topics_with_partitions.map(&:last).sum(&:count)

  @stopped_partitions = 0

  @settings = settings
  @yield_nil = yield_nil
  @max_wait_time = max_wait_time
end
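
Both documented topics formats can be combined with custom settings. Below is a hedged sketch; the 'events' and 'logs' topic names, offsets, and settings values are assumptions for illustration.

# All partitions of multiple topics, starting from the beginning
Karafka::Pro::Iterator.new(%w[events logs])

# Selected partitions with explicit initial offsets: partition 0 from
# offset 100 and partition 1 from offset 50
Karafka::Pro::Iterator.new('events' => { 0 => 100, 1 => 50 })

# A long-living iterator that also yields nils and does not stop on EOF
Karafka::Pro::Iterator.new(
  %w[events],
  settings: { 'auto.offset.reset': 'latest', 'enable.partition.eof': false },
  yield_nil: true
)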

Instance Method Details

#each ⇒ Object

Iterates over the requested topic partitions and yields the results together with the iterator itself. The Iterator instance is yielded because one can run `stop_partition` to stop iterating over part of the data. This is useful for scenarios where we are looking for some information in all the partitions, but once we have found it, a given partition's data is no longer needed and would only eat up resources.



# File 'lib/karafka/pro/iterator.rb', line 72

def each
  Admin.with_consumer(@settings) do |consumer|
    tpl = TplBuilder.new(consumer, @topics_with_partitions).call
    consumer.assign(tpl)

    # We need this for self-referenced APIs like pausing
    @current_consumer = consumer

    # Stream data until we reach the end of all the partitions or until the end user
    # indicates that they are done
    until done?
      message = poll

      # Skip nils if not explicitly required
      next if message.nil? && !@yield_nil

      if message
        @current_message = build_message(message)

        yield(@current_message, self)
      else
        yield(nil, self)
      end
    end

    @current_consumer.commit_offsets(async: false) if @stored_offsets
    @current_message = nil
    @current_consumer = nil
  end

  # Reset so we can use the same iterator again if needed
  @stopped_partitions = 0
end
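
A hedged lookup example over an assumed 'orders' topic with an assumed payload marker: once a matching payload shows up in a partition, only that partition is stopped while the remaining ones keep streaming.

found = {}

Karafka::Pro::Iterator.new(['orders']).each do |message, iterator|
  # The payload marker is a hypothetical lookup criterion
  next unless message.raw_payload.include?('order-123')

  found[message.partition] = message.offset
  iterator.stop_current_partition
end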

#mark_as_consumed(message) ⇒ Object

Marks the given message as consumed.

Parameters:

  • message (Karafka::Messages::Message)

    message that we want to mark as consumed

# File 'lib/karafka/pro/iterator.rb', line 143

def mark_as_consumed(message)
  @current_consumer.store_offset(message, nil)
  @stored_offsets = true
end

#mark_as_consumed!(message) ⇒ Object

Marks the given message as consumed and commits offsets.

Parameters:

  • message (Karafka::Messages::Message)

    message that we want to mark as consumed

# File 'lib/karafka/pro/iterator.rb', line 151

def mark_as_consumed!(message)
  mark_as_consumed(message)
  @current_consumer.commit_offsets(async: false)
end
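
A sketch of offset marking during iteration, assuming an 'events' topic and a hypothetical process method. Offsets stored with #mark_as_consumed are committed once the iteration finishes, while #mark_as_consumed! commits them immediately.

Karafka::Pro::Iterator.new(['events']).each do |message, iterator|
  process(message) # hypothetical processing step

  # Commit synchronously every 1 000 messages, otherwise only store locally
  if (message.offset % 1_000).zero?
    iterator.mark_as_consumed!(message)
  else
    iterator.mark_as_consumed(message)
  end
end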

#stop ⇒ Object

Note:

`break` can also be used, but in such cases offsets stored for async commits will not be flushed to Kafka. This is why `#stop` is the recommended method.

Stops all iteration.



# File 'lib/karafka/pro/iterator.rb', line 136

def stop
  @stopped = true
end
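
A minimal sketch of a clean early stop over an assumed 'events' topic. Using #stop instead of `break` ensures stored offsets get flushed to Kafka.

Karafka::Pro::Iterator.new(['events']).each do |message, iterator|
  iterator.mark_as_consumed(message)

  # Finish cleanly once we reach the offset we were looking for
  iterator.stop if message.offset >= 10_000
end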

#stop_current_partition ⇒ Object

Stops the partition we’re currently yielded into



# File 'lib/karafka/pro/iterator.rb', line 107

def stop_current_partition
  stop_partition(
    @current_message.topic,
    @current_message.partition
  )
end

#stop_partition(name, partition) ⇒ Object

Stops processing of a given partition. We expect the partition to be provided explicitly to cover the scenario where, during a multi-partition iteration, we want to stop a different partition than the one that is currently yielded.

We pause it forever and no longer work with it.

Parameters:

  • name (String)

name of the topic whose partition we want to stop

  • partition (Integer)

    partition we want to stop processing



# File 'lib/karafka/pro/iterator.rb', line 123

def stop_partition(name, partition)
  @stopped_partitions += 1

  @current_consumer.pause(
    Rdkafka::Consumer::TopicPartitionList.new(
      name => [Rdkafka::Consumer::Partition.new(partition, 0)]
    )
  )
end
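
A sketch showing that any partition can be stopped, not only the currently yielded one; the topic name, partition numbers, and payload marker are assumptions for illustration.

Karafka::Pro::Iterator.new('events' => { 0 => 0, 1 => 0 }).each do |message, iterator|
  # As soon as partition 1 delivers the record we need, stop partition 0 as
  # well, since its data is no longer of interest
  if message.partition == 1 && message.raw_payload.include?('marker')
    iterator.stop_partition('events', 0)
    iterator.stop_current_partition
  end
end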