Class: Karafka::Processing::ExecutorsBuffer

Inherits:
Object
Defined in:
lib/karafka/processing/executors_buffer.rb

Overview

Buffer for the executors of a given subscription group. It builds and caches executors so they can be reused instead of being created anew each time.

Constructor Details

#initialize(client, subscription_group) ⇒ ExecutorsBuffer

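Parameters:

  • client

    client that is passed to every executor this buffer creates

  • subscription_group

    subscription group the executors belong to; its id is passed to every executor this buffer creates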


# File 'lib/karafka/processing/executors_buffer.rb', line 11

def initialize(client, subscription_group)
  @subscription_group = subscription_group
  @client = client
  # Three levels: topic => partition => parallel key => executor.
  # The two default blocks create the topic and partition levels on first access.
  @buffer = Hash.new { |h, k| h[k] = Hash.new { |h2, k2| h2[k2] = {} } }
end
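
Because of the nested default blocks in the constructor, the intermediate hashes for a topic and partition are created on first access. The following is a minimal standalone sketch of that structure in plain Ruby, without Karafka; the keys and the `:executor` value are placeholders chosen for illustration.

```ruby
# Same shape as @buffer: topic => partition => parallel key => executor.
buffer = Hash.new { |h, k| h[k] = Hash.new { |h2, k2| h2[k2] = {} } }

# Writing a leaf needs no prior setup for "events" or partition 0.
buffer["events"][0]["group-a"] = :executor

# Merely reading an unknown topic/partition also stores empty hashes.
buffer["other"][3]

p buffer
```

One consequence of this design is that lookups such as `#find_all` on an unseen topic and partition return an empty result but also leave empty entries behind in the buffer.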

Instance Method Details

#clear ⇒ Object

Clears the executors buffer. Useful for recovery from critical errors.



# File 'lib/karafka/processing/executors_buffer.rb', line 67

def clear
  @buffer.clear
end

#each {|executor| ... } ⇒ Object

Iterates over all available executors and yields each one. As the source shows, only the executor is passed to the block; topic and partition are not.

Yield Parameters:

  • executor (Executor)

    executor stored in the buffer



# File 'lib/karafka/processing/executors_buffer.rb', line 56

def each
  @buffer.each do |_, partitions|
    partitions.each do |_, executors|
      executors.each do |_, executor|
        yield(executor)
      end
    end
  end
end
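
The traversal above walks the three hash levels and yields only the innermost value. A small sketch of the same pattern on a plain nested hash, assuming a hypothetical helper `each_executor` and placeholder symbols in place of real executors:

```ruby
# Hypothetical helper mirroring the traversal in #each; not part of Karafka.
def each_executor(buffer)
  buffer.each_value do |partitions|
    partitions.each_value do |executors|
      # Only the executor is yielded; topic and partition keys are skipped.
      executors.each_value { |executor| yield(executor) }
    end
  end
end

buffer = Hash.new { |h, k| h[k] = Hash.new { |h2, k2| h2[k2] = {} } }
buffer["events"][0]["a"] = :exec_1
buffer["events"][1]["a"] = :exec_2
buffer["logs"][0]["a"] = :exec_3

seen = []
each_executor(buffer) { |executor| seen << executor }
```

Ruby hashes preserve insertion order, so executors are visited in the order their topics, partitions and keys were first added.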

#find_all(topic, partition) ⇒ Array<Executor>

Finds all the executors available for a given topic partition

Parameters:

  • topic (String)

    topic name

  • partition (Integer)

    partition number

Returns:

  • (Array<Executor>)

    executors in use for this topic + partition



# File 'lib/karafka/processing/executors_buffer.rb', line 47

def find_all(topic, partition)
  @buffer[topic][partition].values
end

#find_or_create(topic, partition, parallel_key, coordinator) ⇒ Executor

Finds or creates an executor based on the provided details

Parameters:

  • topic (String)

    topic name

  • partition (Integer)

    partition number

  • parallel_key (String)

    parallel group key

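  • coordinator (Karafka::Processing::Coordinator)

    coordinator handed to the executor when a new one is created

Returns:

  • (Executor)

    the cached executor for this topic, partition and parallel key, or a newly created one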



# File 'lib/karafka/processing/executors_buffer.rb', line 25

def find_or_create(topic, partition, parallel_key, coordinator)
  @buffer[topic][partition][parallel_key] ||= Executor.new(
    @subscription_group.id,
    @client,
    coordinator
  )
end
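
The `||=` in the method above means an executor is built only on the first call for a given topic, partition and parallel key; later calls return the cached object and do not use the coordinator they were given. The sketch below illustrates this with a hypothetical stand-in struct instead of Karafka's `Executor`, and with placeholder group id, client and coordinator values.

```ruby
# Hypothetical stand-in for Executor; it only records its arguments.
fake_executor = Struct.new(:group_id, :client, :coordinator)

buffer = Hash.new { |h, k| h[k] = Hash.new { |h2, k2| h2[k2] = {} } }

# Same `||=` pattern as find_or_create, with placeholder group id and client.
find_or_create = lambda do |topic, partition, parallel_key, coordinator|
  buffer[topic][partition][parallel_key] ||= fake_executor.new("group-1", :client, coordinator)
end

first = find_or_create.call("events", 0, "a", :coordinator_1)
second = find_or_create.call("events", 0, "a", :coordinator_2)
other = find_or_create.call("events", 0, "b", :coordinator_2)

puts first.equal?(second)  # same cached object for the same key
puts first.equal?(other)   # a different parallel key gets its own executor
```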

#revoke(topic, partition) ⇒ Object

Revokes executors of a given topic partition, so they won’t be used anymore for incoming messages

Parameters:

  • topic (String)

    topic name

  • partition (Integer)

    partition number



# File 'lib/karafka/processing/executors_buffer.rb', line 38

def revoke(topic, partition)
  @buffer[topic][partition].clear
end
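
To show how `#revoke` differs from `#clear`, here is a sketch that applies the same hash operations to a plain nested hash. The topic names and executor symbols are placeholders, and the comments name the buffer methods each step corresponds to.

```ruby
buffer = Hash.new { |h, k| h[k] = Hash.new { |h2, k2| h2[k2] = {} } }
buffer["events"][0]["a"] = :exec_1
buffer["events"][1]["a"] = :exec_2

# What revoke("events", 0) does: empty only that partition's executors.
buffer["events"][0].clear

# What find_all would return for each partition afterwards.
after_revoke_p0 = buffer["events"][0].values
after_revoke_p1 = buffer["events"][1].values

# What clear does: drop every topic at once.
buffer.clear
after_clear = buffer.empty?
```

After a revocation the topic and partition keys remain in the buffer with an empty executors hash, so a later `#find_or_create` for that partition builds fresh executors.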