Class: GoodJob::Scheduler

Inherits:
Object
Defined in:
lib/good_job/scheduler.rb

Overview

Schedulers are generic thread pools that are responsible for periodically checking for available tasks, executing tasks within a thread, and efficiently scaling active threads.

Every scheduler has a single JobPerformer that will execute tasks. The scheduler is responsible for calling its performer efficiently across threads managed by an instance of Concurrent::ThreadPoolExecutor. If a performer does not have work, the thread will go to sleep. The scheduler also maintains a timer set (see TimerSet under Defined Under Namespace), which wakes sleeping threads and causes them to check whether the performer has new work.
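
The arrangement described above can be sketched in plain Ruby. This is a minimal illustration and not GoodJob's implementation: ListPerformer and TinyScheduler are hypothetical names, and core Thread and Queue stand in for Concurrent::ThreadPoolExecutor and the timer.

```ruby
# Hypothetical performer: hands out work until it runs dry.
class ListPerformer
  def initialize(items)
    @items = Queue.new
    items.each { |item| @items << item }
  end

  # Returns the result of one unit of work, or nil when there is none.
  def next
    item = @items.pop(true) # non-blocking pop raises ThreadError when empty
    item * 2
  rescue ThreadError
    nil
  end
end

# Hypothetical scheduler: threads sleep on a queue until they are woken.
class TinyScheduler
  attr_reader :results

  def initialize(performer, max_threads: 2)
    @performer = performer
    @wakeups = Queue.new
    @results = Queue.new
    @threads = Array.new(max_threads) do
      Thread.new do
        # Block (sleep) until a wake-up arrives; :stop ends the thread.
        while @wakeups.pop != :stop
          # Keep calling the performer until it reports no more work.
          while (result = @performer.next)
            @results << result
          end
        end
      end
    end
  end

  # Wakes one sleeping thread so that it checks the performer for work.
  def wake
    @wakeups << :wake
  end

  # Asks every thread to stop and waits for all of them to finish.
  def shutdown
    @threads.size.times { @wakeups << :stop }
    @threads.each(&:join)
  end
end

scheduler = TinyScheduler.new(ListPerformer.new([1, 2, 3]))
scheduler.wake
scheduler.shutdown

processed = []
processed << scheduler.results.pop until scheduler.results.empty?
```

A single wake-up is enough here because the woken thread keeps asking the performer for work until it returns nil.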

Defined Under Namespace

Classes: ThreadPoolExecutor, TimerSet

Constant Summary

DEFAULT_EXECUTOR_OPTIONS =

Defaults for the instance of Concurrent::ThreadPoolExecutor. The thread pool executor is where work is performed.

{
  name: name,
  min_threads: 0,
  max_threads: Configuration::DEFAULT_MAX_THREADS,
  auto_terminate: true,
  idletime: 60,
  max_queue: Configuration::DEFAULT_MAX_THREADS,
  fallback_policy: :discard,
}.freeze
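
As a rough sketch of how these defaults are used by the constructor shown later on this page: the frozen hash is duplicated, and a supplied max_threads overrides both :max_threads and :max_queue. The helper executor_options_for is hypothetical, the value 5 is only a placeholder for Configuration::DEFAULT_MAX_THREADS, and the :name entry is omitted for brevity.

```ruby
# Placeholder for Configuration::DEFAULT_MAX_THREADS (assumed value).
DEFAULT_MAX_THREADS = 5

DEFAULT_EXECUTOR_OPTIONS = {
  min_threads: 0,
  max_threads: DEFAULT_MAX_THREADS,
  auto_terminate: true,
  idletime: 60,
  max_queue: DEFAULT_MAX_THREADS,
  fallback_policy: :discard,
}.freeze

# Hypothetical helper mirroring the option handling in #initialize.
def executor_options_for(max_threads: nil)
  options = DEFAULT_EXECUTOR_OPTIONS.dup # dup returns an unfrozen copy
  unless max_threads.nil?
    options[:max_threads] = max_threads
    options[:max_queue] = max_threads # the queue is sized to match the pool
  end
  options
end

custom = executor_options_for(max_threads: 2)
defaults = executor_options_for(max_threads: nil)
```

Keeping max_queue equal to max_threads means the pool never queues more tasks than it has threads; with fallback_policy: :discard, anything beyond that is dropped rather than raised.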

Constructor Details

#initialize(performer, max_threads: nil, max_cache: nil, warm_cache_on_initialize: false, cleanup_interval_seconds: nil, cleanup_interval_jobs: nil) ⇒ Scheduler

Returns a new instance of Scheduler.

Parameters:

  • performer (GoodJob::JobPerformer)
  • max_threads (Numeric, nil) (defaults to: nil)

    maximum number of threads in the thread pool; when nil, Configuration::DEFAULT_MAX_THREADS is used

  • max_cache (Numeric, nil) (defaults to: nil)

    maximum number of scheduled jobs to cache in memory

  • warm_cache_on_initialize (Boolean) (defaults to: false)

    whether to warm the cache immediately, or manually by calling warm_cache

  • cleanup_interval_seconds (Numeric, nil) (defaults to: nil)

    number of seconds between cleaning up job records

  • cleanup_interval_jobs (Numeric, nil) (defaults to: nil)

    number of executed jobs between cleaning up job records

Raises:

  • (ArgumentError)

    if performer does not respond to #next


# File 'lib/good_job/scheduler.rb', line 48

def initialize(performer, max_threads: nil, max_cache: nil, warm_cache_on_initialize: false, cleanup_interval_seconds: nil, cleanup_interval_jobs: nil)
  raise ArgumentError, "Performer argument must implement #next" unless performer.respond_to?(:next)

  @performer = performer

  @max_cache = max_cache || 0
  @executor_options = DEFAULT_EXECUTOR_OPTIONS.dup
  if max_threads.present?
    @executor_options[:max_threads] = max_threads
    @executor_options[:max_queue] = max_threads
  end
  @name = "GoodJob::Scheduler(queues=#{@performer.name} max_threads=#{@executor_options[:max_threads]})"
  @executor_options[:name] = name

  @cleanup_tracker = CleanupTracker.new(cleanup_interval_seconds: cleanup_interval_seconds, cleanup_interval_jobs: cleanup_interval_jobs)
  @executor_options[:name] = name

  create_executor
  warm_cache if warm_cache_on_initialize
  self.class.instances << self
end
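
The performer check at the top of the constructor is duck-typed: any object that responds to #next is accepted. A minimal sketch, where NullPerformer and validate_performer! are hypothetical names used only for illustration:

```ruby
# Hypothetical performer that never has work.
class NullPerformer
  def next
    nil
  end
end

# Hypothetical helper applying the same guard as Scheduler#initialize.
def validate_performer!(performer)
  raise ArgumentError, "Performer argument must implement #next" unless performer.respond_to?(:next)

  performer
end

validate_performer!(NullPerformer.new) # passes the guard

begin
  validate_performer!(Object.new) # a plain Object has no #next
rescue ArgumentError => e
  rejection_message = e.message
end
```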

Class Attribute Details

.instances ⇒ Array<GoodJob::Scheduler>? (readonly)

List of all instantiated Schedulers in the current process.

Returns:

  • (Array<GoodJob::Scheduler>, nil)

# File 'lib/good_job/scheduler.rb', line 36

cattr_reader :instances, default: Concurrent::Array.new, instance_reader: false

Instance Attribute Details

#name ⇒ String (readonly)

Human readable name of the scheduler that includes configuration values.

Returns:

  • (String)


# File 'lib/good_job/scheduler.rb', line 40

def name
  @name
end

Instance Method Details

#cleanup ⇒ void

This method returns an undefined value.

Resets the cleanup tracker and runs the performer's cleanup on a thread from the thread pool.



# File 'lib/good_job/scheduler.rb', line 238

def cleanup
  @cleanup_tracker.reset

  future = Concurrent::Future.new(args: [self, @performer], executor: executor) do |_thr_scheduler, thr_performer|
    Rails.application.executor.wrap do
      thr_performer.cleanup
    end
  end

  observer = lambda do |_time, _output, thread_error|
    GoodJob._on_thread_error(thread_error) if thread_error
    create_task
  end
  future.add_observer(observer, :call)
  future.execute
end
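
The method above runs work on another thread and attaches an observer that receives a time, the block's output, and any error. The following plain-Ruby sketch imitates that shape without concurrent-ruby; run_with_observer is a hypothetical helper and is not part of GoodJob or Concurrent::Future.

```ruby
# Hypothetical stand-in for Future#execute plus add_observer:
# runs the block on a new thread, then reports to the observer.
def run_with_observer(observer, &work)
  Thread.new do
    output = nil
    thread_error = nil
    begin
      output = work.call
    rescue StandardError => e
      thread_error = e # captured instead of crashing the thread
    end
    # Same argument order as the observer lambda in the source above.
    observer.call(Time.now, output, thread_error)
  end
end

seen = []
observer = lambda do |_time, output, thread_error|
  seen << [output, thread_error&.message]
end

run_with_observer(observer) { 21 * 2 }.join
run_with_observer(observer) { raise "boom" }.join
```

The observer runs whether the work succeeded or failed, which is why the scheduler can use it both to report errors and to check for remaining work.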

#create_thread(state = nil) ⇒ Boolean?

Wakes a thread to allow the performer to execute a task.

Parameters:

  • state (Hash, nil) (defaults to: nil)

    Contextual information for the performer. See JobPerformer#next?.

Returns:

  • (Boolean, nil)

    Whether work was started.

    • nil if the scheduler is unable to take new work, for example if the thread pool is shut down or at capacity.

    • true if the performer started executing work.

    • false if the performer decides not to attempt to execute a task based on the state that is passed to it.
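
The three return values can be illustrated with a simplified sketch of the decision made for a single job. It is an approximation under stated assumptions: thread_outcome and delay_until are hypothetical helpers, the performer's decision and the pool's capacity are passed in as plain arguments, and Time.parse is used in place of Time.zone.parse.

```ruby
require "time"

# Seconds until the job is due; never negative.
def delay_until(scheduled_at, now:)
  scheduled_at = Time.parse(scheduled_at) if scheduled_at.is_a?(String)
  [(scheduled_at - now).to_f, 0].max
end

# Hypothetical condensed version of the single-job path in #create_thread.
def thread_outcome(accepted:, scheduled_at:, now:, ready_workers:, max_cache: 0, remaining_cache: 0)
  return false unless accepted # the performer declined this state

  delay = scheduled_at ? delay_until(scheduled_at, now: now) : 0
  run_now = delay <= 0.01
  if run_now
    return nil unless ready_workers.positive? # pool is at capacity
  elsif max_cache.positive?
    return nil unless remaining_cache.positive? # cache is full
  end

  # A future-scheduled job yields nil even when it was accepted for later.
  run_now ? true : nil
end

now = Time.utc(2024, 1, 1)
declined = thread_outcome(accepted: false, scheduled_at: nil, now: now, ready_workers: 1)
started = thread_outcome(accepted: true, scheduled_at: nil, now: now, ready_workers: 1)
at_capacity = thread_outcome(accepted: true, scheduled_at: nil, now: now, ready_workers: 0)
deferred = thread_outcome(accepted: true, scheduled_at: now + 60, now: now, ready_workers: 1)
```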



# File 'lib/good_job/scheduler.rb', line 131

def create_thread(state = nil)
  return nil unless executor.running?

  if state.present?
    return false unless performer.next?(state)

    fanout = state&.fetch(:fanout, nil)

    if state[:count]
      # When given state for multiple jobs, try to create a thread for each one.
      # Return true if a thread can be created for all of them, nil if partial or none.

      state_without_count = state.without(:count)
      result = state[:count].times do
        value = create_thread(state_without_count)
        break(value) unless value
      end

      return result.nil? ? nil : true
    end

    if state[:scheduled_at]
      scheduled_at = if state[:scheduled_at].is_a? String
                       Time.zone.parse state[:scheduled_at]
                     else
                       state[:scheduled_at]
                     end
      delay = [(scheduled_at - Time.current).to_f, 0].max
    end
  end

  delay ||= 0
  run_now = delay <= 0.01
  if run_now
    return nil unless executor.ready_worker_count.positive?
  elsif @max_cache.positive?
    return nil unless remaining_cache_count.positive?
  end

  create_task(delay, fanout: fanout)

  run_now ? true : nil
end
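
The :count branch above relies on a Ruby idiom: Integer#times returns its receiver when the loop completes, while break(value) makes the loop return that value instead. A small sketch of the idiom, where start_many is a hypothetical helper and the block stands in for the recursive create_thread call:

```ruby
# Hypothetical helper: try to start `count` threads, stopping at the first
# attempt that returns a falsy value.
def start_many(count, &start_one)
  result = count.times do
    value = start_one.call
    break(value) unless value # stop early; the loop now returns value
  end

  # A completed loop returns the Integer count, which is not nil.
  result.nil? ? nil : true
end

attempts = 0
all_started = start_many(3) do
  attempts += 1
  true
end

partial_attempts = 0
partial = start_many(3) do
  partial_attempts += 1
  partial_attempts < 2 ? true : nil # the second attempt finds no capacity
end
```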

#restart(timeout: -1) ⇒ void

This method returns an undefined value.

Restarts the Scheduler. If it is already shut down, it is started; otherwise it is shut down and then started.

Parameters:

  • timeout (Numeric) (defaults to: -1)

    Seconds to wait for actively executing jobs to finish; accepts the same values as #shutdown, except nil.

Raises:

  • (ArgumentError)

    if timeout is nil


# File 'lib/good_job/scheduler.rb', line 114

def restart(timeout: -1)
  raise ArgumentError, "Scheduler#restart cannot be called with a timeout of nil" if timeout.nil?

  instrument("scheduler_restart_pools") do
    shutdown(timeout: timeout)
    @performer.reset_stats
    create_executor
    warm_cache
  end
end

#running? ⇒ Boolean?

Tests whether the scheduler is running.

Returns:

  • (Boolean, nil)


# File 'lib/good_job/scheduler.rb', line 72

delegate :running?, to: :executor, allow_nil: true

#shutdown(timeout: -1) ⇒ void

This method returns an undefined value.

Shut down the scheduler. This stops all threads in the thread pool. Use #shutdown? to determine whether threads have stopped.

Parameters:

  • timeout (Numeric, nil) (defaults to: -1)

    Seconds to wait for actively executing jobs to finish

    • nil, the scheduler will trigger a shutdown but not wait for it to complete.

    • -1, the scheduler will wait until the shutdown is complete.

    • 0, the scheduler will immediately shut down and stop any active tasks.

    • A positive number will wait that many seconds before stopping any remaining active tasks.
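
The timeout values listed above can be summarized as a small decision table. The helper shutdown_plan is hypothetical and only describes what would happen; it does not touch a real executor.

```ruby
# Hypothetical helper describing how each timeout value is interpreted.
def shutdown_plan(timeout)
  # nil: trigger the shutdown, but do not wait for it to complete.
  return { wait: false, deadline: nil, kill_on_timeout: false } if timeout.nil?

  # Negative (for example -1): wait with no deadline until shutdown completes.
  return { wait: true, deadline: nil, kill_on_timeout: false } if timeout.negative?

  # Zero or positive: wait up to `timeout` seconds, then stop remaining tasks.
  { wait: true, deadline: timeout, kill_on_timeout: true }
end

no_wait = shutdown_plan(nil)
wait_forever = shutdown_plan(-1)
immediate = shutdown_plan(0)
bounded = shutdown_plan(30)
```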



# File 'lib/good_job/scheduler.rb', line 89

def shutdown(timeout: -1)
  return if executor.nil? || (executor.shutdown? && !executor.shuttingdown?)

  instrument("scheduler_shutdown_start", { timeout: timeout })
  instrument("scheduler_shutdown", { timeout: timeout }) do
    if executor.running?
      @timer_set.shutdown
      executor.shutdown
    end

    if executor.shuttingdown? && timeout
      executor_wait = timeout.negative? ? nil : timeout
      return if executor.wait_for_termination(executor_wait)

      instrument("scheduler_shutdown_kill", { active_job_ids: @performer.performing_active_job_ids.to_a })
      executor.kill
      executor.wait_for_termination
    end
  end
end

#shutdown? ⇒ Boolean?

Tests whether the scheduler is shut down and no tasks are running.

Returns:

  • (Boolean, nil)


# File 'lib/good_job/scheduler.rb', line 76

def shutdown?
  @executor.nil? || (executor.shutdown? && !executor.shuttingdown?)
end

#stats ⇒ Hash

Information about the Scheduler

Returns:

  • (Hash)


# File 'lib/good_job/scheduler.rb', line 197

def stats
  available_threads = executor.ready_worker_count

  {
    name: name,
    queues: performer.name,
    max_threads: @executor_options[:max_threads],
    active_threads: @executor_options[:max_threads] - available_threads,
    available_threads: available_threads,
    max_cache: @max_cache,
    active_cache: cache_count,
    available_cache: remaining_cache_count,
  }.merge!(@performer.stats.without(:name))
end
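
As a rough illustration of how the thread counts in this hash relate, the sketch below derives active_threads from the configured maximum and the number of ready workers, then merges in performer statistics without their :name key. The helper scheduler_stats and the performer key shown are hypothetical; Hash#reject is used in place of ActiveSupport's Hash#without so that the sketch runs in plain Ruby.

```ruby
# Hypothetical helper mirroring the arithmetic in #stats.
def scheduler_stats(name:, max_threads:, available_threads:, performer_stats:)
  {
    name: name,
    max_threads: max_threads,
    # Threads that are not ready for new work are counted as active.
    active_threads: max_threads - available_threads,
    available_threads: available_threads,
  }.merge(performer_stats.reject { |key, _value| key == :name })
end

example_stats = scheduler_stats(
  name: "GoodJob::Scheduler(queues=* max_threads=5)",
  max_threads: 5,
  available_threads: 3,
  performer_stats: { name: "*", example_count: 7 } # example_count is a made-up key
)
```

Dropping the performer's :name keeps the scheduler's own name from being overwritten by the merge.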

#warm_cache ⇒ void

This method returns an undefined value.

Preload existing runnable and future-scheduled jobs



# File 'lib/good_job/scheduler.rb', line 214

def warm_cache
  return if @max_cache.zero?

  future = Concurrent::Future.new(args: [self, @performer], executor: executor) do |thr_scheduler, thr_performer|
    Rails.application.executor.wrap do
      thr_performer.next_at(
        limit: @max_cache,
        now_limit: @executor_options[:max_threads]
      ).each do |scheduled_at|
        thr_scheduler.create_thread({ scheduled_at: scheduled_at })
      end
    end
  end

  observer = lambda do |_time, _output, thread_error|
    GoodJob._on_thread_error(thread_error) if thread_error
    create_task # If cache-warming exhausts the threads, ensure there isn't an executable task remaining
  end
  future.add_observer(observer, :call)
  future.execute
end