Class: ActiveJob::QueueAdapters::BackgroundThreadAdapter

Inherits:
InlineAdapter
  • Object
Defined in:
lib/autoload/active_job/queue_adapters/background_thread_adapter.rb

Overview

== Active Job Background Thread adapter

Jobs enqueued with the Background Thread adapter are executed in a background thread. This provides some asynchrony without the hassle of setting up a full queueing backend.

To use the Background Thread adapter, set the queue_adapter config to +:background_thread+.

Rails.application.config.active_job.queue_adapter = :background_thread

Defined Under Namespace

Classes: LogSubscriber

Constant Summary

ENQUEUE_EVENT =

The ActiveSupport::Notifications instrumentation event fired when a new job is enqueued.

'enqueue.background_thread_adapter.active_job'
GROW_EVENT =

The ActiveSupport::Notifications instrumentation event fired when the pool grows.

'grow.background_thread_adapter.active_job'
SHRINK_EVENT =

The ActiveSupport::Notifications instrumentation event fired when the pool shrinks.

'shrink.background_thread_adapter.active_job'
MAX_THREAD_POOL_SIZE =

The maximum number of threads to maintain in the thread pool.

3

Instance Method Summary

Constructor Details

#initialize ⇒ BackgroundThreadAdapter

Returns a new instance of BackgroundThreadAdapter.


# File 'lib/autoload/active_job/queue_adapters/background_thread_adapter.rb', line 24

def initialize
  @future_jobs = []
  @pending_jobs = []
  @running_jobs = 0
  @finish_jobs_condition = ConditionVariable.new
  @thread_pool = []
  @thread_pool_mutex = Mutex.new
end

Instance Method Details

#clear_enqueued_jobs ⇒ Object

Clears all scheduled (future) jobs that have not yet been moved to the pending queue.


# File 'lib/autoload/active_job/queue_adapters/background_thread_adapter.rb', line 53

def clear_enqueued_jobs
  @future_jobs.clear
end

#enqueue(job) ⇒ Object

:nodoc:


# File 'lib/autoload/active_job/queue_adapters/background_thread_adapter.rb', line 33

def enqueue(job) #:nodoc:
  ActiveSupport::Notifications.instrument(ENQUEUE_EVENT, job: job, caller: caller) do |payload|
    with_thread_pool { @pending_jobs << job.serialize }
    ensure_threads

    payload.reverse_merge!(notification_statistics)
  end
end
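
The bodies of with_thread_pool and ensure_threads are not shown on this page. As a rough illustration only, the following sketch shows one plausible shape for a mutex-guarded pending queue backed by a lazily grown, bounded set of worker threads. TinyPool, MAX_THREADS, join and work_loop are hypothetical names chosen for this example; none of them belong to the adapter.

```ruby
# Hypothetical sketch: TinyPool is not part of BackgroundThreadAdapter.
class TinyPool
  MAX_THREADS = 3 # mirrors MAX_THREAD_POOL_SIZE above

  def initialize
    @pending = []
    @threads = []
    @mutex = Mutex.new
  end

  # Append work under the lock, then make sure a worker exists to run it.
  def enqueue(&work)
    @mutex.synchronize do
      @pending << work
      ensure_threads
    end
  end

  # Block until every worker has drained the queue and exited.
  def join
    loop do
      thread = @mutex.synchronize { @threads.first }
      break unless thread
      thread.join
    end
  end

  private

  # Caller must hold @mutex. Starts at most one worker per call, up to the cap.
  def ensure_threads
    return if @threads.size >= MAX_THREADS
    @threads << Thread.new { work_loop }
  end

  # Each worker pulls work until the queue is empty, then removes itself
  # from the pool inside the same critical section and exits.
  def work_loop
    loop do
      work = @mutex.synchronize do
        job = @pending.shift
        @threads.delete(Thread.current) if job.nil?
        job
      end
      break if work.nil?
      work.call
    end
  end
end

pool = TinyPool.new
results = Queue.new
10.times { |i| pool.enqueue { results << i * 2 } }
pool.join
```

Removing an idle worker in the same critical section that found the queue empty means a later enqueue always sees an accurate thread count and can start a replacement worker, so no job is left without a thread to run it.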

#enqueue_at(job, timestamp) ⇒ Object

:nodoc:


# File 'lib/autoload/active_job/queue_adapters/background_thread_adapter.rb', line 42

def enqueue_at(job, timestamp) #:nodoc:
  @future_jobs << { job: job, at: timestamp }
end

#perform_enqueued_jobs ⇒ Object

Moves all future jobs into the pending jobs queue, enqueueing them in descending timestamp order (latest first), then clears the list of future jobs.


# File 'lib/autoload/active_job/queue_adapters/background_thread_adapter.rb', line 47

def perform_enqueued_jobs
  @future_jobs.sort_by { |job| -job[:at] }.each { |job| enqueue(job[:job]) }
  clear_enqueued_jobs
end
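
To make the ordering concrete, the sketch below applies the same sort_by call to a few hand-written entries. The entries are hypothetical stand-ins shaped like the hashes stored by enqueue_at, and the example assumes numeric timestamps, since unary minus is not defined for Time objects.

```ruby
# Hypothetical entries shaped like the hashes stored by enqueue_at.
future_jobs = [
  { job: "b", at: 20.0 },
  { job: "c", at: 30.0 },
  { job: "a", at: 10.0 },
]

# Negating the sort key yields descending order: the latest timestamp comes first.
ordered = future_jobs.sort_by { |entry| -entry[:at] }.map { |entry| entry[:job] }
# ordered is ["c", "b", "a"]
```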

#wait_for_jobs ⇒ Object

Waits for all queued jobs to finish executing.


# File 'lib/autoload/active_job/queue_adapters/background_thread_adapter.rb', line 58

def wait_for_jobs
  with_thread_pool do
    return if @pending_jobs.empty? && @running_jobs == 0
    @finish_jobs_condition.wait(@thread_pool_mutex)
  end
end
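
The method above checks its condition once and then waits once. A common defensive variation, sketched here under assumptions, re-checks the predicate in a loop after every wakeup, because a condition-variable wait may return before the condition actually holds. JobTracker and its methods are hypothetical names for this illustration and are not part of the adapter.

```ruby
# Hypothetical sketch: JobTracker is not part of BackgroundThreadAdapter.
class JobTracker
  def initialize
    @mutex = Mutex.new
    @idle = ConditionVariable.new
    @running = 0
  end

  # Record that a job has started.
  def started
    @mutex.synchronize { @running += 1 }
  end

  # Record that a job has finished and wake all waiters once nothing is running.
  def finished
    @mutex.synchronize do
      @running -= 1
      @idle.broadcast if @running.zero?
    end
  end

  # Thread-safe read of the current running count.
  def running
    @mutex.synchronize { @running }
  end

  # Wait in a loop so that an early wakeup simply re-checks the predicate.
  def wait_until_idle
    @mutex.synchronize do
      @idle.wait(@mutex) until @running.zero?
    end
  end
end

tracker = JobTracker.new
workers = 3.times.map do
  tracker.started
  Thread.new do
    sleep 0.01
    tracker.finished
  end
end
tracker.wait_until_idle
workers.each(&:join)
```

Because the predicate is tested while holding the mutex, a call made when nothing is running returns immediately without waiting.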