Class: Karafka::Processing::CoordinatorsBuffer

Inherits:
Object
  • Object
show all
Defined in:
lib/karafka/processing/coordinators_buffer.rb

Overview

Note:

This buffer operates only from the listener loop, thus we do not have to make it thread-safe.

Coordinators builder used to build coordinators per topic partition

It provides direct pauses access for revocation

Instance Method Summary collapse

Constructor Details

#initialize(topics) ⇒ CoordinatorsBuffer

Returns a new instance of CoordinatorsBuffer.

Parameters:



13
14
15
16
17
18
# File 'lib/karafka/processing/coordinators_buffer.rb', line 13

def initialize(topics)
  @pauses_manager = Connection::PausesManager.new
  @coordinator_class = ::Karafka::App.config.internal.processing.coordinator_class
  @coordinators = Hash.new { |h, k| h[k] = {} }
  @topics = topics
end

Instance Method Details

#find_or_create(topic_name, partition) ⇒ Object

Parameters:

  • topic_name (String)

    topic name

  • partition (Integer)

    partition number



22
23
24
25
26
27
28
29
30
31
32
# File 'lib/karafka/processing/coordinators_buffer.rb', line 22

def find_or_create(topic_name, partition)
  @coordinators[topic_name][partition] ||= begin
    routing_topic = @topics.find(topic_name)

    @coordinator_class.new(
      routing_topic,
      partition,
      @pauses_manager.fetch(routing_topic, partition)
    )
  end
end

#resetObject

Clears coordinators and re-created the pauses manager This should be used only for critical errors recovery



56
57
58
59
# File 'lib/karafka/processing/coordinators_buffer.rb', line 56

def reset
  @pauses_manager = Connection::PausesManager.new
  @coordinators.clear
end

#resume(&block) {|topic, partition| ... } ⇒ Object

Resumes processing of partitions for which pause time has ended.

Parameters:

  • block

    we want to run for resumed topic partitions

Yield Parameters:

  • topic (String)

    name

  • partition (Integer)

    number



38
39
40
# File 'lib/karafka/processing/coordinators_buffer.rb', line 38

def resume(&block)
  @pauses_manager.resume(&block)
end

#revoke(topic_name, partition) ⇒ Object

Parameters:

  • topic_name (String)

    topic name

  • partition (Integer)

    partition number



44
45
46
47
48
49
50
51
52
# File 'lib/karafka/processing/coordinators_buffer.rb', line 44

def revoke(topic_name, partition)
  return unless @coordinators[topic_name].key?(partition)

  # The fact that we delete here does not change the fact that the executor still holds the
  # reference to this coordinator. We delete it here, as we will no longer process any
  # new stuff with it and we may need a new coordinator if we regain this partition, but the
  # coordinator may still be in use
  @coordinators[topic_name].delete(partition).revoke
end