Class: Karafka::Processing::Executor
- Inherits: Object
- Defined in: lib/karafka/processing/executor.rb
Overview
Executors are not removed after a partition is revoked. They are small and are re-used if the partition is re-claimed.
Executors:
- run consumer code (for `#call`) or run given preparation / teardown operations when needed from separate threads.
- re-create consumer instances for partitions that were revoked and assigned back.
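The reuse described above can be sketched as follows. This is a minimal illustration, not Karafka's implementation: `SketchExecutor` and `SketchExecutorsBuffer` are hypothetical stand-ins, and the lookup by topic name and partition is an assumption that this page does not state. Dropping the consumer directly on revocation is also a simplification.

```ruby
require 'securerandom'

# Hypothetical stand-in for an executor; not the Karafka class.
class SketchExecutor
  attr_reader :id

  def initialize
    @id = SecureRandom.hex(6)
    @consumer = nil
  end

  # Lazily builds a consumer stand-in (a plain Object here).
  def consumer
    @consumer ||= Object.new
  end

  # Drops the consumer so the next access builds a fresh one.
  def reset_consumer
    @consumer = nil
  end
end

# Hypothetical registry that keeps executors across revocations.
class SketchExecutorsBuffer
  def initialize
    @executors = {}
  end

  # Assumption: executors are keyed by topic name and partition.
  def find_or_create(topic_name, partition)
    @executors[[topic_name, partition]] ||= SketchExecutor.new
  end
end

buffer = SketchExecutorsBuffer.new
first = buffer.find_or_create('events', 0)
consumer_before = first.consumer

# Partition revoked: the executor stays, only the consumer is dropped.
first.reset_consumer

# Partition assigned back: same executor, new consumer instance.
second = buffer.find_or_create('events', 0)
puts "same executor: #{second.equal?(first)}"
puts "same consumer: #{second.consumer.equal?(consumer_before)}"
```

Keeping the executor while replacing only the consumer keeps the tracking id stable and gives the re-claimed partition a consumer with clean state.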
Instance Attribute Summary
-
#group_id ⇒ String
readonly
Subscription group id to which a given executor belongs.
-
#id ⇒ String
readonly
Unique id that we use for state tracking.
-
#messages ⇒ Karafka::Messages::Messages
readonly
Messages batch.
-
#topic ⇒ Karafka::Routing::Topic
readonly
Topic accessibility may be needed for the jobs builder to be able to build a proper job based on the topic settings defined by the end user.
Instance Method Summary
-
#after_consume ⇒ Object
Runs consumer after consumption code.
-
#before_consume ⇒ Object
Runs setup and warm-up code in the worker prior to running the consumption.
-
#before_enqueue(messages, coordinator) ⇒ Object
Allows us to prepare the consumer in the listener thread prior to the job being sent to the queue.
-
#consume ⇒ Object
Runs consumer data processing against given batch and handles failures and errors.
-
#initialize(group_id, client, topic) ⇒ Executor
constructor
A new instance of Executor.
-
#revoked ⇒ Object
Runs the consumer `#revoked` method, which should be triggered when a given consumer is no longer needed due to partition reassignment.
-
#shutdown ⇒ Object
Runs the consumer `#shutdown` method, which should be triggered when a given consumer is no longer needed because we are closing the process.
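The order in which these hooks are meant to run can be read from the summaries: `#before_enqueue` in the listener thread, then `#before_consume`, `#consume` and `#after_consume` in a worker. The sketch below only illustrates that ordering in a single thread. `RecordingConsumer` and `LifecycleSketchExecutor` are hypothetical stand-ins, not Karafka classes, and the real methods take arguments and do more work.

```ruby
# Hypothetical consumer stand-in that records which hooks ran.
class RecordingConsumer
  attr_reader :calls

  def initialize
    @calls = []
  end

  %i[on_before_enqueue on_before_consume on_consume on_after_consume].each do |hook|
    define_method(hook) { @calls << hook }
  end
end

# Hypothetical executor stand-in that forwards each step to the consumer.
class LifecycleSketchExecutor
  def initialize(consumer)
    @consumer = consumer
  end

  def before_enqueue
    @consumer.on_before_enqueue
  end

  def before_consume
    @consumer.on_before_consume
  end

  def consume
    @consumer.on_consume
  end

  def after_consume
    @consumer.on_after_consume
  end
end

recorder = RecordingConsumer.new
executor = LifecycleSketchExecutor.new(recorder)

# Listener side: prepare before the job is queued.
executor.before_enqueue

# Worker side: warm up, process, then run post-consumption code.
executor.before_consume
executor.consume
executor.after_consume

puts recorder.calls.inspect
```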
Constructor Details
#initialize(group_id, client, topic) ⇒ Executor
Returns a new instance of Executor.
    # File 'lib/karafka/processing/executor.rb', line 33

    def initialize(group_id, client, topic)
      @id = SecureRandom.hex(6)
      @group_id = group_id
      @client = client
      @topic = topic
    end
Instance Attribute Details
#group_id ⇒ String (readonly)
Returns subscription group id to which a given executor belongs.
    # File 'lib/karafka/processing/executor.rb', line 19

    def group_id
      @group_id
    end
#id ⇒ String (readonly)
Returns the unique id that we use for state tracking.
    # File 'lib/karafka/processing/executor.rb', line 16

    def id
      @id
    end
#messages ⇒ Karafka::Messages::Messages (readonly)
Returns messages batch.
    # File 'lib/karafka/processing/executor.rb', line 22

    def messages
      @messages
    end
#topic ⇒ Karafka::Routing::Topic (readonly)
Topic accessibility may be needed for the jobs builder to be able to build a proper job based on the topic settings defined by the end user
    # File 'lib/karafka/processing/executor.rb', line 28

    def topic
      @topic
    end
Instance Method Details
#after_consume ⇒ Object
Runs consumer after consumption code
    # File 'lib/karafka/processing/executor.rb', line 81

    def after_consume
      consumer.on_after_consume
    end
#before_consume ⇒ Object
Runs setup and warm-up code in the worker prior to running the consumption
    # File 'lib/karafka/processing/executor.rb', line 70

    def before_consume
      consumer.on_before_consume
    end
#before_enqueue(messages, coordinator) ⇒ Object
Allows us to prepare the consumer in the listener thread prior to the job being sent to the queue. It also allows running code that is time-sensitive and cannot wait in the queue, as waiting could cause starvation.
    # File 'lib/karafka/processing/executor.rb', line 46

    def before_enqueue(messages, coordinator)
      # the moment we've received the batch or actually the moment we've enqueued it,
      # but good enough
      @enqueued_at = Time.now

      # Recreate consumer with each batch if persistence is not enabled
      # We reload the consumers with each batch instead of relying on some external signals
      # when needed for consistency. That way devs may have it on or off and not in this
      # middle state, where re-creation of a consumer instance would occur only sometimes
      @consumer = nil unless ::Karafka::App.config.consumer_persistence

      consumer.coordinator = coordinator

      # First we build messages batch...
      consumer.messages = Messages::Builders::Messages.call(
        messages,
        @topic,
        @enqueued_at
      )

      consumer.on_before_enqueue
    end
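The effect of the `consumer_persistence` check can be illustrated with a small stand-alone sketch. `PersistenceSketchExecutor` is a hypothetical stand-in, and its `persistence:` keyword plays the role of the configuration setting. The sketch also assumes that `consumer` is a lazily memoized builder, which this page does not show.

```ruby
# Hypothetical executor stand-in; not the Karafka class.
class PersistenceSketchExecutor
  def initialize(persistence:)
    @persistence = persistence
    @consumer = nil
  end

  # Mirrors the reset step above and returns the consumer that
  # would handle the batch.
  def before_enqueue
    @enqueued_at = Time.now
    # Without persistence, drop the consumer so a new one is built per batch.
    @consumer = nil unless @persistence
    consumer
  end

  private

  # Assumption: the consumer is built on first use and then memoized.
  def consumer
    @consumer ||= Object.new
  end
end

persistent = PersistenceSketchExecutor.new(persistence: true)
same = persistent.before_enqueue.equal?(persistent.before_enqueue)

fresh = PersistenceSketchExecutor.new(persistence: false)
different = !fresh.before_enqueue.equal?(fresh.before_enqueue)

puts "persistence on, consumer reused: #{same}"
puts "persistence off, consumer rebuilt: #{different}"
```

Resetting on every batch, rather than only on some external signal, keeps the behaviour binary: the consumer instance is either always reused or never reused.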
#consume ⇒ Object
Runs consumer data processing against given batch and handles failures and errors.
    # File 'lib/karafka/processing/executor.rb', line 75

    def consume
      # We run the consumer client logic...
      consumer.on_consume
    end
#revoked ⇒ Object
Clearing the consumer ensures that, if we get the partition back, it will be handled by a consumer with a clean state.
We run it only when a consumer was present, because its presence indicates that at least a single message has been consumed.
We do not reset the consumer but indicate the need for recreation instead, because after the revocation there may still be an `#after_consume` running that needs the given consumer instance.
Runs the consumer `#revoked` method, which should be triggered when a given consumer is no longer needed due to partition reassignment.
    # File 'lib/karafka/processing/executor.rb', line 97

    def revoked
      consumer.on_revoked if @consumer
    end
#shutdown ⇒ Object
While we do not need to clear the consumer here, it’s a good habit to clean after work is done.
Runs the consumer `#shutdown` method, which should be triggered when a given consumer is no longer needed because we are closing the process.
    # File 'lib/karafka/processing/executor.rb', line 106

    def shutdown
      # There is a case, where the consumer no longer exists because it was revoked, in case like
      # that we do not build a new instance and shutdown should not be triggered.
      consumer.on_shutdown if @consumer
    end
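The `if @consumer` guard checks the instance variable rather than calling `consumer`, so a shutdown never builds a consumer only to shut it down. The sketch below illustrates this with hypothetical stand-ins (`GuardSketchConsumer`, `GuardSketchExecutor`); the `built` counter exists only for the illustration, and the lazily memoized `consumer` builder is an assumption.

```ruby
# Hypothetical consumer stand-in that counts shutdown calls.
class GuardSketchConsumer
  attr_reader :shutdown_calls

  def initialize
    @shutdown_calls = 0
  end

  def on_shutdown
    @shutdown_calls += 1
  end
end

# Hypothetical executor stand-in; not the Karafka class.
class GuardSketchExecutor
  attr_reader :built

  def initialize
    @consumer = nil
    @built = 0
  end

  # Touching the consumer builds it on first use.
  def consume
    consumer
  end

  # Checks the ivar directly, so no consumer is built here.
  def shutdown
    consumer.on_shutdown if @consumer
  end

  private

  def consumer
    @consumer ||= begin
      @built += 1
      GuardSketchConsumer.new
    end
  end
end

# An executor that never processed anything builds nothing on shutdown.
idle = GuardSketchExecutor.new
idle.shutdown

# An executor that did work shuts down its existing consumer once.
active = GuardSketchExecutor.new
worked = active.consume
active.shutdown

puts "idle consumers built: #{idle.built}"
puts "active consumers built: #{active.built}"
puts "active shutdown calls: #{worked.shutdown_calls}"
```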