Class: ActiveJob::QueueAdapters::AsyncAdapter::Scheduler

Inherits:
Object
  • Object
show all
Defined in:
activejob/lib/active_job/queue_adapters/async_adapter.rb

Overview

:nodoc:

Constant Summary collapse

# Baseline options for the thread-pool executor built in #initialize.
# Options passed to Scheduler.new are merged over these, so any key here
# can be overridden per instance. The hash is frozen; #initialize uses
# Hash#merge, which returns a new hash instead of mutating this one.
# NOTE(review): the exact meaning of each key is defined by concurrent-ruby's
# ThreadPoolExecutor -- confirm against its documentation before changing values.
DEFAULT_EXECUTOR_OPTIONS =
{
  min_threads:     0,
  max_threads:     Concurrent.processor_count,
  auto_terminate:  true,
  idletime:        60, # 1 minute
  max_queue:       0, # unlimited
  fallback_policy: :caller_runs # shouldn't matter -- 0 max queue
}.freeze

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(**options) ⇒ Scheduler

Returns a new instance of Scheduler.



86
87
88
89
90
# File 'activejob/lib/active_job/queue_adapters/async_adapter.rb', line 86

# Builds the two executors this scheduler can dispatch to and starts in
# asynchronous (non-immediate) mode.
#
# options - keyword overrides merged on top of DEFAULT_EXECUTOR_OPTIONS and
#           passed to the thread-pool executor.
def initialize(**options)
  pool_options = DEFAULT_EXECUTOR_OPTIONS.merge(options)

  @async_executor = Concurrent::ThreadPoolExecutor.new(pool_options)
  @immediate_executor = Concurrent::ImmediateExecutor.new
  self.immediate = false
end

Instance Attribute Details

#immediate ⇒ Object

Returns the value of attribute immediate



84
85
86
# File 'activejob/lib/active_job/queue_adapters/async_adapter.rb', line 84

# Reader for the @immediate flag. #initialize sets the flag to false, and
# #executor consults it to choose between the immediate executor (truthy)
# and the thread-pool executor (falsy).
def immediate
  @immediate
end

Instance Method Details

#enqueue(job, queue_name:) ⇒ Object



92
93
94
# File 'activejob/lib/active_job/queue_adapters/async_adapter.rb', line 92

# Hands the job to the currently selected executor, which calls the job's
# #perform method.
#
# job        - any object responding to #perform.
# queue_name - required keyword; accepted but not used by this method.
#
# Returns whatever the executor's #post returns.
def enqueue(job, queue_name:)
  executor.post(job) { |queued_job| queued_job.perform }
end

#enqueue_at(job, timestamp, queue_name:) ⇒ Object



96
97
98
99
100
101
102
103
# File 'activejob/lib/active_job/queue_adapters/async_adapter.rb', line 96

# Runs the job at (or after) the given time.
#
# job        - any object responding to #perform.
# timestamp  - numeric epoch seconds at which the job becomes due; it is
#              compared against Time.current.to_f.
# queue_name - forwarded to #enqueue when the job is already due.
#
# When the timestamp lies in the future, the job is wrapped in a
# Concurrent::ScheduledTask that fires after the remaining delay on the
# currently selected executor. Otherwise the job is enqueued right away.
def enqueue_at(job, timestamp, queue_name:)
  seconds_until_due = timestamp - Time.current.to_f

  # Already due (or not strictly in the future): skip the scheduled task.
  return enqueue(job, queue_name: queue_name) unless seconds_until_due > 0

  Concurrent::ScheduledTask.execute(seconds_until_due, args: [job], executor: executor, &:perform)
end

#executor ⇒ Object



110
111
112
# File 'activejob/lib/active_job/queue_adapters/async_adapter.rb', line 110

# Picks the executor that jobs should currently be posted to: the immediate
# executor when the +immediate+ flag is truthy, otherwise the thread pool.
def executor
  return @immediate_executor if immediate

  @async_executor
end

#shutdown(wait: true) ⇒ Object



105
106
107
108
# File 'activejob/lib/active_job/queue_adapters/async_adapter.rb', line 105

# Shuts down the thread-pool executor. Only @async_executor is touched;
# the immediate executor is left alone.
#
# wait - when truthy (the default), also calls wait_for_termination on the
#        pool and returns its result; when falsy, returns nil right after
#        requesting the shutdown.
def shutdown(wait: true)
  pool = @async_executor
  pool.shutdown
  return unless wait

  pool.wait_for_termination
end