Class: Karafka::Processing::Jobs::Consume

Inherits:
Base
  • Object
Defined in:
lib/karafka/processing/jobs/consume.rb

Overview

The main job type. It runs the executor, which triggers processing of a given topic partition's messages in the underlying consumer instance.

Instance Attribute Summary

Attributes inherited from Base

#executor

Instance Method Summary

Methods inherited from Base

#finish!, #finished?, #non_blocking?, #wrap

Constructor Details

#initialize(executor, messages) ⇒ Consume

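Parameters:

  • executor

    the executor that runs this job

  • messages

    the messages to process (exposed through #messages)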

# File 'lib/karafka/processing/jobs/consume.rb', line 18

def initialize(executor, messages)
  @executor = executor
  @messages = messages
  super()
end

Instance Attribute Details

#messages ⇒ Array<Rdkafka::Consumer::Message> (readonly)

Returns array with messages.

Returns:

  • (Array<Rdkafka::Consumer::Message>)

    array with messages



# File 'lib/karafka/processing/jobs/consume.rb', line 10

def messages
  @messages
end

Instance Method Details

#after_call ⇒ Object

Runs any error handling and other post-consumption work on the executor.



# File 'lib/karafka/processing/jobs/consume.rb', line 41

def after_call
  executor.after_consume
end

#before_call ⇒ Object

Runs the pre-consumption preparations on the executor.



# File 'lib/karafka/processing/jobs/consume.rb', line 31

def before_call
  executor.before_consume
end

#before_schedule ⇒ Object

Runs all the preparation code on the executor that needs to happen before the job is scheduled.



# File 'lib/karafka/processing/jobs/consume.rb', line 26

def before_schedule
  executor.before_schedule_consume(@messages)
end

#call ⇒ Object

Runs the given executor by calling its #consume method.



# File 'lib/karafka/processing/jobs/consume.rb', line 36

def call
  executor.consume
end
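
Example

The hooks documented above each delegate to a single executor method. The following is a minimal sketch of that delegation, not Karafka's own code: RecordingExecutor and SketchConsumeJob are hypothetical stand-ins invented for illustration, the messages are plain strings rather than real Kafka messages, and the order in which the hooks are invoked is an assumption inferred from their names and descriptions.

```ruby
# Hypothetical stand-in executor: records each call instead of touching Kafka.
class RecordingExecutor
  attr_reader :calls

  def initialize
    @calls = []
  end

  def before_schedule_consume(messages)
    @calls << [:before_schedule_consume, messages.size]
  end

  def before_consume
    @calls << [:before_consume]
  end

  def consume
    @calls << [:consume]
  end

  def after_consume
    @calls << [:after_consume]
  end
end

# Hypothetical stand-in job that mirrors the delegation in the listings above.
# It does not inherit from the real Base class.
class SketchConsumeJob
  attr_reader :executor, :messages

  def initialize(executor, messages)
    @executor = executor
    @messages = messages
  end

  def before_schedule
    executor.before_schedule_consume(@messages)
  end

  def before_call
    executor.before_consume
  end

  def call
    executor.consume
  end

  def after_call
    executor.after_consume
  end
end

executor = RecordingExecutor.new
job = SketchConsumeJob.new(executor, %w[m1 m2 m3])

# Assumed order: the scheduling hook first, then the three call-phase hooks.
job.before_schedule
job.before_call
job.call
job.after_call

p executor.calls
```

Only #before_schedule passes the messages along; the remaining hooks take no arguments and simply forward to the executor.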