Class: Karafka::ActiveJob::Consumer

Inherits:
BaseConsumer show all
Defined in:
lib/karafka/active_job/consumer.rb

Overview

This is the consumer for ActiveJob that eats the messages enqueued with it one after another. It marks the offset after each message, so we make sure, none of the jobs is executed twice

Instance Attribute Summary

Attributes inherited from BaseConsumer

#client, #coordinator, #id, #messages, #producer, #topic

Instance Method Summary collapse

Methods inherited from BaseConsumer

#initialize, #on_after_consume, #on_before_consume, #on_before_enqueue, #on_consume, #on_revoked, #on_shutdown

Constructor Details

This class inherits a constructor from Karafka::BaseConsumer

Instance Method Details

#consumeObject

Note:

ActiveJob does not support batches, so we just run one message after another

Executes the ActiveJob logic



11
12
13
14
15
16
17
18
19
20
21
22
23
24
# File 'lib/karafka/active_job/consumer.rb', line 11

def consume
  messages.each do |message|
    break if Karafka::App.stopping?

    ::ActiveJob::Base.execute(
      # We technically speaking could set this as deserializer and reference it from the
      # message instead of using the `#raw_payload`. This is not done on purpose to simplify
      # the ActiveJob setup here
      ::ActiveSupport::JSON.decode(message.raw_payload)
    )

    mark_as_consumed(message)
  end
end