Class: Karafka::Pro::ActiveJob::Consumer

Inherits:
BaseConsumer show all
Defined in:
lib/karafka/pro/active_job/consumer.rb

Overview

Pro ActiveJob consumer that is suppose to handle long-running jobs as well as short running jobs

When in LRJ, it will pause a given partition forever and will resume its processing only when all the jobs are done processing.

It contains slightly better revocation warranties than the regular blocking consumer as it can stop processing batch of jobs in the middle after the revocation.

Instance Attribute Summary

Attributes inherited from BaseConsumer

#client, #coordinator, #id, #messages, #producer, #topic

Instance Method Summary collapse

Methods inherited from BaseConsumer

#initialize, #on_after_consume, #on_before_consume, #on_before_enqueue, #on_consume, #on_revoked, #on_shutdown

Constructor Details

This class inherits a constructor from Karafka::BaseConsumer

Instance Method Details

#consumeObject

Runs ActiveJob jobs processing and handles lrj if needed



27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
# File 'lib/karafka/pro/active_job/consumer.rb', line 27

def consume
  messages.each do |message|
    # If for any reason we've lost this partition, not worth iterating over new messages
    # as they are no longer ours
    break if revoked?
    break if Karafka::App.stopping?

    ::ActiveJob::Base.execute(
      ::ActiveSupport::JSON.decode(message.raw_payload)
    )

    # We cannot mark jobs as done after each if there are virtual partitions. Otherwise
    # this could create random markings
    next if topic.virtual_partitions?

    mark_as_consumed(message)
  end
end