Class: GoodJob::Scheduler
- Inherits:
- Object
- Inheritance hierarchy:
- Object > GoodJob::Scheduler
- Defined in:
- lib/good_job/scheduler.rb
Overview
Schedulers are generic thread pools that are responsible for periodically checking for available tasks, executing tasks within a thread, and efficiently scaling active threads.
Every scheduler has a single Performer that will execute tasks. The scheduler is responsible for calling its performer efficiently across threads managed by an instance of Concurrent::ThreadPoolExecutor. If a performer does not have work, the thread will go to sleep. The scheduler maintains an instance of Concurrent::TimerTask, which wakes sleeping threads and causes them to check whether the performer has new work.
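The relationship between scheduler, performer, and sleeping threads can be sketched with only the Ruby standard library. This is a simplified illustration, not GoodJob's implementation: `ArrayPerformer` and `TinyScheduler` are hypothetical names, and a `Queue` of wake-up signals stands in for both Concurrent::ThreadPoolExecutor and Concurrent::TimerTask.

```ruby
# Hypothetical performer: runs one task per call to #next and
# returns nil when there is no work left.
class ArrayPerformer
  def initialize(tasks)
    @tasks = tasks
    @lock = Mutex.new
  end

  def next
    task = @lock.synchronize { @tasks.shift }
    task&.call
  end
end

# Hypothetical scheduler: worker threads sleep on a queue of wake-up
# signals and ask the performer for work each time they are woken.
class TinyScheduler
  def initialize(performer, max_threads: 2)
    raise ArgumentError, "Performer argument must implement #next" unless performer.respond_to?(:next)

    @performer = performer
    @signals = Queue.new
    @results = Queue.new
    @threads = Array.new(max_threads) do
      Thread.new do
        # Queue#pop blocks (the thread sleeps) until a signal arrives.
        while @signals.pop == :wake
          output = @performer.next
          @results << output unless output.nil?
        end
      end
    end
  end

  # Wake one sleeping thread so it checks the performer for work.
  def wake
    @signals << :wake
  end

  # Stop every thread once the already-queued signals are handled,
  # then return the collected task outputs.
  def shutdown
    @threads.size.times { @signals << :stop }
    @threads.each(&:join)
    results = []
    results << @results.pop until @results.empty?
    results
  end
end

performer = ArrayPerformer.new([-> { 1 + 1 }, -> { 2 * 3 }])
scheduler = TinyScheduler.new(performer)
3.times { scheduler.wake } # the third wake-up finds no work
results = scheduler.shutdown
```

The order of `results` depends on thread timing, so callers should not rely on it.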
Defined Under Namespace
Classes: ThreadPoolExecutor
Constant Summary
- DEFAULT_POOL_OPTIONS =
Defaults for instance of Concurrent::ThreadPoolExecutor. The thread pool is where work is performed.
    {
      name: name,
      min_threads: 0,
      max_threads: Configuration::DEFAULT_MAX_THREADS,
      auto_terminate: true,
      idletime: 60,
      max_queue: -1,
      fallback_policy: :discard,
    }.freeze
Class Attribute Summary
- .instances ⇒ Array<GoodJob::Scheduler> (readonly)
  List of all instantiated Schedulers in the current process.
Class Method Summary
- .from_configuration(configuration) ⇒ GoodJob::Scheduler, GoodJob::MultiScheduler
  Creates GoodJob::Scheduler(s) and Performers from a GoodJob::Configuration instance.
Instance Method Summary
- #create_thread(state = nil) ⇒ nil, Boolean
  Wakes a thread to allow the performer to execute a task.
- #initialize(performer, max_threads: nil) ⇒ Scheduler (constructor)
  A new instance of Scheduler.
- #restart(wait: true) ⇒ void
  Restart the Scheduler.
- #shutdown(wait: true) ⇒ void
  Shut down the scheduler.
- #shutdown? ⇒ true, ...
  Tests whether the scheduler is shut down.
Constructor Details
#initialize(performer, max_threads: nil) ⇒ Scheduler
Returns a new instance of Scheduler.
    # File 'lib/good_job/scheduler.rb', line 68
    def initialize(performer, max_threads: nil)
      raise ArgumentError, "Performer argument must implement #next" unless performer.respond_to?(:next)

      self.class.instances << self

      @performer = performer

      @pool_options = DEFAULT_POOL_OPTIONS.dup
      @pool_options[:max_threads] = max_threads if max_threads.present?
      @pool_options[:name] = "GoodJob::Scheduler(queues=#{@performer.name} max_threads=#{@pool_options[:max_threads]})"

      create_pool
    end
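The constructor's two main steps, checking that the performer responds to `next` and copying the frozen defaults before overriding `max_threads`, can be tried in isolation. The sketch below is an approximation under stated assumptions: `DEFAULTS`, `build_pool_options`, and `StubPerformer` are hypothetical names, the default values are made up, and `nil?` replaces ActiveSupport's `present?` so the example runs without Rails.

```ruby
# Hypothetical defaults; the real values live in DEFAULT_POOL_OPTIONS.
DEFAULTS = { max_threads: 5, idletime: 60 }.freeze

# Hypothetical performer stub that satisfies the #next duck type.
StubPerformer = Struct.new(:name) do
  def next
    nil
  end
end

def build_pool_options(performer, max_threads: nil)
  raise ArgumentError, "Performer argument must implement #next" unless performer.respond_to?(:next)

  options = DEFAULTS.dup # dup returns an unfrozen copy of the frozen hash
  options[:max_threads] = max_threads unless max_threads.nil?
  options
end

performer = StubPerformer.new("default")
overridden = build_pool_options(performer, max_threads: 2)
defaulted = build_pool_options(performer)
```

Because the defaults are duplicated per instance, overriding `max_threads` for one scheduler leaves the shared constant untouched.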
Class Attribute Details
.instances ⇒ Array<GoodJob::Scheduler> (readonly)
List of all instantiated Schedulers in the current process.
    # File 'lib/good_job/scheduler.rb', line 33
    cattr_reader :instances, default: [], instance_reader: false
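`cattr_reader` is provided by ActiveSupport. As a rough plain-Ruby approximation of what this declaration gives the class (a class-level reader with no instance-level reader), the following sketch uses a hypothetical `RegistryExample` class that registers each new instance the way the constructor does.

```ruby
class RegistryExample
  @@instances = []

  # Class-level reader only; instances get no #instances method.
  def self.instances
    @@instances
  end

  def initialize
    # Every new object adds itself to the process-wide list.
    self.class.instances << self
  end
end

first = RegistryExample.new
second = RegistryExample.new
registered = RegistryExample.instances
```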
Class Method Details
.from_configuration(configuration) ⇒ GoodJob::Scheduler, GoodJob::MultiScheduler
Creates GoodJob::Scheduler(s) and Performers from a GoodJob::Configuration instance.
    # File 'lib/good_job/scheduler.rb', line 38
    def self.from_configuration(configuration)
      schedulers = configuration.queue_string.split(';').map do |queue_string_and_max_threads|
        queue_string, max_threads = queue_string_and_max_threads.split(':')
        max_threads = (max_threads || configuration.max_threads).to_i

        job_query = GoodJob::Job.queue_string(queue_string)
        parsed = GoodJob::Job.queue_parser(queue_string)
        job_filter = proc do |state|
          if parsed[:exclude]
            parsed[:exclude].exclude?(state[:queue_name])
          elsif parsed[:include]
            parsed[:include].include? state[:queue_name]
          else
            true
          end
        end
        job_performer = GoodJob::Performer.new(job_query, :perform_with_advisory_lock, name: queue_string, filter: job_filter)

        GoodJob::Scheduler.new(job_performer, max_threads: max_threads)
      end

      if schedulers.size > 1
        GoodJob::MultiScheduler.new(schedulers)
      else
        schedulers.first
      end
    end
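The string handling in this method can be followed without a database. The sketch below reproduces only the `;` and `:` splitting and the include/exclude filter. `parse_queue_string` and `build_filter` are hypothetical helper names, the sample queue names are invented, and the shape of the parsed hash (`{ include: [...] }` or `{ exclude: [...] }`) is an assumption inferred from how the method uses it, not the documented output of `GoodJob::Job.queue_parser`.

```ruby
# Split a string such as "mailers:2;default" into
# [queue_string, max_threads] pairs, falling back to a default count.
def parse_queue_string(queue_string, default_max_threads)
  queue_string.split(';').map do |queue_string_and_max_threads|
    queues, max_threads = queue_string_and_max_threads.split(':')
    [queues, (max_threads || default_max_threads).to_i]
  end
end

# Build a filter proc from an already-parsed hash (assumed shape).
# Plain Ruby has no Array#exclude?, so !include? is used instead.
def build_filter(parsed)
  proc do |state|
    if parsed[:exclude]
      !parsed[:exclude].include?(state[:queue_name])
    elsif parsed[:include]
      parsed[:include].include?(state[:queue_name])
    else
      true
    end
  end
end

pairs = parse_queue_string("mailers:2;default", 5)
only_mailers = build_filter({ include: ["mailers"] })
not_mailers = build_filter({ exclude: ["mailers"] })
everything = build_filter({})
```

Each pair would correspond to one scheduler. With more than one pair, the method wraps the schedulers in a GoodJob::MultiScheduler.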
Instance Method Details
#create_thread(state = nil) ⇒ nil, Boolean
Wakes a thread to allow the performer to execute a task.
    # File 'lib/good_job/scheduler.rb', line 123
    def create_thread(state = nil)
      return nil unless @pool.running? && @pool.ready_worker_count.positive?
      return false if state && !@performer.next?(state)

      future = Concurrent::Future.new(args: [@performer], executor: @pool) do |performer|
        output = nil
        Rails.application.executor.wrap { output = performer.next }
        output
      end
      future.add_observer(self, :task_observer)
      future.execute

      true
    end
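The three possible return values (`nil`, `false`, `true`) come from the two guard clauses at the top of the method. The sketch below isolates those guards so each outcome can be observed. `FakePool`, `FakePerformer`, and `create_thread_result` are hypothetical stand-ins, and the step that actually schedules a Concurrent::Future is deliberately left out.

```ruby
# Hypothetical stand-in for the thread pool.
FakePool = Struct.new(:running, :ready_worker_count) do
  def running?
    running
  end
end

# Hypothetical stand-in for a performer that only handles some queues.
FakePerformer = Struct.new(:queue_names) do
  def next?(state)
    queue_names.include?(state[:queue_name])
  end
end

# Mirrors only the guard clauses; the real method also runs a Future.
def create_thread_result(pool, performer, state = nil)
  return nil unless pool.running? && pool.ready_worker_count.positive?
  return false if state && !performer.next?(state)

  true
end

performer = FakePerformer.new(["mailers"])
busy_pool = FakePool.new(true, 0)
idle_pool = FakePool.new(true, 1)

when_busy = create_thread_result(busy_pool, performer)
when_unfiltered = create_thread_result(idle_pool, performer)
when_matching = create_thread_result(idle_pool, performer, { queue_name: "mailers" })
when_filtered_out = create_thread_result(idle_pool, performer, { queue_name: "default" })
```

In this reading, `nil` means the pool could not take the task, while `false` means the performer declined the work described by `state`.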
#restart(wait: true) ⇒ void
This method returns an undefined value.
Restart the Scheduler. If the scheduler is already shut down, it is started; otherwise it is shut down and then started.
    # File 'lib/good_job/scheduler.rb', line 110
    def restart(wait: true)
      instrument("scheduler_restart_pools") do
        shutdown(wait: wait) unless shutdown?
        create_pool
      end
    end
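To make the two paths through `restart` visible, here is a minimal sketch with a hypothetical `RestartExample` class that records which steps run. It keeps the same control flow but replaces the real pool with a boolean flag and omits instrumentation.

```ruby
class RestartExample
  attr_reader :events

  def initialize
    @events = []
    @running = false
  end

  def create_pool
    @events << :create_pool
    @running = true
  end

  # The wait flag is accepted for parity but unused in this sketch.
  def shutdown(wait: true)
    @events << :shutdown
    @running = false
  end

  def shutdown?
    !@running
  end

  # Same control flow as the listing: stop first only when running.
  def restart(wait: true)
    shutdown(wait: wait) unless shutdown?
    create_pool
  end
end

example = RestartExample.new
example.restart # already shut down, so it only starts
example.restart # running, so it stops and then starts
```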
#shutdown(wait: true) ⇒ void
This method returns an undefined value.
Shut down the scheduler. This stops all threads in the pool. If wait is true, the scheduler will wait for any active tasks to finish. If wait is false, this method will return immediately even though threads may still be running. Use #shutdown? to determine whether threads have stopped.
    # File 'lib/good_job/scheduler.rb', line 89
    def shutdown(wait: true)
      return unless @pool&.running?

      instrument("scheduler_shutdown_start", { wait: wait })
      instrument("scheduler_shutdown", { wait: wait }) do
        @pool.shutdown
        @pool.wait_for_termination if wait
        # TODO: Should be killed if wait is not true
      end
    end
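The difference between `wait: true` and `wait: false` can be demonstrated with plain threads. This is only an analogy using the standard library: `stop_workers` is a hypothetical helper, `Thread#join` plays the role of `wait_for_termination`, and the returned boolean is comparable to what `#shutdown?` reports.

```ruby
# Hypothetical helper: optionally wait for the workers, then report
# whether every worker has stopped.
def stop_workers(threads, wait: true)
  threads.each(&:join) if wait
  threads.none?(&:alive?)
end

gate = Queue.new
# The worker stays busy until something is pushed onto the gate.
worker = Thread.new do
  gate.pop
  :done
end

stopped_without_wait = stop_workers([worker], wait: false)
gate << :finish # let the active task complete
stopped_with_wait = stop_workers([worker], wait: true)
```

Without waiting, the call returns while the worker is still active. With waiting, it returns only after the worker has finished.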
#shutdown? ⇒ true, ...
Tests whether the scheduler is shut down.
    # File 'lib/good_job/scheduler.rb', line 102
    def shutdown?
      !@pool&.running?
    end
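The safe-navigation operator matters here: when no pool is assigned, `@pool&.running?` evaluates to `nil`, so the negation yields `true`. A small sketch with a hypothetical `StubPool` struct and `shutdown_state` helper shows the three cases.

```ruby
# Hypothetical pool stub exposing only #running?.
StubPool = Struct.new(:running) do
  def running?
    running
  end
end

# Same expression as the listing, with the pool passed in explicitly.
def shutdown_state(pool)
  !pool&.running?
end

without_pool = shutdown_state(nil)
with_running_pool = shutdown_state(StubPool.new(true))
with_stopped_pool = shutdown_state(StubPool.new(false))
```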