Class: Karafka::Pro::RecurringTasks::Consumer
- Inherits:
-
BaseConsumer
- Object
- BaseConsumer
- Karafka::Pro::RecurringTasks::Consumer
- Defined in:
- lib/karafka/pro/recurring_tasks/consumer.rb
Overview
Consumer responsible for management of the recurring tasks and their execution There are some assumptions made here that always need to be satisfied:
- we only run schedules that are of same or newer version
- we always mark as consumed in such a way, that the first message received after
assignment (if any) is a state
Instance Attribute Summary
Attributes inherited from BaseConsumer
#client, #coordinator, #id, #messages, #producer
Instance Method Summary collapse
- #consume ⇒ Object
-
#eofed ⇒ Object
Starts the final replay process if we reached eof during replaying.
-
#initialize(*args) ⇒ Consumer
constructor
A new instance of Consumer.
-
#tick ⇒ Object
Runs the cron execution if all good.
Methods inherited from BaseConsumer
#on_after_consume, #on_before_consume, #on_before_schedule_consume, #on_before_schedule_eofed, #on_before_schedule_idle, #on_before_schedule_revoked, #on_before_schedule_shutdown, #on_consume, #on_eofed, #on_idle, #on_initialized, #on_revoked, #on_shutdown
Constructor Details
Instance Method Details
#consume ⇒ Object
29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 |
# File 'lib/karafka/pro/recurring_tasks/consumer.rb', line 29 def consume # There is nothing we can do if we operate on a newer schedule. In such cases we should # just wait and re-raise error hoping someone will notice or that this will be # reassigned to a process with newer schedule raise(Errors::IncompatibleScheduleError) if @executor.incompatible? .each do || payload = .payload type = payload[:type] case type when 'schedule' # If we're replaying data, we need to record the most recent stored state, so we # can use this data to fully initialize the scheduler @executor.update_state(payload) if @executor. # If this is first message we cannot mark it on the previous offset next if .offset.zero? # We always mark as consumed in such a way, that when replaying, we start from a # schedule state message. This makes it easier to recover. mark_as_consumed Karafka::Messages::Seek.new( topic.name, partition, .offset - 1 ) when 'command' @executor.apply_command(payload) next if @executor. # Execute on each incoming command to have nice latency but only after replaying # During replaying we should not execute because there may be more state changes # that collectively have a different outcome @executor.call else raise ::Karafka::Errors::UnsupportedCaseError, type end end eofed if eofed? end |
#eofed ⇒ Object
Starts the final replay process if we reached eof during replaying
73 74 75 76 77 78 79 |
# File 'lib/karafka/pro/recurring_tasks/consumer.rb', line 73 def eofed # We only mark as replayed if we were replaying in the first place # If already replayed, nothing to do return unless @executor. @executor.replay end |
#tick ⇒ Object
Runs the cron execution if all good.
82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 |
# File 'lib/karafka/pro/recurring_tasks/consumer.rb', line 82 def tick # Do nothing until we fully recover the correct state return if @executor. # If the state is incompatible, we can only raise an error. # We do it here and in the `#consume` so the pause-retry kicks in basically reporting # on this issue once every minute. That way user should not miss this. # We use seek to move so we can achieve a pause of 60 seconds in between consecutive # errors instead of on each tick because it is much more frequent. if @executor.incompatible? if .empty? raise Errors::IncompatibleScheduleError else return seek(.last.offset - 1) end end # If all good and compatible we can execute the recurring tasks @executor.call end |