Class: Karafka::Processing::Jobs::Consume

Inherits:
Base
  • Object
Defined in:
lib/karafka/processing/jobs/consume.rb

Overview

The main job type. It runs the executor, which triggers processing of the given topic partition's messages in the underlying consumer instance.

Instance Attribute Summary

Attributes inherited from Base

#executor

Instance Method Summary

Methods inherited from Base

#non_blocking?

Constructor Details

#initialize(executor, messages) ⇒ Consume

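Parameters:

  • executor: the executor that this job runs
  • messages: the messages passed to the executor before the job is enqueued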



# File 'lib/karafka/processing/jobs/consume.rb', line 16

def initialize(executor, messages)
  @executor = executor
  @messages = messages
  super()
end

Instance Attribute Details

#messages ⇒ Array<Rdkafka::Consumer::Message> (readonly)

Returns array with messages.

Returns:

  • (Array<Rdkafka::Consumer::Message>)

    array with messages



# File 'lib/karafka/processing/jobs/consume.rb', line 10

def messages
  @messages
end

Instance Method Details

#after_call ⇒ Object

Runs any error handling and other post-consumption work on the executor.



# File 'lib/karafka/processing/jobs/consume.rb', line 39

def after_call
  executor.after_consume
end

#before_call ⇒ Object

Runs the pre-consumption preparations on the executor.



# File 'lib/karafka/processing/jobs/consume.rb', line 29

def before_call
  executor.before_consume
end

#before_enqueue ⇒ Object

Runs all the preparation code on the executor that needs to happen before the job is enqueued.



# File 'lib/karafka/processing/jobs/consume.rb', line 24

def before_enqueue
  executor.before_enqueue(@messages)
end

#call ⇒ Object

Runs the given executor



# File 'lib/karafka/processing/jobs/consume.rb', line 34

def call
  executor.consume
end
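
Example

The methods above each delegate to one hook on the executor. The sketch below illustrates that delegation without loading Karafka. RecordingExecutor and SketchConsumeJob are hypothetical stand-ins, not Karafka classes. The job is a simplified copy of the source shown on this page without the Base superclass, the messages are plain strings rather than Rdkafka::Consumer::Message objects, and the call order is an assumption based on the method names rather than a description of how Karafka's workers schedule jobs.

```ruby
# Hypothetical stand-in for the real executor; it only records which hooks ran.
class RecordingExecutor
  attr_reader :events

  def initialize
    @events = []
  end

  def before_enqueue(messages)
    @events << [:before_enqueue, messages.size]
  end

  def before_consume
    @events << [:before_consume]
  end

  def consume
    @events << [:consume]
  end

  def after_consume
    @events << [:after_consume]
  end
end

# Simplified copy of the job documented above (no Base superclass).
class SketchConsumeJob
  attr_reader :executor, :messages

  def initialize(executor, messages)
    @executor = executor
    @messages = messages
  end

  def before_enqueue
    executor.before_enqueue(@messages)
  end

  def before_call
    executor.before_consume
  end

  def call
    executor.consume
  end

  def after_call
    executor.after_consume
  end
end

executor = RecordingExecutor.new
job = SketchConsumeJob.new(executor, %w[m1 m2 m3]) # placeholder messages

# Assumed order: the enqueue-time hook first, then the three call-time hooks.
job.before_enqueue
job.before_call
job.call
job.after_call

p executor.events
```

Only before_enqueue receives the messages; the remaining hooks take no arguments and rely on whatever state the executor already holds.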