Class: ActiveJob::QueueAdapters::AsyncAdapter
- Inherits: AbstractAdapter
  - Object
  - AbstractAdapter
  - ActiveJob::QueueAdapters::AsyncAdapter
- Defined in: activejob/lib/active_job/queue_adapters/async_adapter.rb
Overview
Active Job Async adapter
The Async adapter runs jobs with an in-process thread pool.
This is the default queue adapter. It’s well-suited for dev/test since it doesn’t need any external infrastructure, but it’s a poor fit for production since pending jobs live only in memory and are dropped on restart.
To use this adapter, set the queue adapter to :async:
config.active_job.queue_adapter = :async
To configure the adapter’s thread pool, instantiate the adapter and pass your own config:
config.active_job.queue_adapter = ActiveJob::QueueAdapters::AsyncAdapter.new \
min_threads: 1,
max_threads: 2 * Concurrent.processor_count,
idletime: 600.seconds
The adapter uses a Concurrent Ruby thread pool to schedule and execute jobs. Since jobs share a single thread pool, long-running jobs will block short-lived jobs. Fine for dev/test; bad for production.
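The blocking behavior described above can be illustrated with a minimal sketch that uses only Ruby's standard library. It is a stand-in, not the adapter's implementation: `run_on_shared_pool` and the job names are hypothetical, and a plain `Queue` drained by worker threads takes the place of the Concurrent Ruby thread pool.

```ruby
# Hypothetical stand-in for a fixed-size pool: one FIFO queue drained by
# worker threads. The real adapter delegates to Concurrent Ruby instead.
def run_on_shared_pool(jobs, threads: 1)
  queue = Queue.new
  jobs.each { |job| queue << job }
  queue.close                      # workers stop once the queue is drained

  finished = Queue.new             # thread-safe record of completion order
  workers = Array.new(threads) do
    Thread.new do
      while (job = queue.pop)      # pop returns nil when closed and empty
        job[:work].call
        finished << job[:name]
      end
    end
  end
  workers.each(&:join)

  Array.new(finished.size) { finished.pop }
end

jobs = [
  { name: :slow_report, work: -> { sleep 0.2 } },
  { name: :quick_email, work: -> {} }
]

# With a single worker, the quick job cannot start until the slow one ends.
puts run_on_shared_pool(jobs, threads: 1).inspect
```

With one worker the quick job always completes after the slow one. Raising the thread count lets short jobs overtake long ones, but only until every thread is busy with long-running work.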
Defined Under Namespace
Classes: JobWrapper, Scheduler
Instance Method Summary
- #enqueue(job) ⇒ Object
  :nodoc:
- #enqueue_at(job, timestamp) ⇒ Object
  :nodoc:
- #immediate=(immediate) ⇒ Object
  Used for our test suite.
- #initialize(**executor_options) ⇒ AsyncAdapter (constructor)
  See Concurrent::ThreadPoolExecutor for executor options.
- #shutdown(wait: true) ⇒ Object
  Gracefully stop processing jobs.
Constructor Details
#initialize(**executor_options) ⇒ AsyncAdapter
See Concurrent::ThreadPoolExecutor for executor options.
# File 'activejob/lib/active_job/queue_adapters/async_adapter.rb', line 35
def initialize(**)
  @scheduler = Scheduler.new(**)
end
Instance Method Details
#enqueue(job) ⇒ Object
:nodoc:
# File 'activejob/lib/active_job/queue_adapters/async_adapter.rb', line 39
def enqueue(job) # :nodoc:
  @scheduler.enqueue JobWrapper.new(job), queue_name: job.queue_name
end
#enqueue_at(job, timestamp) ⇒ Object
:nodoc:
# File 'activejob/lib/active_job/queue_adapters/async_adapter.rb', line 43
def enqueue_at(job, timestamp) # :nodoc:
  @scheduler.enqueue_at JobWrapper.new(job), timestamp, queue_name: job.queue_name
end
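To illustrate how a timestamp-based enqueue might be handled, here is a minimal stdlib-only sketch. It assumes the timestamp is a Float of seconds since the Unix epoch, which is how Active Job conventionally passes it to adapters. `delay_until` and `run_at` are hypothetical helpers, and a plain thread with `sleep` stands in for whatever the Scheduler class actually does.

```ruby
# Hypothetical helper: convert an absolute epoch timestamp into a
# non-negative delay in seconds. Past timestamps yield 0.0 (run now).
def delay_until(timestamp, now: Time.now.to_f)
  [timestamp - now, 0.0].max
end

# Hypothetical helper: run the block at (or shortly after) the timestamp.
# A sleeping thread is a simplification; it is not how the adapter schedules.
def run_at(timestamp, &block)
  delay = delay_until(timestamp)
  Thread.new do
    sleep(delay) if delay.positive?
    block.call
  end
end

thread = run_at(Time.now.to_f + 0.1) { :done }
puts thread.value.inspect   # blocks until the delayed block has run
```

Like the adapter's queue, this delayed work exists only in process memory, so a restart before the timestamp arrives loses the job.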
#immediate=(immediate) ⇒ Object
Used for our test suite.
# File 'activejob/lib/active_job/queue_adapters/async_adapter.rb', line 55
def immediate=(immediate) # :nodoc:
  @scheduler.immediate = immediate
end
#shutdown(wait: true) ⇒ Object
Gracefully stop processing jobs. Finishes in-progress work and handles any new jobs following the executor’s fallback policy (`caller_runs`). Waits for termination by default. Pass `wait: false` to continue without waiting.
# File 'activejob/lib/active_job/queue_adapters/async_adapter.rb', line 50
def shutdown(wait: true) # :nodoc:
  @scheduler.shutdown wait: wait
end
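The shutdown semantics can be sketched with a toy single-worker pool built on Ruby's standard library. `TinyPool` is hypothetical and much simpler than Concurrent::ThreadPoolExecutor; it only illustrates the two ideas named above: draining accepted work before stopping, and a `caller_runs`-style fallback in which work submitted after shutdown executes on the submitting thread.

```ruby
# Hypothetical toy pool; not the adapter's Scheduler or Concurrent Ruby.
class TinyPool
  def initialize
    @queue = Queue.new
    @worker = Thread.new do
      while (task = @queue.pop)   # nil once the queue is closed and empty
        task.call
      end
    end
  end

  # Queue the task, or run it on the calling thread if the pool has shut
  # down (a caller_runs-style fallback).
  def post(&task)
    @queue << task
  rescue ClosedQueueError
    task.call
  end

  # Stop accepting work. Already-queued tasks still run; with wait: true
  # this call blocks until the worker has finished them.
  def shutdown(wait: true)
    @queue.close
    @worker.join if wait
  end
end

pool = TinyPool.new
log = Queue.new
pool.post { sleep 0.1; log << :ran_on_worker }
pool.shutdown(wait: true)          # returns after the queued task finishes
pool.post { log << :ran_on_caller }  # executes inline on this thread
puts Array.new(log.size) { log.pop }.inspect
```

Calling `shutdown(wait: false)` in this sketch closes the queue and returns immediately, leaving the worker to finish queued tasks in the background.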