Class: GoodJob::Adapter

Inherits:
Object
  • Object
show all
Defined in:
lib/good_job/adapter.rb,
lib/good_job/adapter/inline_buffer.rb

Overview

ActiveJob Adapter.

Defined Under Namespace

Classes: InlineBuffer

Class Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(execution_mode: nil, _capsule: GoodJob.capsule) ⇒ Adapter

Returns a new instance of Adapter.

Parameters:

  • execution_mode (Symbol, nil) (defaults to: nil)

    specifies how and where jobs should be executed. You can also set this with the environment variable GOOD_JOB_EXECUTION_MODE.

    • :inline executes jobs immediately in whatever process queued them (usually the web server process). This should only be used in test and development environments.

    • :external causes the adapter to enqueue jobs, but not execute them. When using this option (the default for production environments), you’ll need to use the command-line tool to actually execute your jobs.

    • :async (or :async_server) executes jobs in separate threads within the Rails web server process (`bundle exec rails server`). It can be more economical for small workloads because you don’t need a separate machine or environment for running your jobs, but if your web server is under heavy load or your jobs require a lot of resources, you should choose :external instead. When not in the Rails web server, jobs will execute in :external mode to ensure jobs are not executed within `rails console`, `rails db:migrate`, `rails assets:prepare`, etc.

    • :async_all executes jobs in any Rails process.

    The default value depends on the Rails environment:

    • development: :async

    • test: :inline
    
    • production and all other environments: :external



28
29
30
31
32
33
34
35
# File 'lib/good_job/adapter.rb', line 28

# Builds an adapter, optionally pinning its execution mode, and records it in
# the class-level +instances+ list.
#
# @param execution_mode [Symbol, nil] per-adapter override; validated through
#   GoodJob::Configuration.validate_execution_mode when truthy
# @param _capsule the capsule this adapter delegates to; defaults to GoodJob.capsule
def initialize(execution_mode: nil, _capsule: GoodJob.capsule) # rubocop:disable Lint/UnderscorePrefixedVariableName
  @_execution_mode_override = execution_mode
  # Only an explicitly supplied mode is validated here.
  GoodJob::Configuration.validate_execution_mode(execution_mode) if execution_mode
  @capsule = _capsule

  # Async executors are started right away only once GoodJob reports it is ready.
  start_async if GoodJob.async_ready?

  registry = self.class.instances
  registry << self
end

Class Attribute Details

.instances ⇒ Array<GoodJob::Adapter>? (readonly)

List of all instantiated Adapters in the current process.

Returns:



12
# File 'lib/good_job/adapter.rb', line 12

# Class-level list that each Adapter appends itself to on construction; no instance-level reader is generated.
cattr_reader :instances, instance_reader: false, default: Concurrent::Array.new

Instance Method Details

#async_started? ⇒ Boolean

Whether the async executors are running

Returns:

  • (Boolean)


225
226
227
# File 'lib/good_job/adapter.rb', line 225

# Whether the async executors are running.
#
# The flag is only assigned by #start_async and #shutdown, so before either
# has run it is unset; coerce that to +false+ so the predicate always returns
# a real Boolean, as documented.
#
# @return [Boolean]
def async_started?
  @_async_started || false
end

#enqueue(active_job) ⇒ GoodJob::Job

Enqueues the ActiveJob job to be performed. For use by Rails; you should generally not call this directly.

Parameters:

  • active_job (ActiveJob::Base)

    the job to be enqueued from #perform_later

Returns:



41
42
43
# File 'lib/good_job/adapter.rb', line 41

# Enqueues the job without a scheduled time by delegating to #enqueue_at.
#
# @param active_job [ActiveJob::Base] the job to be enqueued
# @return whatever #enqueue_at returns for a +nil+ timestamp
def enqueue(active_job)
  unscheduled = nil
  enqueue_at(active_job, unscheduled)
end

#enqueue_after_transaction_commit? ⇒ Boolean

Defines if enqueueing this job from inside an Active Record transaction automatically defers the enqueue to after the transaction commit.

Returns:

  • (Boolean)


47
48
49
# File 'lib/good_job/adapter.rb', line 47

# Reports the +enqueue_after_transaction_commit+ setting from the GoodJob configuration.
#
# @return the configured value, returned as-is
def enqueue_after_transaction_commit?
  settings = GoodJob.configuration
  settings.enqueue_after_transaction_commit
end

#enqueue_all(active_jobs) ⇒ Integer

Enqueues multiple ActiveJob instances at once

Parameters:

  • active_jobs (Array<ActiveJob::Base>)

    jobs to be enqueued

Returns:

  • (Integer)

    number of jobs that were successfully enqueued



54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
# File 'lib/good_job/adapter.rb', line 54

# Enqueues a batch of ActiveJob instances with a single +insert_all+, then
# performs, schedules or notifies for them depending on the execution mode.
#
# @param active_jobs [Array<ActiveJob::Base>] jobs to be enqueued; coerced with +Array()+
# @return [Integer] number of jobs that ended up with a +provider_job_id+
def enqueue_all(active_jobs)
  active_jobs = Array(active_jobs)
  return 0 if active_jobs.empty?

  Rails.application.executor.wrap do
    # One timestamp is shared by every record built for this batch.
    current_time = Time.current
    jobs = active_jobs.map do |active_job|
      GoodJob::Job.build_for_enqueue(active_job).tap do |job|
        # presumably scheduled_at == created_at means no explicit schedule was requested — TODO confirm
        job.scheduled_at = current_time if job.scheduled_at == job.created_at
        job.created_at = current_time
        job.updated_at = current_time
      end
    end

    inline_jobs = []
    GoodJob::Job.transaction(requires_new: true, joinable: false) do
      job_attributes = jobs.map(&:attributes)
      results = GoodJob::Job.insert_all(job_attributes, returning: %w[id active_job_id]) # rubocop:disable Rails/SkipsModelValidations

      # Map each returned active_job_id to the id of the row that was inserted for it.
      job_id_to_provider_job_id = results.each_with_object({}) { |result, hash| hash[result['active_job_id']] = result['id'] }
      # Report the outcome back onto the ActiveJob instances; jobs with no returned row get a nil provider_job_id.
      active_jobs.each do |active_job|
        active_job.provider_job_id = job_id_to_provider_job_id[active_job.job_id]
        active_job.successfully_enqueued = active_job.provider_job_id.present? if active_job.respond_to?(:successfully_enqueued=)
      end
      # insert_all bypasses the model instances, so flag the ones with a returned row as persisted by hand.
      jobs.each do |job|
        job.instance_variable_set(:@new_record, false) if job_id_to_provider_job_id[job.active_job_id]
      end
      jobs = jobs.select(&:persisted?) # prune unpersisted jobs

      # In inline mode, take advisory locks on the jobs that are already due, still inside the transaction.
      if execute_inline?
        inline_jobs = jobs.select { |job| job.scheduled_at.nil? || job.scheduled_at <= Time.current }
        inline_jobs.each(&:advisory_lock!)
      end
    end

    if inline_jobs.any?
      # Captured before the block below is handed to the InlineBuffer.
      deferred = InlineBuffer.defer?
      InlineBuffer.perform_now_or_defer do
        @capsule.tracker.register do
          # Jobs are shifted off one at a time, so inline_jobs only holds the ones not yet attempted.
          until inline_jobs.empty?
            inline_job = inline_jobs.shift
            # NOTE(review): send_notify? receives the GoodJob::Job record here, while the other
            # call in this method passes the ActiveJob instance — confirm this is intended.
            perform_inline(inline_job, notify: deferred ? send_notify?(inline_job) : false)
          end
        ensure
          # Release the advisory locks of any jobs still left in the list (e.g. after an exception).
          inline_jobs.each(&:advisory_unlock)
        end
      end
    end

    # While deferring, exclude the jobs handed to the inline buffer; otherwise exclude jobs that
    # already have finished_at set (presumably the ones just performed inline — TODO confirm).
    non_inline_jobs = if InlineBuffer.defer?
                        jobs - inline_jobs
                      else
                        jobs.reject(&:finished_at)
                      end
    if non_inline_jobs.any?
      job_id_to_active_jobs = active_jobs.index_by(&:job_id)
      # One state hash is produced per (queue_name, scheduled_at) group.
      non_inline_jobs.group_by(&:queue_name).each do |queue_name, jobs_by_queue|
        jobs_by_queue.group_by(&:scheduled_at).each do |scheduled_at, jobs_by_queue_and_scheduled_at|
          state = { queue_name: queue_name, count: jobs_by_queue_and_scheduled_at.size }
          state[:scheduled_at] = scheduled_at if scheduled_at

          # In async mode the capsule is asked to handle the group; fall back to a notification otherwise.
          executed_locally = execute_async? && @capsule&.create_thread(state)
          unless executed_locally
            # Only count the jobs for which send_notify? is truthy, and skip the notification when none are.
            state[:count] = job_id_to_active_jobs.values_at(*jobs_by_queue_and_scheduled_at.map(&:active_job_id)).count { |active_job| send_notify?(active_job) }
            Notifier.notify(state) unless state[:count].zero?
          end
        end
      end
    end
  end

  active_jobs.count(&:provider_job_id)
end

#enqueue_at(active_job, timestamp) ⇒ GoodJob::Job

Enqueues an ActiveJob job to be run at a specific time. For use by Rails; you should generally not call this directly.

Parameters:

  • active_job (ActiveJob::Base)

    the job to be enqueued from #perform_later

  • timestamp (Integer, nil)

    the epoch time to perform the job

Returns:



133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
# File 'lib/good_job/adapter.rb', line 133

# Enqueues a job to run at +timestamp+ (or without a scheduled time when it is nil).
#
# @param active_job [ActiveJob::Base] the job to be enqueued
# @param timestamp [Integer, nil] epoch time to perform the job, converted with +Time.zone.at+
# @return [GoodJob::Job, nil] the enqueued record, or nil when an open Bulk captured the job
def enqueue_at(active_job, timestamp)
  scheduled_at = (Time.zone.at(timestamp) if timestamp)

  # An open Bulk on this thread takes the job over; it will be enqueued
  # later through enqueue_all, so there is nothing more to do here.
  return if GoodJob::Bulk.capture(active_job, queue_adapter: self)

  Rails.application.executor.wrap do
    inline_now = execute_inline? && (scheduled_at.nil? || scheduled_at <= Time.current)
    # True when the job being enqueued is the one currently running on this thread and retry_now is not set.
    retrying_inline = inline_now && CurrentThread.job&.active_job_id == active_job.job_id && !CurrentThread.retry_now

    if retrying_inline
      # Only store the record; it is not performed from here.
      GoodJob::Job.enqueue(active_job, scheduled_at: scheduled_at)
    elsif inline_now
      # Create the record with an advisory lock, then perform it now or hand it to the InlineBuffer.
      GoodJob::Job.enqueue(active_job, scheduled_at: scheduled_at, create_with_advisory_lock: true).tap do |job|
        InlineBuffer.perform_now_or_defer do
          @capsule.tracker.register do
            perform_inline(job, notify: send_notify?(active_job))
          end
        end
      end
    else
      GoodJob::Job.enqueue(active_job, scheduled_at: scheduled_at).tap do |job|
        # In async mode offer the job to the capsule first; notify only if that did not happen.
        ran_locally = execute_async? && @capsule&.create_thread(job.job_state)
        Notifier.notify(job.job_state) if !ran_locally && send_notify?(active_job)
      end
    end
  end
end

#execute_async? ⇒ Boolean

Whether in :async execution mode.

Returns:

  • (Boolean)


194
195
196
197
# File 'lib/good_job/adapter.rb', line 194

# Whether jobs should be executed by this process's async executors.
#
# +:async_all+ always qualifies; +:async+ / +:async_server+ qualify only when
# the configuration reports that we are inside the web server.
#
# @return [Boolean]
def execute_async?
  mode = execution_mode
  return true if mode == :async_all

  mode.in?([:async, :async_server]) && GoodJob.configuration.in_webserver?
end

#execute_externally? ⇒ Boolean

Whether in :external execution mode.

Returns:

  • (Boolean)


201
202
203
204
205
# File 'lib/good_job/adapter.rb', line 201

# Whether jobs are only enqueued here and left for another process to execute.
#
# True for a nil mode, for +:external+, and for +:async+ / +:async_server+
# when the configuration reports that we are not inside the web server.
#
# @return [Boolean]
def execute_externally?
  mode = execution_mode
  return true if mode.nil? || mode == :external

  mode.in?([:async, :async_server]) && !GoodJob.configuration.in_webserver?
end

#execute_inline? ⇒ Boolean

Whether in :inline execution mode.

Returns:

  • (Boolean)


209
210
211
# File 'lib/good_job/adapter.rb', line 209

# Whether this adapter's effective execution mode is +:inline+.
#
# @return [Boolean]
def execute_inline?
  mode = execution_mode
  mode == :inline
end

#execution_mode ⇒ Symbol?

This adapter’s execution mode

Returns:

  • (Symbol, nil)


188
189
190
# File 'lib/good_job/adapter.rb', line 188

# The effective execution mode: the override given to the constructor when
# one was supplied, otherwise the value from the GoodJob configuration.
#
# @return [Symbol, nil]
def execution_mode
  return @_execution_mode_override if @_execution_mode_override

  GoodJob.configuration.execution_mode
end

#shutdown(timeout: NONE) ⇒ void

This method returns an undefined value.

Shut down the thread pool executors.

Parameters:

  • timeout (nil, Numeric, NONE) (defaults to: NONE)

    Seconds to wait for active threads.

    • nil triggers a shutdown but does not wait for it to complete.

    • -1 waits until the shutdown is complete.

    • 0 immediately shuts down and stops any threads.

    • A positive number will wait that many seconds before stopping any remaining active threads.



181
182
183
184
# File 'lib/good_job/adapter.rb', line 181

# Shuts down the capsule (when one is set) and clears the async-started flag.
#
# @param timeout [nil, Numeric, NONE] forwarded unchanged to the capsule's +shutdown+
# @return [void]
def shutdown(timeout: NONE)
  @capsule.shutdown(timeout: timeout) unless @capsule.nil?
  @_async_started = false
end

#start_async ⇒ void

This method returns an undefined value.

Start async executors



215
216
217
218
219
220
221
# File 'lib/good_job/adapter.rb', line 215

# Starts the capsule's executors when this adapter is in an async mode; does
# nothing otherwise.
#
# @return [void]
def start_async
  return unless execute_async?

  @capsule.start
  # Thread priority is lowered when the setting is true or left unset (nil).
  priority_setting = GoodJob.configuration.lower_thread_priority
  @capsule.lower_thread_priority = true if priority_setting.in?([true, nil])
  @_async_started = true
end