Class: Karafka::Pro::RecurringTasks::Executor

Inherits:
Object
Defined in:
lib/karafka/pro/recurring_tasks/executor.rb

Overview

Executor manages the state of the recurring tasks schedule and is the heart of the recurring tasks feature. It coordinates the replay process and tracks changes to task data.

Constant Summary

COMMANDS =

Task commands we support and that can be triggered on tasks (if matched)

%w[
  disable
  enable
  trigger
].freeze

Instance Method Summary

Constructor Details

#initialize ⇒ Executor

Returns a new instance of Executor.



# File 'lib/karafka/pro/recurring_tasks/executor.rb', line 28

def initialize
  @replaying = true
  @incompatible = false
  @catchup_commands = []
  @catchup_schedule = nil
  @matcher = Matcher.new
end

Instance Method Details

#apply_command(command_hash) ⇒ Object

Applies the given command to every task in the schedule that matches it.

Parameters:

  • command_hash (Hash)

    deserialized command data

Raises:

  • (Karafka::Errors::UnsupportedCaseError)

    when the command name is not one of COMMANDS



# File 'lib/karafka/pro/recurring_tasks/executor.rb', line 49

def apply_command(command_hash)
  cmd_name = command_hash[:command][:name]

  raise(Karafka::Errors::UnsupportedCaseError, cmd_name) unless COMMANDS.include?(cmd_name)

  schedule.each do |task|
    next unless @matcher.matches?(task, command_hash)

    task.public_send(cmd_name)
  end
end
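
The dispatch pattern above (check the command name against a whitelist, then call it with public_send on matching tasks) can be sketched in isolation. This is a minimal illustration, not Karafka code: StubTask, StubMatcher, SUPPORTED, the apply helper and the `task: { id: ... }` key in the command hash are all hypothetical stand-ins, and the real Matcher may apply different or additional rules.

```ruby
# Hypothetical whitelist mirroring the COMMANDS constant
SUPPORTED = %w[disable enable trigger].freeze

# Hypothetical task double that records which commands it received
StubTask = Struct.new(:id, :log) do
  def disable
    log << "#{id}:disable"
  end

  def enable
    log << "#{id}:enable"
  end

  def trigger
    log << "#{id}:trigger"
  end
end

# Hypothetical matcher: exact task id or the "*" wildcard (an assumption)
class StubMatcher
  def matches?(task, command_hash)
    wanted = command_hash[:task][:id]
    wanted == '*' || wanted == task.id
  end
end

# Same shape as #apply_command: validate first, then dispatch to matches
def apply(tasks, matcher, command_hash)
  name = command_hash[:command][:name]

  raise ArgumentError, "unsupported command: #{name}" unless SUPPORTED.include?(name)

  tasks.each do |task|
    next unless matcher.matches?(task, command_hash)

    task.public_send(name)
  end
end

log = []
tasks = [StubTask.new('cleanup', log), StubTask.new('report', log)]
matcher = StubMatcher.new

apply(tasks, matcher, { command: { name: 'trigger' }, task: { id: 'report' } })
apply(tasks, matcher, { command: { name: 'disable' }, task: { id: '*' } })
```

Validating the name before calling public_send matters because the name comes from deserialized message data; without the whitelist, any public method on the task could be invoked.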

#call ⇒ Object

Runs all tasks that are due at the current time. If any task was changed or executed, stores the most recent state back to Kafka.



# File 'lib/karafka/pro/recurring_tasks/executor.rb', line 119

def call
  changed = false

  schedule.each do |task|
    changed = true if task.changed?

    unless task.call?
      task.clear

      next
    end

    changed = true
    task.call
  end

  snapshot if changed
end
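
The change tracking in this loop can be illustrated with a small stand-alone sketch. DemoTask and tick are hypothetical names, and the sketch assumes that clear resets the task's changed flag; tick returns true in exactly the cases where #call would invoke snapshot.

```ruby
# Hypothetical task double; assumes #clear resets the "changed" marker
class DemoTask
  attr_reader :runs

  def initialize(due:, changed:)
    @due = due
    @changed = changed
    @runs = 0
  end

  def changed?
    @changed
  end

  def call?
    @due
  end

  def clear
    @changed = false
  end

  def call
    @runs += 1
    @due = false
  end
end

# Mirrors the loop in #call; returns whether a snapshot would be needed
def tick(tasks)
  changed = false

  tasks.each do |task|
    changed = true if task.changed?

    unless task.call?
      task.clear

      next
    end

    changed = true
    task.call
  end

  changed
end

# Nothing due and nothing changed: no snapshot
idle_result = tick([DemoTask.new(due: false, changed: false)])

# Changed (for example enabled or disabled) but not due: one snapshot, then quiet
toggled = DemoTask.new(due: false, changed: true)
first_tick = tick([toggled])
second_tick = tick([toggled])

# Due: the task runs and a snapshot is needed
due = DemoTask.new(due: true, changed: false)
due_result = tick([due])
```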

#incompatible? ⇒ Boolean

Returns whether the schedule of the current process is incompatible with (older than) the one stored in Kafka.

Returns:

  • (Boolean)

    whether the schedule of the current process is incompatible with (older than) the one stored in Kafka



# File 'lib/karafka/pro/recurring_tasks/executor.rb', line 43

def incompatible?
  @incompatible
end

#replay ⇒ Object

Once all previous data is accumulated, runs the catchup process to establish the current state of the recurring tasks schedule execution.

This includes applying any requested commands, synchronizing execution details for the existing schedule, and making sure everything is loaded correctly.



# File 'lib/karafka/pro/recurring_tasks/executor.rb', line 72

def replay
  # Ensure replaying happens only once
  return unless @replaying

  @replaying = false

  # When there is nothing to replay and synchronize, we can just save the state and
  # proceed
  if @catchup_commands.empty? && @catchup_schedule.nil?
    snapshot

    return
  end

  # If the schedule version we have in Kafka is higher than ours, we cannot proceed
  # This prevents us from applying older changes to a new schedule
  if @catchup_schedule[:schedule_version] > schedule.version
    @incompatible = true

    return
  end

  # Now we can synchronize the in-memory state based on the last state stored in Kafka
  schedule.each do |task|
    stored_task = @catchup_schedule[:tasks][task.id.to_sym]

    next unless stored_task

    stored_previous_time = stored_task[:previous_time]
    task.previous_time = stored_previous_time.zero? ? 0 : Time.at(stored_previous_time)

    stored_task[:enabled] ? task.enable : task.disable
  end

  @catchup_commands.each do |cmd|
    apply_command(cmd)
  end

  # We make sure to save in Kafka once more once everything is up to date
  snapshot

  @catchup_schedule = nil
  @catchup_commands = []
end
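
The version guard and the per-task synchronization step can be tried out on plain data. In the sketch below, SyncTask and sync are hypothetical names, the stored hash only contains the keys that #replay reads (schedule_version, tasks, previous_time, enabled), and versions are assumed to be plain strings compared with String#>.

```ruby
# Hypothetical in-memory task holding only the fields the sync step touches
SyncTask = Struct.new(:id, :previous_time, :enabled)

# Returns :incompatible when the stored schedule is newer than the local one,
# otherwise copies the stored execution details onto matching tasks
def sync(tasks, local_version, stored)
  return :incompatible if stored[:schedule_version] > local_version

  tasks.each do |task|
    stored_task = stored[:tasks][task.id.to_sym]

    # Tasks unknown to the stored schedule keep their in-memory state
    next unless stored_task

    seconds = stored_task[:previous_time]
    task.previous_time = seconds.zero? ? 0 : Time.at(seconds)
    task.enabled = stored_task[:enabled]
  end

  :synced
end

stored = {
  schedule_version: '1.0.0',
  tasks: {
    cleanup: { previous_time: 1_700_000_000, enabled: false },
    report: { previous_time: 0, enabled: true }
  }
}

cleanup = SyncTask.new('cleanup', 0, true)
report = SyncTask.new('report', 0, true)
fresh = SyncTask.new('fresh', 0, true)

same_version = sync([cleanup, report, fresh], '1.0.0', stored)
newer_stored = sync([cleanup, report, fresh], '1.0.0', stored.merge(schedule_version: '1.0.1'))
```

A previous_time of 0 is kept as 0 rather than converted with Time.at, which marks a task that has never run.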

#replaying? ⇒ Boolean

Returns whether we are in the replaying phase (false means regular operations).

Returns:

  • (Boolean)

    whether we are in the replaying phase (false means regular operations)



# File 'lib/karafka/pro/recurring_tasks/executor.rb', line 37

def replaying?
  @replaying
end

#update_state(schedule_hash) ⇒ Object

Updates the catchup state

Parameters:

  • schedule_hash (Hash)

    deserialized schedule hash



# File 'lib/karafka/pro/recurring_tasks/executor.rb', line 63

def update_state(schedule_hash)
  @catchup_schedule = schedule_hash
end