Class: Karafka::Processing::ExecutorsBuffer
- Inherits: Object
  - Object
  - Karafka::Processing::ExecutorsBuffer
- Defined in: lib/karafka/processing/executors_buffer.rb
Overview
Buffer for executors of a given subscription group. It wraps the building and caching of executors, so they can be reused instead of being created anew each time.
Instance Method Summary
- #clear ⇒ Object
  Clears the executors buffer.
- #each {|topic, partition, executor| ... } ⇒ Object
  Iterates over all available executors and yields them together with topic and partition info.
- #find_all(topic, partition) ⇒ Array<Executor>
  Finds all the executors available for a given topic partition.
- #find_or_create(topic, partition, parallel_key) ⇒ Executor
  Finds or creates an executor based on the provided details.
- #initialize(client, subscription_group) ⇒ ExecutorsBuffer constructor
- #revoke(topic, partition) ⇒ Object
  Revokes executors of a given topic partition, so they won’t be used anymore for incoming messages.
Constructor Details
#initialize(client, subscription_group) ⇒ ExecutorsBuffer
# File 'lib/karafka/processing/executors_buffer.rb', line 11

def initialize(client, subscription_group)
  @subscription_group = subscription_group
  @client = client
  # We need two layers here to keep track of topics, partitions and processing groups
  @buffer = Hash.new { |h, k| h[k] = Hash.new { |h2, k2| h2[k2] = {} } }
end
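The nested default blocks in the constructor make each level of the buffer come into existence on first access. The following is a minimal sketch of that behavior in plain Ruby, not Karafka code: `build_buffer` is a hypothetical helper, and the string topic name and symbol values are stand-ins for the routing topic objects and executors the real buffer holds.

```ruby
# Builds a hash with the same three-level shape as @buffer:
# topic => partition => parallel key => executor
def build_buffer
  Hash.new { |h, k| h[k] = Hash.new { |h2, k2| h2[k2] = {} } }
end

buffer = build_buffer

# Reading a missing topic and partition creates both levels on the fly,
# so an executor slot can be assigned without any setup
buffer['events'][0][:default] = :executor_stand_in

# Even a plain lookup leaves an empty partition entry behind
buffer['other'][3]

puts buffer.key?('other')
puts buffer['other'][3].empty?
```

One consequence worth keeping in mind is that lookups are not side-effect free: querying an unknown topic or partition adds an empty entry to the hash.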
Instance Method Details
#clear ⇒ Object
Clears the executors buffer. Useful when recovering from critical errors.
# File 'lib/karafka/processing/executors_buffer.rb', line 73

def clear
  @buffer.clear
end
#each {|topic, partition, executor| ... } ⇒ Object
Iterates over all available executors and yields them together with topic and partition info.
# File 'lib/karafka/processing/executors_buffer.rb', line 61

def each
  @buffer.each do |ktopic, partitions|
    partitions.each do |partition, executors|
      executors.each do |_parallel_key, executor|
        # We skip the parallel key here as it does not serve any value when iterating
        yield(ktopic, partition, executor)
      end
    end
  end
end
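The iteration flattens the three-level structure into one `(topic, partition, executor)` triple per executor. Here is a rough standalone sketch of the same traversal over a plain hash. `each_executor` is a hypothetical helper, and the strings and symbols stand in for routing topics and executors.

```ruby
# Walks a topic => partition => parallel key => executor hash and yields
# one (topic, partition, executor) triple per executor, dropping the key
def each_executor(buffer)
  buffer.each do |topic, partitions|
    partitions.each do |partition, executors|
      executors.each_value do |executor|
        yield(topic, partition, executor)
      end
    end
  end
end

# Stand-in data: partition 0 of 'events' holds two executors
buffer = {
  'events' => { 0 => { default: :exec_a, virtual_1: :exec_b }, 1 => { default: :exec_c } },
  'logs' => { 0 => { default: :exec_d } }
}

each_executor(buffer) do |topic, partition, executor|
  puts "#{topic}/#{partition}: #{executor}"
end
```

A partition with several parallel keys yields once per executor, so the same topic and partition pair can appear in consecutive iterations.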
#find_all(topic, partition) ⇒ Array<Executor>
Finds all the executors available for a given topic partition.
# File 'lib/karafka/processing/executors_buffer.rb', line 50

def find_all(topic, partition)
  ktopic = find_topic(topic)

  @buffer[ktopic][partition].values
end
#find_or_create(topic, partition, parallel_key) ⇒ Executor
Finds or creates an executor based on the provided details.
# File 'lib/karafka/processing/executors_buffer.rb', line 24

def find_or_create(topic, partition, parallel_key)
  ktopic = find_topic(topic)

  @buffer[ktopic][partition][parallel_key] ||= Executor.new(
    @subscription_group.id,
    @client,
    ktopic
  )
end
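The `||=` assignment is what provides the caching: an executor is built only when the slot for a given topic, partition and parallel key is still empty. The sketch below illustrates this under simplifying assumptions. `ExecutorStub` and `CachingBuffer` are hypothetical stand-ins, and the topic name is used directly as the key because `find_topic` is not shown on this page.

```ruby
# Hypothetical stand-in for Karafka::Processing::Executor
ExecutorStub = Struct.new(:group_id, :client, :topic)

# Simplified buffer that keys by topic name instead of calling find_topic
class CachingBuffer
  def initialize(client, group_id)
    @client = client
    @group_id = group_id
    @buffer = Hash.new { |h, k| h[k] = Hash.new { |h2, k2| h2[k2] = {} } }
  end

  def find_or_create(topic, partition, parallel_key)
    # ||= builds a new executor only when the slot is still empty
    @buffer[topic][partition][parallel_key] ||= ExecutorStub.new(@group_id, @client, topic)
  end
end

buffer = CachingBuffer.new(:client_stand_in, 'group_a')
first = buffer.find_or_create('events', 0, :default)
again = buffer.find_or_create('events', 0, :default)
other = buffer.find_or_create('events', 0, :virtual_1)

# Same details return the very same object; a new parallel key gets its own
puts first.equal?(again)
puts first.equal?(other)
```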
#revoke(topic, partition) ⇒ Object
Revokes executors of a given topic partition, so they won’t be used anymore for incoming messages.
# File 'lib/karafka/processing/executors_buffer.rb', line 39

def revoke(topic, partition)
  ktopic = find_topic(topic)

  @buffer[ktopic][partition].clear
end
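Revoking clears only the innermost hash for one topic partition, so executors of other partitions stay cached and a later request for the revoked partition builds a fresh one. The following sketch shows that effect with a hypothetical `RevocableBuffer` class. It uses topic names as keys in place of `find_topic` and plain `Object` instances in place of executors.

```ruby
# Simplified buffer with the lookup, listing and revocation methods only
class RevocableBuffer
  def initialize
    @buffer = Hash.new { |h, k| h[k] = Hash.new { |h2, k2| h2[k2] = {} } }
  end

  def find_or_create(topic, partition, parallel_key)
    # Object.new is a placeholder for building a real executor
    @buffer[topic][partition][parallel_key] ||= Object.new
  end

  def find_all(topic, partition)
    @buffer[topic][partition].values
  end

  def revoke(topic, partition)
    # Drops every executor of this partition, leaving other partitions alone
    @buffer[topic][partition].clear
  end
end

buffer = RevocableBuffer.new
old = buffer.find_or_create('events', 0, :default)
buffer.find_or_create('events', 1, :default)

buffer.revoke('events', 0)

puts buffer.find_all('events', 0).empty?
puts buffer.find_all('events', 1).size
puts buffer.find_or_create('events', 0, :default).equal?(old)
```

By contrast, `#clear` empties the outermost hash and therefore drops the executors of every topic and partition at once.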