Class: Karafka::Processing::CoordinatorsBuffer

Inherits:
Object
  • Object
show all
Defined in:
lib/karafka/processing/coordinators_buffer.rb

Overview

Note:

This buffer operates only from the listener loop, thus we do not have to make it thread-safe.

Coordinators builder used to build coordinators per topic partition

It provides direct pauses access for revocation

Instance Method Summary collapse

Constructor Details

#initializeCoordinatorsBuffer

Returns a new instance of CoordinatorsBuffer.



12
13
14
15
16
# File 'lib/karafka/processing/coordinators_buffer.rb', line 12

def initialize
  @pauses_manager = Connection::PausesManager.new
  @coordinator_class = ::Karafka::App.config.internal.processing.coordinator_class
  @coordinators = Hash.new { |h, k| h[k] = {} }
end

Instance Method Details

#find_or_create(topic, partition) ⇒ Object

Parameters:

  • topic (String)

    topic name

  • partition (Integer)

    partition number



20
21
22
23
24
# File 'lib/karafka/processing/coordinators_buffer.rb', line 20

def find_or_create(topic, partition)
  @coordinators[topic][partition] ||= @coordinator_class.new(
    @pauses_manager.fetch(topic, partition)
  )
end

#resetObject

Clears coordinators and re-created the pauses manager This should be used only for critical errors recovery



48
49
50
51
# File 'lib/karafka/processing/coordinators_buffer.rb', line 48

def reset
  @pauses_manager = Connection::PausesManager.new
  @coordinators.clear
end

#resume(&block) {|topic, partition| ... } ⇒ Object

Resumes processing of partitions for which pause time has ended.

Parameters:

  • block

    we want to run for resumed topic partitions

Yield Parameters:

  • topic (String)

    name

  • partition (Integer)

    number



30
31
32
# File 'lib/karafka/processing/coordinators_buffer.rb', line 30

def resume(&block)
  @pauses_manager.resume(&block)
end

#revoke(topic, partition) ⇒ Object

Parameters:

  • topic (String)

    topic name

  • partition (Integer)

    partition number



36
37
38
39
40
41
42
43
44
# File 'lib/karafka/processing/coordinators_buffer.rb', line 36

def revoke(topic, partition)
  return unless @coordinators[topic].key?(partition)

  # The fact that we delete here does not change the fact that the executor still holds the
  # reference to this coordinator. We delete it here, as we will no longer process any
  # new stuff with it and we may need a new coordinator if we regain this partition, but the
  # coordinator may still be in use
  @coordinators[topic].delete(partition).revoke
end