Class: GoodJob::CronManager
- Inherits:
-
Object
- Object
- GoodJob::CronManager
- Defined in:
- lib/good_job/cron_manager.rb
Overview
CronManagers enqueue jobs on a repeating schedule.
Class Attribute Summary collapse
-
.instances ⇒ Array<GoodJob::CronManager>?
readonly
List of all instantiated CronManagers in the current process.
Instance Attribute Summary collapse
-
#cron_entries ⇒ Array<CronEntry>
readonly
Execution configuration to be scheduled.
Class Method Summary collapse
-
.task_observer(time, output, thread_error) ⇒ Object
Task observer for cron task.
Instance Method Summary collapse
-
#create_graceful_tasks(cron_entry) ⇒ Object
Uses the graceful restart period to re-enqueue jobs that were scheduled to run during the period.
-
#create_task(cron_entry, at: nil, previously_at: nil) ⇒ Object
Enqueues a scheduled task.
-
#initialize(cron_entries = [], start_on_initialize: false, graceful_restart_period: nil, executor: Concurrent.global_io_executor) ⇒ CronManager
constructor
A new instance of CronManager.
-
#restart(timeout: nil) ⇒ Object
Stop and restart.
-
#running? ⇒ Boolean?
Tests whether the manager is running.
-
#shutdown(timeout: nil) ⇒ Object
Stop/cancel any scheduled tasks.
-
#shutdown? ⇒ Boolean?
Tests whether the manager is shutdown.
-
#start ⇒ Object
Schedule tasks that will enqueue jobs based on their schedule.
Constructor Details
#initialize(cron_entries = [], start_on_initialize: false, graceful_restart_period: nil, executor: Concurrent.global_io_executor) ⇒ CronManager
Returns a new instance of CronManager.
36 37 38 39 40 41 42 43 44 45 |
# File 'lib/good_job/cron_manager.rb', line 36 def initialize(cron_entries = [], start_on_initialize: false, graceful_restart_period: nil, executor: Concurrent.global_io_executor) @executor = executor @running = false @cron_entries = cron_entries @tasks = Concurrent::Hash.new @graceful_restart_period = graceful_restart_period start if start_on_initialize self.class.instances << self end |
Class Attribute Details
.instances ⇒ Array<GoodJob::CronManager>? (readonly)
List of all instantiated CronManagers in the current process.
16 |
# File 'lib/good_job/cron_manager.rb', line 16 cattr_reader :instances, default: Concurrent::Array.new, instance_reader: false |
Instance Attribute Details
#cron_entries ⇒ Array<CronEntry> (readonly)
Execution configuration to be scheduled
30 31 32 |
# File 'lib/good_job/cron_manager.rb', line 30 def cron_entries @cron_entries end |
Class Method Details
.task_observer(time, output, thread_error) ⇒ Object
Task observer for cron task
22 23 24 25 26 |
# File 'lib/good_job/cron_manager.rb', line 22 def self.task_observer(time, output, thread_error) # rubocop:disable Lint/UnusedMethodArgument return if thread_error.is_a? Concurrent::CancelledOperationError GoodJob._on_thread_error(thread_error) if thread_error end |
Instance Method Details
#create_graceful_tasks(cron_entry) ⇒ Object
Uses the graceful restart period to re-enqueue jobs that were scheduled to run during the period. The existing uniqueness logic should ensure this does not create duplicate jobs.
118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 |
# File 'lib/good_job/cron_manager.rb', line 118 def create_graceful_tasks(cron_entry) return unless @graceful_restart_period time_period = @graceful_restart_period.ago..Time.current cron_entry.within(time_period).each do |cron_at| future = Concurrent::Future.new(args: [self, cron_entry, cron_at], executor: @executor) do |_thr_manager, thr_cron_entry, thr_cron_at| Rails.application.executor.wrap do cron_entry.enqueue(thr_cron_at) if thr_cron_entry.enabled? end end future.add_observer(self.class, :task_observer) future.execute end end |
#create_task(cron_entry, at: nil, previously_at: nil) ⇒ Object
Enqueues a scheduled task
89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 |
# File 'lib/good_job/cron_manager.rb', line 89 def create_task(cron_entry, at: nil, previously_at: nil) cron_at = at || cron_entry.next_at(previously_at: previously_at) # ScheduledTask runs immediately if delay is <= 0.01; avoid ever scheduling the task before the intended time # https://github.com/ruby-concurrency/concurrent-ruby/blob/56227a4c3ebdd53b8b0976eb8296ceb7a093496f/lib/concurrent-ruby/concurrent/executor/timer_set.rb#L97 delay = cron_at <= Time.current ? 0.0 : [(cron_at - Time.current).to_f, 0.02].max future = Concurrent::ScheduledTask.new(delay, args: [self, cron_entry, cron_at, previously_at], executor: @executor) do |thr_manager, thr_cron_entry, thr_cron_at| if thr_cron_at && thr_cron_at > Time.current # If clock drift or other inaccuracy, reschedule the task again thr_manager.create_task(thr_cron_entry, at: thr_cron_at, previously_at: previously_at) else # Re-schedule the next cron task before executing the current task thr_manager.create_task(thr_cron_entry, previously_at: thr_cron_at) Rails.application.executor.wrap do cron_entry.enqueue(thr_cron_at) if thr_cron_entry.enabled? end end end @tasks[cron_entry.key] = future future.add_observer(self.class, :task_observer) future.execute end |
#restart(timeout: nil) ⇒ Object
Stop and restart
68 69 70 71 |
# File 'lib/good_job/cron_manager.rb', line 68 def restart(timeout: nil) # rubocop:disable Lint/UnusedMethodArgument shutdown start end |
#running? ⇒ Boolean?
Tests whether the manager is running.
75 76 77 |
# File 'lib/good_job/cron_manager.rb', line 75 def running? @running end |
#shutdown(timeout: nil) ⇒ Object
Stop/cancel any scheduled tasks
60 61 62 63 64 |
# File 'lib/good_job/cron_manager.rb', line 60 def shutdown(timeout: nil) # rubocop:disable Lint/UnusedMethodArgument @running = false @tasks.each_value(&:cancel) @tasks.clear end |
#shutdown? ⇒ Boolean?
Tests whether the manager is shutdown.
81 82 83 |
# File 'lib/good_job/cron_manager.rb', line 81 def shutdown? !running? end |
#start ⇒ Object
Schedule tasks that will enqueue jobs based on their schedule
48 49 50 51 52 53 54 55 56 |
# File 'lib/good_job/cron_manager.rb', line 48 def start ActiveSupport::Notifications.instrument("cron_manager_start.good_job", cron_entries: cron_entries) do @running = true cron_entries.each do |cron_entry| create_task(cron_entry) create_graceful_tasks(cron_entry) if @graceful_restart_period end end end |