Class: Karafka::Processing::ExecutorsBuffer
- Inherits: Object
- Defined in: lib/karafka/processing/executors_buffer.rb
Overview
Buffer for executors of a given subscription group. It handles building and caching executors, so they can be reused instead of being created anew each time.
Instance Method Summary
- #clear ⇒ Object
  Clears the executors buffer.
- #each {|executor| ... } ⇒ Object
  Iterates over all available executors and yields each of them.
- #find_all(topic, partition) ⇒ Array<Executor>
  Finds all the executors available for a given topic partition.
- #find_or_create(topic, partition, parallel_key, coordinator) ⇒ Executor
  Finds or creates an executor based on the provided details.
- #initialize(client, subscription_group) ⇒ ExecutorsBuffer
  Constructor.
- #revoke(topic, partition) ⇒ Object
  Revokes executors of a given topic partition, so they won’t be used anymore for incoming messages.
Constructor Details
#initialize(client, subscription_group) ⇒ ExecutorsBuffer
# File 'lib/karafka/processing/executors_buffer.rb', line 11

    def initialize(client, subscription_group)
      @subscription_group = subscription_group
      @client = client
      # We need two layers here to keep track of topics, partitions and processing groups
      @buffer = Hash.new { |h, k| h[k] = Hash.new { |h2, k2| h2[k2] = {} } }
    end
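The nested default blocks in the constructor can be tried in isolation. The following is a minimal sketch that uses plain symbols in place of executors; the topic name and keys are made up for illustration. It shows that merely reading a missing topic/partition pair creates the intermediate hashes, which is what lets the other methods index into `@buffer` without nil checks.

```ruby
# Same nested default-block pattern as in the constructor listing.
buffer = Hash.new { |h, k| h[k] = Hash.new { |h2, k2| h2[k2] = {} } }

# Writing through three levels works without any prior setup.
# The symbols stand in for executors (assumption for illustration only).
buffer['events'][0][:key_a] = :executor_a
buffer['events'][0][:key_b] = :executor_b

executors_for_partition = buffer['events'][0].values

# Reading an unseen partition returns an empty hash and stores it as a side effect.
untouched = buffer['events'][1]

puts executors_for_partition.inspect # [:executor_a, :executor_b]
puts untouched.inspect               # {}
puts buffer['events'].keys.inspect   # [0, 1]
```

Note the side effect in the last line: a lookup alone is enough to register an empty entry for the partition.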
Instance Method Details
#clear ⇒ Object
Clears the executors buffer. Useful for recovery from critical errors.
# File 'lib/karafka/processing/executors_buffer.rb', line 67

    def clear
      @buffer.clear
    end
#each {|executor| ... } ⇒ Object
Iterates over all available executors and yields each of them. As the listing shows, only the executor is yielded; the topic and partition keys are discarded.
# File 'lib/karafka/processing/executors_buffer.rb', line 56

    def each
      @buffer.each do |_, partitions|
        partitions.each do |_, executors|
          executors.each do |_, executor|
            yield(executor)
          end
        end
      end
    end
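To illustrate the traversal, here is a small sketch of the same three-level walk over a plain nested hash. The helper name `each_executor`, the topic names, and the symbols standing in for executors are all assumptions made for this example and are not part of Karafka.

```ruby
# Plain nested hash shaped like the buffer: topic => partition => parallel key => executor.
# Symbols stand in for executors (illustration only).
sample = {
  'events' => { 0 => { a: :exec_1, b: :exec_2 }, 1 => { a: :exec_3 } },
  'logs' => { 0 => { a: :exec_4 } }
}

# Hypothetical helper mirroring the traversal in #each: the keys are ignored
# and only the innermost values are yielded.
def each_executor(buffer)
  buffer.each_value do |partitions|
    partitions.each_value do |executors|
      executors.each_value { |executor| yield(executor) }
    end
  end
end

seen = []
each_executor(sample) { |executor| seen << executor }

puts seen.inspect # [:exec_1, :exec_2, :exec_3, :exec_4]
```

Since Ruby hashes preserve insertion order, the executors come out in the order in which their topics, partitions, and keys were first added.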
#find_all(topic, partition) ⇒ Array<Executor>
Finds all the executors available for a given topic partition.
# File 'lib/karafka/processing/executors_buffer.rb', line 47

    def find_all(topic, partition)
      @buffer[topic][partition].values
    end
#find_or_create(topic, partition, parallel_key, coordinator) ⇒ Executor
Finds or creates an executor based on the provided details.
# File 'lib/karafka/processing/executors_buffer.rb', line 25

    def find_or_create(topic, partition, parallel_key, coordinator)
      @buffer[topic][partition][parallel_key] ||= Executor.new(
        @subscription_group.id,
        @client,
        coordinator
      )
    end
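The caching behaviour of `||=` can be sketched without Karafka. In the example below, `SketchExecutor` and `SketchBuffer` are hypothetical stand-ins, and the argument values are placeholders; only the lookup-or-build line follows the listing above.

```ruby
# Hypothetical stand-in for Karafka's Executor; it only records its constructor arguments.
SketchExecutor = Struct.new(:group_id, :client, :coordinator)

# Minimal imitation of the buffer, reduced to the lookup-or-build path.
class SketchBuffer
  def initialize(client, group_id)
    @client = client
    @group_id = group_id
    @buffer = Hash.new { |h, k| h[k] = Hash.new { |h2, k2| h2[k2] = {} } }
  end

  # Builds an executor on the first call for a key triple, then returns the cached one.
  def find_or_create(topic, partition, parallel_key, coordinator)
    @buffer[topic][partition][parallel_key] ||=
      SketchExecutor.new(@group_id, @client, coordinator)
  end
end

sketch = SketchBuffer.new(:client, 'group-1')
first = sketch.find_or_create('events', 0, :a, :coordinator_1)
again = sketch.find_or_create('events', 0, :a, :coordinator_2)
other = sketch.find_or_create('events', 0, :b, :coordinator_2)

puts first.equal?(again)       # true: same cached object
puts first.equal?(other)       # false: a different parallel key gets its own executor
puts again.coordinator.inspect # :coordinator_1
```

In this sketch the coordinator passed on the second call is ignored, because the cached executor is returned before the right-hand side of `||=` is evaluated.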
#revoke(topic, partition) ⇒ Object
Revokes executors of a given topic partition, so they won’t be used anymore for incoming messages.
# File 'lib/karafka/processing/executors_buffer.rb', line 38

    def revoke(topic, partition)
      @buffer[topic][partition].clear
    end
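As a rough illustration of what revocation does to the underlying structure, the sketch below clears one partition of a plain nested hash and then builds a replacement entry. The topic name, the `:default` key, and the use of bare `Object` instances as executors are assumptions made for the example.

```ruby
# Nested hash shaped like the buffer; bare objects stand in for executors.
store = Hash.new { |h, k| h[k] = Hash.new { |h2, k2| h2[k2] = {} } }
store['events'][0][:default] = Object.new
store['events'][1][:default] = Object.new

revoked_executor = store['events'][0][:default]

# Equivalent of revoke('events', 0): only that partition's executors are dropped.
store['events'][0].clear

after_revoke = store['events'][0].values
other_partition = store['events'][1].values

# A later find-or-create style lookup builds a fresh object for the revoked partition.
replacement = (store['events'][0][:default] ||= Object.new)

puts after_revoke.size                    # 0
puts other_partition.size                 # 1
puts replacement.equal?(revoked_executor) # false
```

The other partition of the same topic is untouched, and the revoked partition gets a new executor only when it is requested again.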