Class: GoodJob::Scheduler

Inherits:
Object
Defined in:
lib/good_job/scheduler.rb

Overview

Schedulers are generic thread pools that are responsible for periodically checking for available tasks, executing tasks within a thread, and efficiently scaling active threads.

Every scheduler has a single Performer that will execute tasks. The scheduler is responsible for calling its performer efficiently across threads managed by an instance of Concurrent::ThreadPoolExecutor. If a performer does not have work, the thread will go to sleep. The scheduler maintains an instance of Concurrent::TimerTask, which wakes sleeping threads and causes them to check whether the performer has new work.
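The performer contract described above can be illustrated with a minimal sketch. This is not GoodJob's implementation: it uses plain Ruby threads instead of Concurrent::ThreadPoolExecutor, and ListPerformer is a hypothetical stand-in whose only job is to respond to #next and return nil when there is no work.

```ruby
# Hypothetical stand-in for a performer: hands out queued items one at a time.
class ListPerformer
  def initialize(items)
    @queue = Queue.new
    items.each { |item| @queue << item }
  end

  # Returns the result of the next unit of work, or nil when there is none.
  def next
    item = @queue.pop(true) # non-blocking pop; raises ThreadError when empty
    item * 2
  rescue ThreadError
    nil
  end
end

performer = ListPerformer.new([1, 2, 3])
results = Queue.new

# Three worker threads each ask the performer for work until it runs dry.
workers = Array.new(3) do
  Thread.new do
    while (output = performer.next)
      results << output
    end
  end
end
workers.each(&:join)

# Drain the thread-safe results queue into a plain array.
collected = []
collected << results.pop until results.empty?
```

In the real scheduler an idle thread goes to sleep and is woken later, rather than exiting as the workers do here.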

Defined Under Namespace

Classes: ThreadPoolExecutor

Constant Summary

DEFAULT_POOL_OPTIONS =

Defaults for the instance of Concurrent::ThreadPoolExecutor. The thread pool is where work is performed.

{
  name: name,
  min_threads: 0,
  max_threads: Configuration::DEFAULT_MAX_THREADS,
  auto_terminate: true,
  idletime: 60,
  max_queue: -1,
  fallback_policy: :discard,
}.freeze

Class Attribute Summary

Class Method Summary

Instance Method Summary

Constructor Details

#initialize(performer, max_threads: nil) ⇒ Scheduler

Returns a new instance of Scheduler.

Parameters:

  • performer (GoodJob::Performer)
  • max_threads (Numeric, nil) (defaults to: nil)

    maximum number of threads in the pool; when nil, the value from DEFAULT_POOL_OPTIONS is used

Raises:

  • (ArgumentError)

    if the performer does not respond to #next


# File 'lib/good_job/scheduler.rb', line 68

def initialize(performer, max_threads: nil)
  raise ArgumentError, "Performer argument must implement #next" unless performer.respond_to?(:next)

  self.class.instances << self

  @performer = performer

  @pool_options = DEFAULT_POOL_OPTIONS.dup
  @pool_options[:max_threads] = max_threads if max_threads.present?
  @pool_options[:name] = "GoodJob::Scheduler(queues=#{@performer.name} max_threads=#{@pool_options[:max_threads]})"

  create_pool
end
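The two checks the constructor performs, duck-typing the performer and overriding a copy of the frozen defaults, can be sketched in isolation. The names PoolOptionsSketch, NullPerformer, and the default of 5 threads are hypothetical and chosen only for illustration; the sketch also uses nil? where the original uses ActiveSupport's present?, which behaves the same for numeric arguments.

```ruby
module PoolOptionsSketch
  # Hypothetical defaults, mirroring the shape of DEFAULT_POOL_OPTIONS.
  DEFAULTS = { min_threads: 0, max_threads: 5, idletime: 60 }.freeze

  def self.build(performer, max_threads: nil)
    unless performer.respond_to?(:next)
      raise ArgumentError, "Performer argument must implement #next"
    end

    options = DEFAULTS.dup # the dup of a frozen Hash is not frozen
    options[:max_threads] = max_threads unless max_threads.nil?
    options
  end
end

# Hypothetical performer that never has work.
class NullPerformer
  def next
    nil
  end
end

options = PoolOptionsSketch.build(NullPerformer.new, max_threads: 2)
```

Because the defaults are duplicated before being modified, each scheduler gets its own options hash and the shared constant is left untouched.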

Class Attribute Details

.instances ⇒ Array<GoodJob::Scheduler> (readonly)

List of all instantiated Schedulers in the current process.

Returns:

  • (Array<GoodJob::Scheduler>)


# File 'lib/good_job/scheduler.rb', line 33

cattr_reader :instances, default: [], instance_reader: false

Class Method Details

.from_configuration(configuration) ⇒ GoodJob::Scheduler, GoodJob::MultiScheduler

Creates GoodJob::Scheduler(s) and Performers from a GoodJob::Configuration instance.

Parameters:

  • configuration (GoodJob::Configuration)

Returns:

  • (GoodJob::Scheduler, GoodJob::MultiScheduler)



# File 'lib/good_job/scheduler.rb', line 38

def self.from_configuration(configuration)
  schedulers = configuration.queue_string.split(';').map do |queue_string_and_max_threads|
    queue_string, max_threads = queue_string_and_max_threads.split(':')
    max_threads = (max_threads || configuration.max_threads).to_i

    job_query = GoodJob::Job.queue_string(queue_string)
    parsed = GoodJob::Job.queue_parser(queue_string)
    job_filter = proc do |state|
      if parsed[:exclude]
        parsed[:exclude].exclude?(state[:queue_name])
      elsif parsed[:include]
        parsed[:include].include? state[:queue_name]
      else
        true
      end
    end
    job_performer = GoodJob::Performer.new(job_query, :perform_with_advisory_lock, name: queue_string, filter: job_filter)

    GoodJob::Scheduler.new(job_performer, max_threads: max_threads)
  end

  if schedulers.size > 1
    GoodJob::MultiScheduler.new(schedulers)
  else
    schedulers.first
  end
end
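The string handling in this method can be tried without a database. The sketch below reproduces only the split logic and the filter proc. The helper names, the example queue names, and the assumption that the parsed hash holds arrays under :include or :exclude are illustrative rather than taken from GoodJob. Plain Ruby's include? is negated here in place of ActiveSupport's exclude?.

```ruby
# Splits a queue string into [queue_string, max_threads] pairs,
# falling back to a default thread count when none is given.
def parse_queue_string(queue_string, default_max_threads)
  queue_string.split(';').map do |queue_string_and_max_threads|
    queues, max_threads = queue_string_and_max_threads.split(':')
    [queues, (max_threads || default_max_threads).to_i]
  end
end

# Builds a filter proc from a hash that may carry :include or :exclude lists.
def build_filter(parsed)
  proc do |state|
    if parsed[:exclude]
      !parsed[:exclude].include?(state[:queue_name])
    elsif parsed[:include]
      parsed[:include].include?(state[:queue_name])
    else
      true
    end
  end
end

pairs = parse_queue_string("emails:2;reports", 5)
only_emails = build_filter({ include: ["emails"] })
not_emails = build_filter({ exclude: ["emails"] })
```

Each pair would become one Scheduler; more than one pair is what causes the method to return a MultiScheduler.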

Instance Method Details

#create_thread(state = nil) ⇒ nil, Boolean

Wakes a thread to allow the performer to execute a task.

Parameters:

  • state (nil, Object) (defaults to: nil)

    Contextual information for the performer. See Performer#next?.

Returns:

  • (nil, Boolean)

    Whether work was started. Returns nil if the scheduler is unable to take new work, for example if the thread pool is shut down or at capacity. Returns true if the performer started executing work. Returns false if the performer decides not to attempt to execute a task based on the state that is passed to it.



# File 'lib/good_job/scheduler.rb', line 123

def create_thread(state = nil)
  return nil unless @pool.running? && @pool.ready_worker_count.positive?
  return false if state && !@performer.next?(state)

  future = Concurrent::Future.new(args: [@performer], executor: @pool) do |performer|
    output = nil
    Rails.application.executor.wrap { output = performer.next }
    output
  end
  future.add_observer(self, :task_observer)
  future.execute

  true
end
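Because nil and false are both falsy, a caller that needs to tell "could not take work" apart from "performer declined" has to compare the result explicitly. A minimal sketch, using a hypothetical helper name:

```ruby
# Maps the three documented return values of #create_thread to a description.
def describe_create_thread_result(result)
  case result
  when nil   then "scheduler cannot take new work (pool shut down or at capacity)"
  when true  then "performer started executing work"
  when false then "performer declined based on the given state"
  end
end

description = describe_create_thread_result(nil)
```

A plain `if scheduler.create_thread(state)` would treat the nil and false cases identically.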

#restart(wait: true) ⇒ void

This method returns an undefined value.

Restart the Scheduler. If it is already shut down, start it; otherwise shut it down and then start it again.

Parameters:

  • wait (Boolean) (defaults to: true)

    Wait for actively executing jobs to finish



# File 'lib/good_job/scheduler.rb', line 110

def restart(wait: true)
  instrument("scheduler_restart_pools") do
    shutdown(wait: wait) unless shutdown?
    create_pool
  end
end

#shutdown(wait: true) ⇒ void

This method returns an undefined value.

Shut down the scheduler. This stops all threads in the pool. If wait is true, the scheduler will wait for any active tasks to finish. If wait is false, this method will return immediately even though threads may still be running. Use #shutdown? to determine whether threads have stopped.

Parameters:

  • wait (Boolean) (defaults to: true)

    Wait for actively executing jobs to finish



# File 'lib/good_job/scheduler.rb', line 89

def shutdown(wait: true)
  return unless @pool&.running?

  instrument("scheduler_shutdown_start", { wait: wait })
  instrument("scheduler_shutdown", { wait: wait }) do
    @pool.shutdown
    @pool.wait_for_termination if wait
    # TODO: Should be killed if wait is not true
  end
end

#shutdown? ⇒ true, false, nil

Tests whether the scheduler is shut down.

Returns:

  • (true, false, nil)


# File 'lib/good_job/scheduler.rb', line 102

def shutdown?
  !@pool&.running?
end
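The safe-navigation expression can be exercised on its own to see how a missing pool is treated. FakePool and shutdown_state are hypothetical stand-ins that expose only the running? predicate; they are not part of GoodJob or concurrent-ruby.

```ruby
# Hypothetical stand-in exposing only the predicate that #shutdown? relies on.
FakePool = Struct.new(:running) do
  def running?
    running
  end
end

# Same expression as the method body, with the pool passed in explicitly.
def shutdown_state(pool)
  !pool&.running?
end

without_pool = shutdown_state(nil)                 # nil&.running? is nil, negated to true
running_pool = shutdown_state(FakePool.new(true))  # a running pool is not shut down
stopped_pool = shutdown_state(FakePool.new(false)) # a stopped pool is shut down
```

As written, the negation always yields true or false, so a scheduler with no pool reports itself as shut down.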