Class: Karafka::Pro::ActiveJob::Consumer
- Inherits:
-
BaseConsumer
- Object
- BaseConsumer
- BaseConsumer
- Karafka::Pro::ActiveJob::Consumer
- Defined in:
- lib/karafka/pro/active_job/consumer.rb
Overview
Pro ActiveJob consumer that is suppose to handle long-running jobs as well as short running jobs
When in LRJ, it will pause a given partition forever and will resume its processing only when all the jobs are done processing.
It contains slightly better revocation warranties than the regular blocking consumer as it can stop processing batch of jobs in the middle after the revocation.
Instance Attribute Summary
Attributes inherited from BaseConsumer
#client, #coordinator, #id, #messages, #producer, #topic
Instance Method Summary collapse
-
#consume ⇒ Object
Runs ActiveJob jobs processing and handles lrj if needed.
Methods inherited from BaseConsumer
#initialize, #on_after_consume, #on_before_consume, #on_before_enqueue, #on_consume, #on_revoked, #on_shutdown
Constructor Details
This class inherits a constructor from Karafka::BaseConsumer
Instance Method Details
#consume ⇒ Object
Runs ActiveJob jobs processing and handles lrj if needed
27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 |
# File 'lib/karafka/pro/active_job/consumer.rb', line 27 def consume .each do || # If for any reason we've lost this partition, not worth iterating over new messages # as they are no longer ours break if revoked? break if Karafka::App.stopping? ::ActiveJob::Base.execute( ::ActiveSupport::JSON.decode(.raw_payload) ) # We cannot mark jobs as done after each if there are virtual partitions. Otherwise # this could create random markings next if topic.virtual_partitions? mark_as_consumed() end end |