Class: Karafka::Processing::ExecutorsBuffer

Inherits:
Object
Defined in:
lib/karafka/processing/executors_buffer.rb

Overview

Buffer for the executors of a given subscription group. It builds executors on demand and caches them, so they can be reused instead of being created anew each time.

Constructor Details

#initialize(client, subscription_group) ⇒ ExecutorsBuffer

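Parameters:

  • client

    client handed to every executor this buffer builds

  • subscription_group

    subscription group whose id is handed to every executor this buffer builds
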


# File 'lib/karafka/processing/executors_buffer.rb', line 11

def initialize(client, subscription_group)
  @subscription_group = subscription_group
  @client = client
  # Two layers of default-block hashes let us index executors by topic, then partition, then parallel key
  @buffer = Hash.new { |h, k| h[k] = Hash.new { |h2, k2| h2[k2] = {} } }
end
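
The nested default blocks can be tried in isolation. The sketch below is only an illustration with plain Ruby values: the topic name, partition number, and keys are hypothetical placeholders, and strings stand in for the routing topic objects that the real buffer uses as outer keys.

```ruby
# Same shape as @buffer: two hashes with default blocks, then a plain hash.
buffer = Hash.new { |h, k| h[k] = Hash.new { |h2, k2| h2[k2] = {} } }

# Hypothetical keys and values, chosen only for the example.
buffer['events'][0]['group-a'] = :executor_a
buffer['events'][0]['group-b'] = :executor_b

# The intermediate levels were created on first access.
parallel_keys = buffer['events'][0].keys
```

No explicit initialization of the topic or partition level is needed; the default blocks create each level the first time it is read or written.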

Instance Method Details

#clear ⇒ Object

Clears the executors buffer. Useful when recovering from critical errors.



# File 'lib/karafka/processing/executors_buffer.rb', line 73

def clear
  @buffer.clear
end
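
A detail worth noting is that Ruby's `Hash#clear` removes the entries but keeps the default block, so a cleared buffer can be refilled without being rebuilt. The sketch below shows this on a hash of the same shape; the keys and values are hypothetical.

```ruby
buffer = Hash.new { |h, k| h[k] = Hash.new { |h2, k2| h2[k2] = {} } }
buffer[:events][0]['a'] = :executor

buffer.clear
empty_after_clear = buffer.empty?

# The default block survives the clear, so nested writes still work.
buffer[:events][0]['a'] = :new_executor
```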

#each {|ktopic, partition, executor| ... } ⇒ Object

Iterates over all available executors and yields each one together with its topic and partition.

Yield Parameters:

  • ktopic (Routing::Topic)

    karafka routing topic object

  • partition (Integer)

    partition number

  • executor (Executor)

    given executor



# File 'lib/karafka/processing/executors_buffer.rb', line 61

def each
  @buffer.each do |ktopic, partitions|
    partitions.each do |partition, executors|
      executors.each do |_parallel_key, executor|
        # We skip the parallel key here as it does not serve any value when iterating
        yield(ktopic, partition, executor)
      end
    end
  end
end
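
The traversal can be exercised on a stand-alone hash of the same shape. In this sketch, `each_executor` is a hypothetical free-standing version of the method that takes the buffer as an argument, and symbols stand in for routing topics and executors.

```ruby
# Hypothetical helper mirroring #each, with the buffer passed in explicitly.
def each_executor(buffer)
  buffer.each do |ktopic, partitions|
    partitions.each do |partition, executors|
      # The parallel key is dropped; only the executor is yielded.
      executors.each_value { |executor| yield(ktopic, partition, executor) }
    end
  end
end

buffer = Hash.new { |h, k| h[k] = Hash.new { |h2, k2| h2[k2] = {} } }
buffer[:topic_a][0]['k1'] = :e1
buffer[:topic_a][0]['k2'] = :e2
buffer[:topic_b][3]['k1'] = :e3

seen = []
each_executor(buffer) { |ktopic, partition, executor| seen << [ktopic, partition, executor] }
```

Because Ruby hashes preserve insertion order, executors come out grouped by topic and partition in the order they were first stored.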

#find_all(topic, partition) ⇒ Array<Executor>

Finds all the executors available for a given topic partition

Parameters:

  • topic (String)

    topic name

  • partition (Integer)

    partition number

Returns:

  • (Array<Executor>)

    executors in use for this topic + partition



# File 'lib/karafka/processing/executors_buffer.rb', line 50

def find_all(topic, partition)
  ktopic = find_topic(topic)

  @buffer[ktopic][partition].values
end
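
One consequence of the default blocks set up in the constructor is that a lookup for a topic partition with no executors returns an empty array rather than failing, and it leaves empty entries behind. The sketch below demonstrates this on a hash of the same shape; `:some_topic` and partition `7` are hypothetical.

```ruby
buffer = Hash.new { |h, k| h[k] = Hash.new { |h2, k2| h2[k2] = {} } }

# Nothing has been stored yet, so the lookup yields no executors.
found = buffer[:some_topic][7].values

# The read itself created the topic and partition levels.
created = buffer.key?(:some_topic) && buffer[:some_topic].key?(7)
```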

#find_or_create(topic, partition, parallel_key) ⇒ Executor

Finds or creates an executor based on the provided details

Parameters:

  • topic (String)

    topic name

  • partition (Integer)

    partition number

  • parallel_key (String)

    parallel group key

Returns:

  • (Executor)

    the cached executor for this topic, partition and parallel key, or a newly built one



# File 'lib/karafka/processing/executors_buffer.rb', line 24

def find_or_create(topic, partition, parallel_key)
  ktopic = find_topic(topic)

  @buffer[ktopic][partition][parallel_key] ||= Executor.new(
    @subscription_group.id,
    @client,
    ktopic
  )
end
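
The `||=` assignment is what makes the buffer a cache: the executor is built only when the slot is empty. The following sketch illustrates the pattern with stand-ins. `FakeExecutor` and the lambda are hypothetical, the `find_topic` lookup is skipped, and the constructor arguments are placeholder values.

```ruby
# Stand-in for Executor; the real class lives in Karafka and does much more.
FakeExecutor = Struct.new(:group_id, :client, :topic)

buffer = Hash.new { |h, k| h[k] = Hash.new { |h2, k2| h2[k2] = {} } }

# Hypothetical helper mirroring the ||= pattern of #find_or_create.
find_or_create = lambda do |topic, partition, parallel_key|
  buffer[topic][partition][parallel_key] ||= FakeExecutor.new('group-1', :client, topic)
end

first  = find_or_create.call(:events, 0, 'a')
second = find_or_create.call(:events, 0, 'a')
other  = find_or_create.call(:events, 0, 'b')
```

Repeated calls with the same topic, partition, and parallel key return the same object, while a different parallel key gets its own executor.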

#revoke(topic, partition) ⇒ Object

Revokes executors of a given topic partition, so they won’t be used anymore for incoming messages

Parameters:

  • topic (String)

    topic name

  • partition (Integer)

    partition number



# File 'lib/karafka/processing/executors_buffer.rb', line 39

def revoke(topic, partition)
  ktopic = find_topic(topic)

  @buffer[ktopic][partition].clear
end
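
Revocation only empties the innermost hash for one topic partition, so other partitions are untouched and a later lookup for the revoked partition builds a fresh object. The sketch below shows this on a hash of the same shape, with plain `Object` instances as hypothetical stand-ins for executors.

```ruby
buffer = Hash.new { |h, k| h[k] = Hash.new { |h2, k2| h2[k2] = {} } }
buffer[:events][0]['a'] = Object.new
buffer[:events][1]['a'] = Object.new
old = buffer[:events][0]['a']

# What #revoke does for topic :events, partition 0.
buffer[:events][0].clear

remaining_p0 = buffer[:events][0].values
remaining_p1 = buffer[:events][1].size

# The same find-or-create pattern now yields a new object for the slot.
fresh = (buffer[:events][0]['a'] ||= Object.new)
```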