Class: Karafka::Pro::RecurringTasks::Consumer
- Inherits: BaseConsumer
  - Object
    - BaseConsumer
      - Karafka::Pro::RecurringTasks::Consumer
- Defined in: lib/karafka/pro/recurring_tasks/consumer.rb
Overview
Consumer responsible for managing recurring tasks and their execution. Some assumptions made here must always be satisfied:
- we only run schedules that are of the same or a newer version
- we always mark as consumed in such a way that the first message received after
assignment (if any) is a state message
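The first assumption can be illustrated with a minimal sketch. How the executor actually decides that a schedule is incompatible is not shown on this page, so the `incompatible?` helper below and its two string arguments are hypothetical. It only assumes that versions compare like gem versions and that a stored state written by a newer schedule must not be run by an older one.

```ruby
# Hypothetical helper, not part of Karafka: treats the stored schedule state as
# incompatible when it was written by a schedule newer than the one this
# process has loaded. Gem::Version ships with Ruby and compares segment-wise.
def incompatible?(local_version, stored_version)
  Gem::Version.new(stored_version) > Gem::Version.new(local_version)
end

incompatible?("1.0.0", "1.1.0") # stored state is newer than the local schedule
incompatible?("1.1.0", "1.0.0") # local schedule is newer, safe to run
incompatible?("1.0.0", "1.0.0") # same version, safe to run
```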
Instance Attribute Summary
Attributes inherited from BaseConsumer
#client, #coordinator, #id, #messages, #producer
Instance Method Summary
- #consume ⇒ Object
  Consumes messages and manages recurring tasks execution.
- #eofed ⇒ Object
  Starts the final replay process if we reached eof during replaying.
- #initialize(*args) ⇒ Consumer (constructor)
  A new instance of Consumer.
- #tick ⇒ Object
  Runs the cron execution if all good.
Methods inherited from BaseConsumer
#inspect, #on_after_consume, #on_before_consume, #on_before_schedule_consume, #on_before_schedule_eofed, #on_before_schedule_idle, #on_before_schedule_revoked, #on_before_schedule_shutdown, #on_consume, #on_eofed, #on_idle, #on_initialized, #on_revoked, #on_shutdown, #on_wrap
Constructor Details
#initialize(*args) ⇒ Consumer
Returns a new instance of Consumer.
# File 'lib/karafka/pro/recurring_tasks/consumer.rb', line 33
def initialize(*args)
  super
  @executor = Executor.new
end
Instance Method Details
#consume ⇒ Object
Consumes messages and manages recurring tasks execution.
# File 'lib/karafka/pro/recurring_tasks/consumer.rb', line 39
def consume
  # There is nothing we can do if we operate on a newer schedule. In such cases we should
  # just wait and re-raise error hoping someone will notice or that this will be
  # reassigned to a process with newer schedule
  raise(Errors::IncompatibleScheduleError) if @executor.incompatible?

  .each do ||
    payload = .payload
    type = payload[:type]

    case type
    when "schedule"
      # If we're replaying data, we need to record the most recent stored state, so we
      # can use this data to fully initialize the scheduler
      @executor.update_state(payload) if @executor.

      # If this is first message we cannot mark it on the previous offset
      next if .offset.zero?

      # We always mark as consumed in such a way, that when replaying, we start from a
      # schedule state message. This makes it easier to recover.
      mark_as_consumed Karafka::Messages::Seek.new(
        topic.name,
        partition,
        .offset - 1
      )
    when "command"
      @executor.apply_command(payload)

      next if @executor.

      # Execute on each incoming command to have nice latency but only after replaying
      # During replaying we should not execute because there may be more state changes
      # that collectively have a different outcome
      @executor.call
    else
      raise Karafka::Errors::UnsupportedCaseError, type
    end
  end

  eofed if eofed?
end
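The effect of marking the offset before each schedule message can be sketched without Karafka. The `Message` struct and `resume_offset` helper below are hypothetical and exist only for illustration. The sketch assumes the usual Kafka convention that marking offset N as consumed commits N + 1 as the position to resume from, so marking `offset - 1` makes a restarted consumer begin at the schedule message itself.

```ruby
# Hypothetical stand-in for a Kafka message; not a Karafka class.
Message = Struct.new(:offset, :type)

# Returns the offset a restarted consumer would resume from after processing
# the given messages, or nil if nothing was ever marked as consumed.
def resume_offset(messages)
  committed = nil

  messages.each do |message|
    # Only schedule (state) messages move the committed position
    next unless message.type == "schedule"
    # The very first message has no previous offset to mark
    next if message.offset.zero?

    marked = message.offset - 1
    # Assumption: marking offset N commits N + 1 as the next offset to read
    committed = marked + 1
  end

  committed
end

log = [
  Message.new(0, "schedule"),
  Message.new(1, "command"),
  Message.new(2, "schedule"),
  Message.new(3, "command")
]

resume = resume_offset(log)
first_after_restart = log.find { |message| message.offset == resume }
```

With this log the resume position lands on the schedule message at offset 2, so the trailing command at offset 3 is replayed only after the state that precedes it has been read.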
#eofed ⇒ Object
Starts the final replay process if we reached EOF during replaying.
# File 'lib/karafka/pro/recurring_tasks/consumer.rb', line 83
def eofed
  # We only mark as replayed if we were replaying in the first place
  # If already replayed, nothing to do
  return unless @executor.

  @executor.replay
end
#tick ⇒ Object
Runs the cron execution if all good.
# File 'lib/karafka/pro/recurring_tasks/consumer.rb', line 92
def tick
  # Do nothing until we fully recover the correct state
  return if @executor.

  # If the state is incompatible, we can only raise an error.
  # We do it here and in the `#consume` so the pause-retry kicks in basically reporting
  # on this issue once every minute. That way user should not miss this.
  # We use seek to move so we can achieve a pause of 60 seconds in between consecutive
  # errors instead of on each tick because it is much more frequent.
  if @executor.incompatible?
    if .empty?
      raise Errors::IncompatibleScheduleError
    else
      return seek(.last.offset - 1)
    end
  end

  # If all good and compatible we can execute the recurring tasks
  @executor.call
end
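The branching in `#tick` can be summarized as a small decision function. This is a sketch only: `tick_action` and its keyword arguments are hypothetical names that stand in for the executor state and the current batch, and the function returns a symbol describing the action instead of performing it.

```ruby
# Hypothetical summary of the #tick decision flow; not Karafka code.
#
# recovering:   true while the stored state has not been fully recovered
# incompatible: true when the stored schedule cannot be run by this process
# last_offset:  offset of the last message in the current batch, nil if empty
def tick_action(recovering:, incompatible:, last_offset: nil)
  # Nothing can run until the state has been recovered
  return :wait if recovering

  if incompatible
    # With no messages at hand, the error is raised directly from the tick
    return :raise_incompatible if last_offset.nil?

    # Otherwise seek back so the error surfaces from #consume, where the
    # pause-retry spaces reports out instead of raising on every tick
    return [:seek, last_offset - 1]
  end

  # Recovered and compatible: run the recurring tasks
  :execute
end

tick_action(recovering: false, incompatible: true, last_offset: 10)
```

Keeping the decision separate from its side effects makes the four outcomes easy to check in isolation, which is harder to do against a live consumer.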