Class: ActiveJob::QueueAdapters::SidekiqAdapter

Inherits:
AbstractAdapter show all
Defined in:
activejob/lib/active_job/queue_adapters/sidekiq_adapter.rb

Overview

Sidekiq adapter for Active Job

Simple, efficient background processing for Ruby. Sidekiq uses threads to handle many jobs at the same time in the same process. It does not require Rails but will integrate tightly with it to make background processing dead simple.

Read more about Sidekiq at https://sidekiq.org.

To use Sidekiq, set the queue_adapter config to :sidekiq.

Rails.application.config.active_job.queue_adapter = :sidekiq

Defined Under Namespace

Classes: JobWrapper

Instance Attribute Summary

Attributes inherited from AbstractAdapter

#stopping

Instance Method Summary collapse

Methods inherited from AbstractAdapter

#stopping?

Constructor Details

#initialize ⇒ SidekiqAdapter

:nodoc:



21
22
23
24
25
26
27
28
29
30
31
# File 'activejob/lib/active_job/queue_adapters/sidekiq_adapter.rb', line 21

# Starts the adapter in the "not stopping" state and hooks into Sidekiq's
# :quiet lifecycle event so that @stopping becomes true once it fires.
# The handler is registered through both configure_server and
# configure_client.
def initialize(*) # :nodoc:
  @stopping = false

  # One shared handler; it closes over this adapter instance.
  mark_stopping = proc { @stopping = true }

  Sidekiq.configure_server { |config| config.on(:quiet, &mark_stopping) }
  Sidekiq.configure_client { |config| config.on(:quiet, &mark_stopping) }
end

Instance Method Details

#check_adapter ⇒ Object



33
34
35
36
37
38
# File 'activejob/lib/active_job/queue_adapters/sidekiq_adapter.rb', line 33

# Emits a deprecation warning through ActiveJob.deprecator telling users that
# the built-in adapter is going away and which `sidekiq` gem version to
# upgrade to. Returns whatever the deprecator's #warn returns.
def check_adapter
  # squish collapses the heredoc into a single-line message.
  notice = <<~WARNING.squish
    The built-in `sidekiq` adapter is deprecated and will be removed in Rails 8.2.
    Please upgrade `sidekiq` gem to version 7.3.3 or later to use the `sidekiq` gem's adapter.
  WARNING

  ActiveJob.deprecator.warn(notice)
end

#enqueue(job) ⇒ Object

:nodoc:



40
41
42
43
44
45
# File 'activejob/lib/active_job/queue_adapters/sidekiq_adapter.rb', line 40

# Pushes +job+ for immediate execution: configures JobWrapper with the job's
# class and queue name, hands it the serialized job via perform_async, and
# records the value that call returns as the job's provider_job_id.
def enqueue(job) # :nodoc:
  wrapper = JobWrapper.set(wrapped: job.class, queue: job.queue_name)
  job.provider_job_id = wrapper.perform_async(job.serialize)
end

#enqueue_all(jobs) ⇒ Object

:nodoc:



54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
# File 'activejob/lib/active_job/queue_adapters/sidekiq_adapter.rb', line 54

# Bulk-enqueues +jobs+. Jobs are batched first by class and then by queue
# name; within each batch, jobs without a scheduled_at are pushed in one
# Sidekiq::Client.push_bulk call and jobs with a scheduled_at in another
# (which additionally carries an "at" list of float timestamps).
#
# Returns the total number of non-nil entries returned by the push_bulk calls.
def enqueue_all(jobs) # :nodoc:
  jobs.group_by(&:class).sum do |job_class, class_jobs|
    class_jobs.group_by(&:queue_name).sum do |queue, queue_jobs|
      run_now, run_later = queue_jobs.partition { |job| job.scheduled_at.nil? }
      accepted = 0

      unless run_now.empty?
        accepted += Sidekiq::Client.push_bulk(
          "class" => JobWrapper,
          "wrapped" => job_class,
          "queue" => queue,
          "args" => run_now.map { |job| [job.serialize] }
        ).compact.size
      end

      unless run_later.empty?
        accepted += Sidekiq::Client.push_bulk(
          "class" => JobWrapper,
          "wrapped" => job_class,
          "queue" => queue,
          "args" => run_later.map { |job| [job.serialize] },
          "at" => run_later.map { |job| job.scheduled_at&.to_f }
        ).compact.size
      end

      # Count contributed by this class/queue batch.
      accepted
    end
  end
end

#enqueue_at(job, timestamp) ⇒ Object

:nodoc:



47
48
49
50
51
52
# File 'activejob/lib/active_job/queue_adapters/sidekiq_adapter.rb', line 47

# Schedules +job+ to run at +timestamp+: configures JobWrapper with the job's
# class and queue name, passes the timestamp and serialized job to
# perform_at, and records the value that call returns as the job's
# provider_job_id.
def enqueue_at(job, timestamp) # :nodoc:
  wrapper = JobWrapper.set(wrapped: job.class, queue: job.queue_name)
  job.provider_job_id = wrapper.perform_at(timestamp, job.serialize)
end