Class: ActiveJob::QueueAdapters::AsyncAdapter::Scheduler
- Inherits:
-
Object
- Object
- ActiveJob::QueueAdapters::AsyncAdapter::Scheduler
- Defined in:
- lib/active_job/queue_adapters/async_adapter.rb
Overview
:nodoc:
Constant Summary collapse
- DEFAULT_EXECUTOR_OPTIONS =
{ min_threads: 0, max_threads: ENV.fetch("RAILS_MAX_THREADS", 5).to_i, auto_terminate: true, idletime: 60, # 1 minute max_queue: 0, # unlimited fallback_policy: :caller_runs # shouldn't matter -- 0 max queue }.freeze
Instance Attribute Summary collapse
-
#immediate ⇒ Object
Returns the value of attribute immediate.
Instance Method Summary collapse
- #enqueue(job, queue_name:) ⇒ Object
- #enqueue_at(job, timestamp, queue_name:) ⇒ Object
- #executor ⇒ Object
-
#initialize(**options) ⇒ Scheduler
constructor
A new instance of Scheduler.
- #shutdown(wait: true) ⇒ Object
Constructor Details
#initialize(**options) ⇒ Scheduler
Returns a new instance of Scheduler.
86 87 88 89 90 |
# File 'lib/active_job/queue_adapters/async_adapter.rb', line 86 def initialize(**) self.immediate = false @immediate_executor = Concurrent::ImmediateExecutor.new @async_executor = Concurrent::ThreadPoolExecutor.new(DEFAULT_EXECUTOR_OPTIONS.merge()) end |
Instance Attribute Details
#immediate ⇒ Object
Returns the value of attribute immediate.
84 85 86 |
# File 'lib/active_job/queue_adapters/async_adapter.rb', line 84 def immediate @immediate end |
Instance Method Details
#enqueue(job, queue_name:) ⇒ Object
92 93 94 |
# File 'lib/active_job/queue_adapters/async_adapter.rb', line 92 def enqueue(job, queue_name:) executor.post(job, &:perform) end |
#enqueue_at(job, timestamp, queue_name:) ⇒ Object
96 97 98 99 100 101 102 103 |
# File 'lib/active_job/queue_adapters/async_adapter.rb', line 96 def enqueue_at(job, , queue_name:) delay = - Time.current.to_f if !immediate && delay > 0 Concurrent::ScheduledTask.execute(delay, args: [job], executor: executor, &:perform) else enqueue(job, queue_name: queue_name) end end |
#executor ⇒ Object
110 111 112 |
# File 'lib/active_job/queue_adapters/async_adapter.rb', line 110 def executor immediate ? @immediate_executor : @async_executor end |
#shutdown(wait: true) ⇒ Object
105 106 107 108 |
# File 'lib/active_job/queue_adapters/async_adapter.rb', line 105 def shutdown(wait: true) @async_executor.shutdown @async_executor.wait_for_termination if wait end |