Class: Karafka::Processing::Jobs::Consume
- Defined in:
- lib/karafka/processing/jobs/consume.rb
Overview
The main job type. It runs the executor, which triggers processing of the given topic partition's messages in the underlying consumer instance.
Direct Known Subclasses
Instance Attribute Summary
- #messages ⇒ Array<Rdkafka::Consumer::Message> (readonly)
  Array with messages.
Attributes inherited from Base
Instance Method Summary
- #after_call ⇒ Object
  Runs any error handling and other post-consumption stuff on the executor.
- #before_call ⇒ Object
  Runs the before consumption preparations on the executor.
- #before_schedule ⇒ Object
  Runs all the preparation code on the executor that needs to happen before the job is scheduled.
- #call ⇒ Object
  Runs the given executor.
- #initialize(executor, messages) ⇒ Consume (constructor)
Methods inherited from Base
#finish!, #finished?, #non_blocking?, #wrap
Constructor Details
#initialize(executor, messages) ⇒ Consume
    # File 'lib/karafka/processing/jobs/consume.rb', line 18
    def initialize(executor, messages)
      @executor = executor
      @messages = messages
      super()
    end
Instance Attribute Details
#messages ⇒ Array<Rdkafka::Consumer::Message> (readonly)
Returns array with messages.
    # File 'lib/karafka/processing/jobs/consume.rb', line 10
    def messages
      @messages
    end
Instance Method Details
#after_call ⇒ Object
Runs any error handling and other post-consumption stuff on the executor.
    # File 'lib/karafka/processing/jobs/consume.rb', line 41
    def after_call
      executor.after_consume
    end
#before_call ⇒ Object
Runs the before consumption preparations on the executor.
    # File 'lib/karafka/processing/jobs/consume.rb', line 31
    def before_call
      executor.before_consume
    end
#before_schedule ⇒ Object
Runs all the preparation code on the executor that needs to happen before the job is scheduled.
    # File 'lib/karafka/processing/jobs/consume.rb', line 26
    def before_schedule
      executor.before_schedule_consume(@messages)
    end
#call ⇒ Object
Runs the given executor.
    # File 'lib/karafka/processing/jobs/consume.rb', line 36
    def call
      executor.consume
    end
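Each of the four instance methods delegates to a single executor call, so their relationship is easiest to see when they run in sequence. The following is a minimal sketch, not Karafka's own code: RecordingExecutor, SketchConsumeJob and run_job_lifecycle are hypothetical stand-ins written for illustration, the Base superclass is omitted, and the hook order (before_schedule, then before_call, call, after_call) is an assumption based on the method descriptions in this page.

```ruby
# Hypothetical stand-in for the executor; it only records which hook ran.
class RecordingExecutor
  attr_reader :calls

  def initialize
    @calls = []
  end

  def before_schedule_consume(messages)
    @calls << [:before_schedule_consume, messages.size]
  end

  def before_consume
    @calls << [:before_consume]
  end

  def consume
    @calls << [:consume]
  end

  def after_consume
    @calls << [:after_consume]
  end
end

# Simplified mirror of the documented job (no Base class, no Karafka dependency).
class SketchConsumeJob
  attr_reader :executor, :messages

  def initialize(executor, messages)
    @executor = executor
    @messages = messages
  end

  def before_schedule
    executor.before_schedule_consume(@messages)
  end

  def before_call
    executor.before_consume
  end

  def call
    executor.consume
  end

  def after_call
    executor.after_consume
  end
end

# Assumed order: the scheduling hook first, then the three call-phase hooks.
def run_job_lifecycle(job)
  job.before_schedule
  job.before_call
  job.call
  job.after_call
end

executor = RecordingExecutor.new
job = SketchConsumeJob.new(executor, %w[m1 m2 m3])
run_job_lifecycle(job)
p executor.calls
```

Only before_schedule receives the messages; the remaining hooks take no arguments, which matches the method bodies listed in this page.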