Class: GoodJob::CronManager

Inherits:
Object
  • Object
show all
Defined in:
lib/good_job/cron_manager.rb

Overview

CronManagers enqueue jobs on a repeating schedule.

Class Attribute Summary collapse

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(cron_entries = [], start_on_initialize: false, graceful_restart_period: nil, executor: Concurrent.global_io_executor) ⇒ CronManager

Returns a new instance of CronManager.

Parameters:

  • cron_entries (Array<CronEntry>) (defaults to: [])
  • start_on_initialize (Boolean) (defaults to: false)
  • graceful_restart_period (ActiveSupport::Duration, nil) (defaults to: nil)
  • executor (Concurrent::Executor) (defaults to: Concurrent.global_io_executor)


36
37
38
39
40
41
42
43
44
45
# File 'lib/good_job/cron_manager.rb', line 36

def initialize(cron_entries = [], start_on_initialize: false, graceful_restart_period: nil, executor: Concurrent.global_io_executor)
  @executor = executor
  @running = false
  @cron_entries = cron_entries
  @tasks = Concurrent::Hash.new
  @graceful_restart_period = graceful_restart_period

  start if start_on_initialize
  self.class.instances << self
end

Class Attribute Details

.instancesArray<GoodJob::CronManager>? (readonly)

List of all instantiated CronManagers in the current process.

Returns:



16
# File 'lib/good_job/cron_manager.rb', line 16

cattr_reader :instances, default: Concurrent::Array.new, instance_reader: false

Instance Attribute Details

#cron_entriesArray<CronEntry> (readonly)

Execution configuration to be scheduled

Returns:



30
31
32
# File 'lib/good_job/cron_manager.rb', line 30

def cron_entries
  @cron_entries
end

Class Method Details

.task_observer(time, output, thread_error) ⇒ Object

Task observer for cron task

Parameters:

  • time (Time)
  • output (Object)
  • thread_error (Exception)


22
23
24
25
26
# File 'lib/good_job/cron_manager.rb', line 22

def self.task_observer(time, output, thread_error) # rubocop:disable Lint/UnusedMethodArgument
  return if thread_error.is_a? Concurrent::CancelledOperationError

  GoodJob._on_thread_error(thread_error) if thread_error
end

Instance Method Details

#create_graceful_tasks(cron_entry) ⇒ Object

Uses the graceful restart period to re-enqueue jobs that were scheduled to run during the period. The existing uniqueness logic should ensure this does not create duplicate jobs.

Parameters:

  • cron_entry (CronEntry)

    the CronEntry object to schedule



118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
# File 'lib/good_job/cron_manager.rb', line 118

def create_graceful_tasks(cron_entry)
  return unless @graceful_restart_period

  time_period = @graceful_restart_period.ago..Time.current
  cron_entry.within(time_period).each do |cron_at|
    future = Concurrent::Future.new(args: [self, cron_entry, cron_at], executor: @executor) do |_thr_manager, thr_cron_entry, thr_cron_at|
      Rails.application.executor.wrap do
        cron_entry.enqueue(thr_cron_at) if thr_cron_entry.enabled?
      end
    end

    future.add_observer(self.class, :task_observer)
    future.execute
  end
end

#create_task(cron_entry, at: nil, previously_at: nil) ⇒ Object

Enqueues a scheduled task

Parameters:

  • cron_entry (CronEntry)

    the CronEntry object to schedule

  • at (Time, nil) (defaults to: nil)

    When a task needs to optionally be rescheduled because of clock-drift or other inaccuracy

  • previously_at (Time, nil) (defaults to: nil)

    the last in-memory scheduled time the cron task was intended to run



89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
# File 'lib/good_job/cron_manager.rb', line 89

def create_task(cron_entry, at: nil, previously_at: nil)
  cron_at = at || cron_entry.next_at(previously_at: previously_at)

  # ScheduledTask runs immediately if delay is <= 0.01; avoid ever scheduling the task before the intended time
  # https://github.com/ruby-concurrency/concurrent-ruby/blob/56227a4c3ebdd53b8b0976eb8296ceb7a093496f/lib/concurrent-ruby/concurrent/executor/timer_set.rb#L97
  delay = cron_at <= Time.current ? 0.0 : [(cron_at - Time.current).to_f, 0.02].max

  future = Concurrent::ScheduledTask.new(delay, args: [self, cron_entry, cron_at, previously_at], executor: @executor) do |thr_manager, thr_cron_entry, thr_cron_at|
    if thr_cron_at && thr_cron_at > Time.current
      # If clock drift or other inaccuracy, reschedule the task again
      thr_manager.create_task(thr_cron_entry, at: thr_cron_at, previously_at: previously_at)
    else
      # Re-schedule the next cron task before executing the current task
      thr_manager.create_task(thr_cron_entry, previously_at: thr_cron_at)

      Rails.application.executor.wrap do
        cron_entry.enqueue(thr_cron_at) if thr_cron_entry.enabled?
      end
    end
  end

  @tasks[cron_entry.key] = future
  future.add_observer(self.class, :task_observer)
  future.execute
end

#restart(timeout: nil) ⇒ Object

Stop and restart

Parameters:

  • timeout (Numeric, nil) (defaults to: nil)

    Unused but retained for compatibility



68
69
70
71
# File 'lib/good_job/cron_manager.rb', line 68

def restart(timeout: nil) # rubocop:disable Lint/UnusedMethodArgument
  shutdown
  start
end

#running?Boolean?

Tests whether the manager is running.

Returns:

  • (Boolean, nil)


75
76
77
# File 'lib/good_job/cron_manager.rb', line 75

def running?
  @running
end

#shutdown(timeout: nil) ⇒ Object

Stop/cancel any scheduled tasks

Parameters:

  • timeout (Numeric, nil) (defaults to: nil)

    Unused but retained for compatibility



60
61
62
63
64
# File 'lib/good_job/cron_manager.rb', line 60

def shutdown(timeout: nil) # rubocop:disable Lint/UnusedMethodArgument
  @running = false
  @tasks.each_value(&:cancel)
  @tasks.clear
end

#shutdown?Boolean?

Tests whether the manager is shutdown.

Returns:

  • (Boolean, nil)


81
82
83
# File 'lib/good_job/cron_manager.rb', line 81

def shutdown?
  !running?
end

#startObject

Schedule tasks that will enqueue jobs based on their schedule



48
49
50
51
52
53
54
55
56
# File 'lib/good_job/cron_manager.rb', line 48

def start
  ActiveSupport::Notifications.instrument("cron_manager_start.good_job", cron_entries: cron_entries) do
    @running = true
    cron_entries.each do |cron_entry|
      create_task(cron_entry)
      create_graceful_tasks(cron_entry) if @graceful_restart_period
    end
  end
end