Class: GoodJob::Scheduler

Inherits:
Object
Defined in:
lib/good_job/scheduler.rb

Overview

Schedulers are generic thread pools that are responsible for periodically checking for available tasks, executing tasks within a thread, and efficiently scaling active threads.

Every scheduler has a single JobPerformer that will execute tasks. The scheduler is responsible for calling its performer efficiently across threads managed by an instance of Concurrent::ThreadPoolExecutor. If a performer does not have work, the thread will go to sleep. The scheduler maintains an instance of Concurrent::TimerTask, which wakes sleeping threads and causes them to check whether the performer has new work.
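The relationship between a scheduler, its performer, and sleeping threads can be sketched with Ruby's standard library alone. This is a simplified illustration under stated assumptions, not GoodJob's implementation: ListPerformer and TinyScheduler are hypothetical names, and a plain Queue of wake-up signals stands in for both Concurrent::ThreadPoolExecutor and the timer.

```ruby
# Hypothetical performer: any object that responds to #next.
class ListPerformer
  def initialize(jobs)
    @jobs = Queue.new
    jobs.each { |job| @jobs << job }
  end

  # Runs one job and returns its result, or returns nil when there is no work.
  def next
    job = @jobs.pop(true) # non-blocking pop raises ThreadError when empty
    job.call
  rescue ThreadError
    nil
  end
end

# Hypothetical, heavily simplified scheduler built on stdlib threads.
class TinyScheduler
  def initialize(performer, max_threads: 2)
    raise ArgumentError, "Performer argument must implement #next" unless performer.respond_to?(:next)

    @performer = performer
    @wakeups = Queue.new
    @results = Queue.new
    @threads = Array.new(max_threads) do
      Thread.new do
        # Each thread sleeps in #pop until it is woken, then asks the performer for work.
        # #pop returns nil once the queue is closed and drained, which ends the loop.
        while @wakeups.pop
          result = @performer.next
          @results << result unless result.nil?
        end
      end
    end
  end

  # Queue `count` wake-up signals; each one lets a thread check the performer once.
  def wake(count = 1)
    count.times { @wakeups << true }
  end

  # Stop accepting wake-ups, wait for the threads to finish, and return collected results.
  def shutdown
    @wakeups.close
    @threads.each(&:join)
    Array.new(@results.size) { @results.pop }
  end
end

performer = ListPerformer.new([-> { 1 }, -> { 2 }, -> { 3 }])
scheduler = TinyScheduler.new(performer, max_threads: 2)
scheduler.wake(5) # two of the five wake-ups find no work and do nothing
results = scheduler.shutdown.sort
```

A wake-up that finds no work is harmless in this sketch, which mirrors the idea that a woken thread only checks whether the performer has something to do.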

Defined Under Namespace

Classes: ThreadPoolExecutor, TimerSet

Constant Summary

DEFAULT_EXECUTOR_OPTIONS =

Defaults for the instance of Concurrent::ThreadPoolExecutor. The thread pool executor is where work is performed.

{
  name: name,
  min_threads: 0,
  max_threads: Configuration::DEFAULT_MAX_THREADS,
  auto_terminate: true,
  idletime: 60,
  max_queue: Configuration::DEFAULT_MAX_THREADS,
  fallback_policy: :discard,
}.freeze
LOW_THREAD_PRIORITY =

In CRuby, this sets the thread quantum to ~12.5ms ( 100ms * 2^(-3) ).

-3
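
The arithmetic behind the ~12.5ms figure can be checked directly. The helper below is a hypothetical illustration that applies the formula quoted above (100ms * 2^priority); treating the formula as valid for priorities other than -3 is an assumption.

```ruby
# Base quantum taken from the description above (100ms).
BASE_QUANTUM_MS = 100.0

# Hypothetical helper: quantum in milliseconds for a given thread priority,
# using the formula quoted in the constant's description.
def thread_quantum_ms(priority)
  BASE_QUANTUM_MS * (2.0**priority)
end

thread_quantum_ms(-3) # => 12.5
```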

Constructor Details

#initialize(performer, max_threads: nil, max_cache: nil, warm_cache_on_initialize: false, cleanup_interval_seconds: nil, cleanup_interval_jobs: nil, lower_thread_priority: false) ⇒ Scheduler

Returns a new instance of Scheduler.

Parameters:

  • performer (GoodJob::JobPerformer)
  • max_threads (Numeric, nil) (defaults to: nil)

    maximum number of threads in the thread pool; also used as the executor's max_queue

  • max_cache (Numeric, nil) (defaults to: nil)

    maximum number of scheduled jobs to cache in memory

  • warm_cache_on_initialize (Boolean) (defaults to: false)

    whether to warm the cache immediately, or manually by calling warm_cache

  • cleanup_interval_seconds (Numeric, nil) (defaults to: nil)

    number of seconds between cleaning up job records

  • cleanup_interval_jobs (Numeric, nil) (defaults to: nil)

    number of executed jobs between cleaning up job records

  • lower_thread_priority (Boolean) (defaults to: false)

    whether to lower the thread priority of execution threads

Raises:

  • (ArgumentError)


# File 'lib/good_job/scheduler.rb', line 56

def initialize(performer, max_threads: nil, max_cache: nil, warm_cache_on_initialize: false, cleanup_interval_seconds: nil, cleanup_interval_jobs: nil, lower_thread_priority: false)
  raise ArgumentError, "Performer argument must implement #next" unless performer.respond_to?(:next)

  @performer = performer

  @max_cache = max_cache || 0
  @executor_options = DEFAULT_EXECUTOR_OPTIONS.dup
  if max_threads.present?
    @executor_options[:max_threads] = max_threads
    @executor_options[:max_queue] = max_threads
  end
  @name = "GoodJob::Scheduler(queues=#{@performer.name} max_threads=#{@executor_options[:max_threads]})"
  @executor_options[:name] = name

  @cleanup_tracker = CleanupTracker.new(cleanup_interval_seconds: cleanup_interval_seconds, cleanup_interval_jobs: cleanup_interval_jobs)
  @executor_options[:name] = name

  self.lower_thread_priority = lower_thread_priority

  create_executor
  warm_cache if warm_cache_on_initialize
  self.class.instances << self
end

Class Attribute Details

.instances ⇒ Array<GoodJob::Scheduler>? (readonly)

List of all instantiated Schedulers in the current process.

Returns:

  • (Array<GoodJob::Scheduler>, nil)

# File 'lib/good_job/scheduler.rb', line 39

cattr_reader :instances, default: Concurrent::Array.new, instance_reader: false

Instance Attribute Details

#lower_thread_priority ⇒ Boolean

Whether to lower the thread priority to a fixed value.

Returns:

  • (Boolean)


# File 'lib/good_job/scheduler.rb', line 47

def lower_thread_priority
  @lower_thread_priority
end

#name ⇒ String (readonly)

Human readable name of the scheduler that includes configuration values.

Returns:

  • (String)


# File 'lib/good_job/scheduler.rb', line 43

def name
  @name
end

Instance Method Details

#cleanup ⇒ void

This method returns an undefined value.

Cleans up job records: resets the cleanup tracker and runs the performer's cleanup on a thread from the pool.



# File 'lib/good_job/scheduler.rb', line 248

def cleanup
  @cleanup_tracker.reset

  future = Concurrent::Future.new(args: [self, @performer], executor: executor) do |_thr_scheduler, thr_performer|
    Rails.application.executor.wrap do
      thr_performer.cleanup
    end
  end

  observer = lambda do |_time, _output, thread_error|
    GoodJob._on_thread_error(thread_error) if thread_error
    create_task
  end
  future.add_observer(observer, :call)
  future.execute
end

#create_thread(state = nil) ⇒ Boolean?

Wakes a thread to allow the performer to execute a task.

Parameters:

  • state (Hash, nil) (defaults to: nil)

    Contextual information for the performer. See JobPerformer#next?.

Returns:

  • (Boolean, nil)

    Whether work was started.

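    • nil if the scheduler is unable to take new work, for example if the thread pool is shut down or at capacity. nil is also returned when the task is only scheduled to run at a future time rather than started immediately.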

    • true if the performer started executing work.

    • false if the performer decides not to attempt to execute a task based on the state that is passed to it.
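
The three return values can be summarized as a small decision function. This is a hypothetical, simplified stand-in that takes plain values instead of an executor and performer; the keyword names are assumptions made for illustration, and the real method additionally handles the :count and :scheduled_at keys of the state.

```ruby
# Hypothetical decision function mirroring the documented return values.
#   running:           whether the thread pool is running
#   performer_accepts: whether the performer wants to act on the given state
#   delay:             seconds until the task should run
#   ready_workers:     threads currently free to take work
#   max_cache:         cache size for future-scheduled tasks (0 means no limit is checked)
#   remaining_cache:   free cache slots
def create_thread_outcome(running:, performer_accepts:, delay:, ready_workers:, max_cache:, remaining_cache:)
  return nil unless running
  return false unless performer_accepts

  run_now = delay <= 0.01
  if run_now
    return nil unless ready_workers.positive?
  elsif max_cache.positive?
    return nil unless remaining_cache.positive?
  end

  # A task scheduled for later is accepted but still reported as nil.
  run_now ? true : nil
end

create_thread_outcome(running: true, performer_accepts: true, delay: 0,
                      ready_workers: 1, max_cache: 0, remaining_cache: 0) # => true
```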



# File 'lib/good_job/scheduler.rb', line 141

def create_thread(state = nil)
  return nil unless executor.running?

  if state.present?
    return false unless performer.next?(state)

    fanout = state&.fetch(:fanout, nil)

    if state[:count]
      # When given state for multiple jobs, try to create a thread for each one.
      # Return true if a thread can be created for all of them, nil if partial or none.

      state_without_count = state.without(:count)
      result = state[:count].times do
        value = create_thread(state_without_count)
        break(value) unless value
      end

      return result.nil? ? nil : true
    end

    if state[:scheduled_at]
      scheduled_at = if state[:scheduled_at].is_a? String
                       Time.zone.parse state[:scheduled_at]
                     else
                       state[:scheduled_at]
                     end
      delay = [(scheduled_at - Time.current).to_f, 0].max
    end
  end

  delay ||= 0
  run_now = delay <= 0.01
  if run_now
    return nil unless executor.ready_worker_count.positive?
  elsif @max_cache.positive?
    return nil unless remaining_cache_count.positive?
  end

  create_task(delay, fanout: fanout)

  run_now ? true : nil
end
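
The delay calculation in the listing above can be tried in isolation. The sketch below is an approximation that uses only the standard library: Time.parse and Time.now stand in for the Rails helpers Time.zone.parse and Time.current, and delay_until is a hypothetical name.

```ruby
require "time"

# Hypothetical helper: seconds to wait until scheduled_at, never negative.
# Accepts either a Time or a String timestamp, like state[:scheduled_at].
def delay_until(scheduled_at, now: Time.now)
  scheduled_at = Time.parse(scheduled_at) if scheduled_at.is_a?(String)
  [(scheduled_at - now).to_f, 0].max
end

now = Time.utc(2024, 1, 1, 0, 0, 0)
delay_until(now + 5, now: now)                # => 5.0
delay_until(now - 5, now: now)                # => 0
delay_until("2024-01-01T00:00:10Z", now: now) # => 10.0
```

A delay of at most 0.01 seconds is treated as "run now" in the listing, so a task scheduled in the past is eligible to start immediately.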

#restart(timeout: -1) ⇒ void

This method returns an undefined value.

Restart the Scheduler. If it is already shut down, it is started; otherwise it is shut down and then started again.

Parameters:

  • timeout (Numeric) (defaults to: -1)

    Seconds to wait for actively executing jobs to finish; shares same values as #shutdown.

Raises:

  • (ArgumentError)


# File 'lib/good_job/scheduler.rb', line 124

def restart(timeout: -1)
  raise ArgumentError, "Scheduler#restart cannot be called with a timeout of nil" if timeout.nil?

  instrument("scheduler_restart_pools") do
    shutdown(timeout: timeout)
    @performer.reset_stats
    create_executor
    warm_cache
  end
end

#running? ⇒ Boolean?

Tests whether the scheduler is running.

Returns:

  • (Boolean, nil)


# File 'lib/good_job/scheduler.rb', line 82

delegate :running?, to: :executor, allow_nil: true

#shutdown(timeout: -1) ⇒ void

This method returns an undefined value.

Shut down the scheduler. This stops all threads in the thread pool. Use #shutdown? to determine whether threads have stopped.

Parameters:

  • timeout (Numeric, nil) (defaults to: -1)

    Seconds to wait for actively executing jobs to finish

    • nil, the scheduler will trigger a shutdown but not wait for it to complete.

    • -1, the scheduler will wait until the shutdown is complete.

    • 0, the scheduler will immediately shut down and stop any active tasks.

    • A positive number will wait that many seconds before stopping any remaining active tasks.
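
The four timeout cases can be restated as a lookup of the steps a shutdown would take. shutdown_steps is a hypothetical helper written only to illustrate the list above; it names the steps and performs no shutdown itself.

```ruby
# Hypothetical helper: names the steps implied by each timeout value.
def shutdown_steps(timeout)
  steps = [:trigger_shutdown]
  return steps if timeout.nil? # nil: trigger the shutdown, do not wait

  if timeout.negative?
    steps << :wait_until_complete   # -1: wait with no deadline
  else
    steps << [:wait_up_to, timeout] # 0 or positive: bounded wait
    steps << :kill_remaining_tasks  # only matters if tasks are still active afterwards
  end
  steps
end

shutdown_steps(nil) # => [:trigger_shutdown]
shutdown_steps(-1)  # => [:trigger_shutdown, :wait_until_complete]
shutdown_steps(30)  # => [:trigger_shutdown, [:wait_up_to, 30], :kill_remaining_tasks]
```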



# File 'lib/good_job/scheduler.rb', line 99

def shutdown(timeout: -1)
  return if executor.nil? || (executor.shutdown? && !executor.shuttingdown?)

  instrument("scheduler_shutdown_start", { timeout: timeout })
  instrument("scheduler_shutdown", { timeout: timeout }) do
    if executor.running?
      @timer_set.shutdown
      executor.shutdown
    end

    if executor.shuttingdown? && timeout
      executor_wait = timeout.negative? ? nil : timeout
      return if executor.wait_for_termination(executor_wait)

      instrument("scheduler_shutdown_kill", { active_job_ids: @performer.performing_active_job_ids.to_a })
      executor.kill
      executor.wait_for_termination
    end
  end
end

#shutdown? ⇒ Boolean?

Tests whether the scheduler is shut down and no tasks are running.

Returns:

  • (Boolean, nil)


# File 'lib/good_job/scheduler.rb', line 86

def shutdown?
  @executor.nil? || (executor.shutdown? && !executor.shuttingdown?)
end

#stats ⇒ Hash

Information about the Scheduler.

Returns:

  • (Hash)


# File 'lib/good_job/scheduler.rb', line 207

def stats
  available_threads = executor.ready_worker_count

  {
    name: name,
    queues: performer.name,
    max_threads: @executor_options[:max_threads],
    active_threads: @executor_options[:max_threads] - available_threads,
    available_threads: available_threads,
    max_cache: @max_cache,
    active_cache: cache_count,
    available_cache: remaining_cache_count,
  }.merge!(@performer.stats.without(:name))
end

#warm_cache ⇒ void

This method returns an undefined value.

Preload existing runnable and future-scheduled jobs. Does nothing when max_cache is zero.



# File 'lib/good_job/scheduler.rb', line 224

def warm_cache
  return if @max_cache.zero?

  future = Concurrent::Future.new(args: [self, @performer], executor: executor) do |thr_scheduler, thr_performer|
    Rails.application.executor.wrap do
      thr_performer.next_at(
        limit: @max_cache,
        now_limit: @executor_options[:max_threads]
      ).each do |scheduled_at|
        thr_scheduler.create_thread({ scheduled_at: scheduled_at })
      end
    end
  end

  observer = lambda do |_time, _output, thread_error|
    GoodJob._on_thread_error(thread_error) if thread_error
    create_task # If cache-warming exhausts the threads, ensure there isn't an executable task remaining
  end
  future.add_observer(observer, :call)
  future.execute
end