Class: GoodJob::Adapter

Inherits:
Object
  • Object
show all
Defined in:
lib/good_job/adapter.rb

Overview

ActiveJob Adapter.

Class Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(execution_mode: nil, _capsule: GoodJob.capsule) ⇒ Adapter

Returns a new instance of Adapter.

Parameters:

  • execution_mode (Symbol, nil) (defaults to: nil)

    specifies how and where jobs should be executed. You can also set this with the environment variable GOOD_JOB_EXECUTION_MODE.

    • :inline executes jobs immediately in whatever process queued them (usually the web server process). This should only be used in test and development environments.

    • :external causes the adapter to enqueue jobs, but not execute them. When using this option (the default for production environments), you’ll need to use the command-line tool to actually execute your jobs.

    • :async (or :async_server) executes jobs in separate threads within the Rails web server process (`bundle exec rails server`). It can be more economical for small workloads because you don’t need a separate machine or environment for running your jobs, but if your web server is under heavy load or your jobs require a lot of resources, you should choose :external instead. When not in the Rails web server, jobs will execute in :external mode to ensure jobs are not executed within `rails console`, `rails db:migrate`, `rails assets:prepare`, etc.

    • :async_all executes jobs in any Rails process.

    The default value depends on the Rails environment:

    • development: :async

    • test: :inline
    
    • production and all other environments: :external



28
29
30
31
32
33
34
35
# File 'lib/good_job/adapter.rb', line 28

# Builds a new adapter, optionally overriding the configured execution mode.
#
# @param execution_mode [Symbol, nil] per-adapter execution mode override;
#   when given it is validated via GoodJob::Configuration.validate_execution_mode
# @param _capsule the capsule this adapter uses (defaults to GoodJob.capsule)
def initialize(execution_mode: nil, _capsule: GoodJob.capsule) # rubocop:disable Lint/UnderscorePrefixedVariableName
  @_execution_mode_override = execution_mode
  # Only an explicitly supplied mode is validated here.
  GoodJob::Configuration.validate_execution_mode(execution_mode) if execution_mode
  @capsule = _capsule

  # Async executors are started right away only when GoodJob reports it is ready.
  start_async if GoodJob.async_ready?

  # Record this adapter in the class-level list of instances.
  self.class.instances.push(self)
end

Class Attribute Details

.instances ⇒ Array<GoodJob::Adapter>? (readonly)

List of all instantiated Adapters in the current process.

Returns:



12
# File 'lib/good_job/adapter.rb', line 12

# Class-level reader for the list of all Adapters instantiated in the current process.
# The list defaults to a Concurrent::Array, and `instance_reader: false` means
# no instance-level `#instances` reader is generated.
cattr_reader :instances, default: Concurrent::Array.new, instance_reader: false

Instance Method Details

#async_started? ⇒ Boolean

Whether the async executors are running

Returns:

  • (Boolean)


252
253
254
# File 'lib/good_job/adapter.rb', line 252

# Whether the async executors are running.
#
# The underlying flag may still be unset (nil) when the async executors have
# never been started or shut down, so it is coerced to a strict boolean to
# honor the documented return type.
#
# @return [Boolean]
def async_started?
  @_async_started ? true : false
end

#enqueue(active_job) ⇒ GoodJob::Execution

Enqueues the ActiveJob job to be performed. For use by Rails; you should generally not call this directly.

Parameters:

  • active_job (ActiveJob::Base)

    the job to be enqueued from #perform_later

Returns:



41
42
43
# File 'lib/good_job/adapter.rb', line 41

# Enqueues the ActiveJob job to be performed. For use by Rails; you should
# generally not call this directly.
#
# Delegates to #enqueue_at with a nil timestamp.
#
# @param active_job [ActiveJob::Base] the job to be enqueued from #perform_later
# @return whatever #enqueue_at returns
def enqueue(active_job)
  timestamp = nil # no specific run time requested
  enqueue_at(active_job, timestamp)
end

#enqueue_after_transaction_commit? ⇒ Boolean

Defines if enqueueing this job from inside an Active Record transaction automatically defers the enqueue to after the transaction commit.

Returns:

  • (Boolean)


47
48
49
# File 'lib/good_job/adapter.rb', line 47

# Defines if enqueueing a job from inside an Active Record transaction
# automatically defers the enqueue to after the transaction commit.
#
# @return [Boolean] the value of GoodJob.configuration.enqueue_after_transaction_commit
def enqueue_after_transaction_commit?
  configuration = GoodJob.configuration
  configuration.enqueue_after_transaction_commit
end

#enqueue_all(active_jobs) ⇒ Integer

Enqueues multiple ActiveJob instances at once

Parameters:

  • active_jobs (Array<ActiveJob::Base>)

    jobs to be enqueued

Returns:

  • (Integer)

    number of jobs that were successfully enqueued



54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
# File 'lib/good_job/adapter.rb', line 54

# Enqueues multiple ActiveJob instances at once.
#
# Builds one execution per job, writes them all with a single +insert_all+,
# performs the due executions immediately when in inline mode, and finally
# wakes a local thread or sends a notification for the executions that are
# still unfinished. If an inline execution reports an unhandled error, that
# error is re-raised.
#
# @param active_jobs [Array<ActiveJob::Base>] jobs to be enqueued
# @return [Integer] number of jobs that were successfully enqueued
def enqueue_all(active_jobs)
  active_jobs = Array(active_jobs)
  return 0 if active_jobs.empty?

  Rails.application.executor.wrap do
    # One shared timestamp so every row of the batch gets identical times.
    current_time = Time.current
    executions = active_jobs.map do |active_job|
      GoodJob::Execution.build_for_enqueue(active_job).tap do |execution|
        if GoodJob::Execution.discrete_support?
          execution.make_discrete
          # A scheduled_at equal to created_at is moved onto the shared batch time.
          execution.scheduled_at = current_time if execution.scheduled_at == execution.created_at
        end

        execution.created_at = current_time
        execution.updated_at = current_time
      end
    end

    inline_executions = []
    GoodJob::Execution.transaction(requires_new: true, joinable: false) do
      execution_attributes = executions.map do |execution|
        if GoodJob::Execution.error_event_migrated?
          execution.attributes
        else
          # NOTE(review): presumably the error_event column does not exist yet
          # when not migrated, so it is left out of the insert — confirm.
          execution.attributes.except('error_event')
        end
      end

      results = GoodJob::Execution.insert_all(execution_attributes, returning: %w[id active_job_id]) # rubocop:disable Rails/SkipsModelValidations

      # Map each returned active_job_id to its new row id and reflect it on the jobs.
      job_id_to_provider_job_id = results.each_with_object({}) { |result, hash| hash[result['active_job_id']] = result['id'] }
      active_jobs.each do |active_job|
        active_job.provider_job_id = job_id_to_provider_job_id[active_job.job_id]
        active_job.successfully_enqueued = active_job.provider_job_id.present? if active_job.respond_to?(:successfully_enqueued=)
      end
      # The in-memory executions were not saved individually, so flag the ones
      # that came back from the insert as no longer new.
      executions.each do |execution|
        execution.instance_variable_set(:@new_record, false) if job_id_to_provider_job_id[execution.active_job_id]
      end
      executions = executions.select(&:persisted?) # prune unpersisted executions

      if execute_inline?
        # Only executions that are due now (no schedule, or a schedule in the past) run inline.
        inline_executions = executions.select { |execution| (execution.scheduled_at.nil? || execution.scheduled_at <= Time.current) }
        inline_executions.each(&:advisory_lock!)
      end
    end

    @capsule.tracker.register
    begin
      until inline_executions.empty?
        begin
          inline_execution = inline_executions.shift
          inline_result = inline_execution.perform(lock_id: @capsule.tracker.id_for_lock)

          retried_execution = inline_result.retried
          # Keep performing retries that are already due. A retry without a
          # scheduled_at counts as due immediately, matching the nil handling
          # used when selecting the inline executions; without this guard a nil
          # scheduled_at would raise NoMethodError on the comparison.
          while retried_execution && (retried_execution.scheduled_at.nil? || retried_execution.scheduled_at <= Time.current)
            inline_execution = retried_execution
            inline_result = inline_execution.perform(lock_id: @capsule.tracker.id_for_lock)
            retried_execution = inline_result.retried
          end
        ensure
          inline_execution.advisory_unlock
          inline_execution.run_callbacks(:perform_unlocked)
        end
        raise inline_result.unhandled_error if inline_result.unhandled_error
      end
    ensure
      @capsule.tracker.unregister
      # Release the locks of any executions that were never reached (e.g. after a raise).
      inline_executions.each(&:advisory_unlock)
    end

    # Whatever is not finished yet still needs a worker to pick it up.
    non_inline_executions = executions.reject(&:finished_at)
    if non_inline_executions.any?
      job_id_to_active_jobs = active_jobs.index_by(&:job_id)
      non_inline_executions.group_by(&:queue_name).each do |queue_name, executions_by_queue|
        executions_by_queue.group_by(&:scheduled_at).each do |scheduled_at, executions_by_queue_and_scheduled_at|
          state = { queue_name: queue_name, count: executions_by_queue_and_scheduled_at.size }
          state[:scheduled_at] = scheduled_at if scheduled_at

          executed_locally = execute_async? && @capsule&.create_thread(state)
          unless executed_locally
            # Notify only for the jobs that opt in, and skip the call when none do.
            state[:count] = job_id_to_active_jobs.values_at(*executions_by_queue_and_scheduled_at.map(&:active_job_id)).count { |active_job| send_notify?(active_job) }
            Notifier.notify(state) unless state[:count].zero?
          end
        end
      end
    end
  end

  active_jobs.count(&:provider_job_id)
end

#enqueue_at(active_job, timestamp) ⇒ GoodJob::Execution

Enqueues an ActiveJob job to be run at a specific time. For use by Rails; you should generally not call this directly.

Parameters:

  • active_job (ActiveJob::Base)

    the job to be enqueued from #perform_later

  • timestamp (Integer, nil)

    the epoch time to perform the job

Returns:



150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
# File 'lib/good_job/adapter.rb', line 150

# Enqueues an ActiveJob job to be run at a specific time. For use by Rails;
# you should generally not call this directly.
#
# Returns nil without enqueueing when GoodJob::Bulk.capture takes the job.
# When the job is performed inline and reports an unhandled error, that
# error is re-raised after the execution has been unlocked.
#
# @param active_job [ActiveJob::Base] the job to be enqueued from #perform_later
# @param timestamp [Integer, nil] the epoch time to perform the job
# @return [GoodJob::Execution, nil] the (last) execution, or nil when captured by a Bulk
def enqueue_at(active_job, timestamp)
  scheduled_at = Time.zone.at(timestamp) if timestamp

  # If there is a currently open Bulk in the current thread, direct the
  # job there to be enqueued using enqueue_all
  return if GoodJob::Bulk.capture(active_job, queue_adapter: self)

  Rails.application.executor.wrap do
    run_inline = execute_inline? && (scheduled_at.nil? || scheduled_at <= Time.current)
    # Same job as the one on the current thread, and no "retry now" flag set.
    retry_inline = run_inline && CurrentThread.execution&.active_job_id == active_job.job_id && !CurrentThread.retry_now

    if retry_inline
      # Only record the execution here; it is not performed in this branch.
      execution = GoodJob::Execution.enqueue(active_job, scheduled_at: scheduled_at)
    elsif run_inline
      execution = GoodJob::Execution.enqueue(active_job, scheduled_at: scheduled_at, create_with_advisory_lock: true)

      # Performs one execution while registered with the capsule's tracker.
      perform_tracked = lambda do |record|
        @capsule.tracker.register { record.perform(lock_id: @capsule.tracker.id_for_lock) }
      end
      # An execution is due when it has no schedule or its schedule has passed.
      due_now = lambda do |record|
        record.scheduled_at.nil? || record.scheduled_at <= Time.current
      end

      begin
        result = perform_tracked.call(execution)
        retried_execution = result.retried

        # Keep performing retries for as long as they are already due.
        while retried_execution && due_now.call(retried_execution)
          execution = retried_execution
          result = perform_tracked.call(execution)
          retried_execution = result.retried
        end

        # A retry scheduled for the future is announced instead of performed.
        if retried_execution&.scheduled_at && retried_execution.scheduled_at > Time.current && send_notify?(active_job)
          Notifier.notify(retried_execution.job_state)
        end
      ensure
        execution.advisory_unlock
        execution.run_callbacks(:perform_unlocked)
      end

      raise result.unhandled_error if result.unhandled_error
    else
      execution = GoodJob::Execution.enqueue(active_job, scheduled_at: scheduled_at)

      # Prefer waking a local thread; otherwise fall back to a notification.
      executed_locally = execute_async? && @capsule&.create_thread(execution.job_state)
      Notifier.notify(execution.job_state) if !executed_locally && send_notify?(active_job)
    end

    execution
  end
end

#execute_async? ⇒ Boolean

Whether in :async execution mode.

Returns:

  • (Boolean)


222
223
224
225
# File 'lib/good_job/adapter.rb', line 222

# Whether in :async execution mode.
#
# True for :async_all, and for :async / :async_server only when
# GoodJob.configuration.in_webserver? is truthy.
#
# @return [Boolean]
def execute_async?
  mode = execution_mode
  return true if mode == :async_all

  %i[async async_server].include?(mode) && GoodJob.configuration.in_webserver?
end

#execute_externally? ⇒ Boolean

Whether in :external execution mode.

Returns:

  • (Boolean)


229
230
231
232
233
# File 'lib/good_job/adapter.rb', line 229

# Whether in :external execution mode.
#
# True when no mode is set, when the mode is :external, or when the mode is
# :async / :async_server but GoodJob.configuration.in_webserver? is falsy.
#
# @return [Boolean]
def execute_externally?
  mode = execution_mode
  return true if mode.nil? || mode == :external

  %i[async async_server].include?(mode) && !GoodJob.configuration.in_webserver?
end

#execute_inline? ⇒ Boolean

Whether in :inline execution mode.

Returns:

  • (Boolean)


237
238
239
# File 'lib/good_job/adapter.rb', line 237

# Whether in :inline execution mode.
#
# @return [Boolean] true only when #execution_mode is exactly :inline
def execute_inline?
  mode = execution_mode
  mode == :inline
end

#execution_mode ⇒ Symbol?

This adapter’s execution mode

Returns:

  • (Symbol, nil)


216
217
218
# File 'lib/good_job/adapter.rb', line 216

# This adapter's execution mode.
#
# A per-adapter override takes precedence; without one the value comes from
# GoodJob.configuration.execution_mode.
#
# @return [Symbol, nil]
def execution_mode
  return @_execution_mode_override if @_execution_mode_override

  GoodJob.configuration.execution_mode
end

#shutdown(timeout: NONE) ⇒ void

This method returns an undefined value.

Shut down the thread pool executors.

Parameters:

  • timeout (nil, Numeric, NONE) (defaults to: NONE)

    Seconds to wait for active threads.

    • nil triggers a shutdown but does not wait for it to complete.

    • -1 waits until the shutdown is complete.

    • 0 immediately shuts down and stops any threads.

    • A positive number will wait that many seconds before stopping any remaining active threads.



209
210
211
212
# File 'lib/good_job/adapter.rb', line 209

# Shut down the thread pool executors.
#
# Forwards the timeout to the capsule (when one is set) and then marks the
# async executors as not started.
#
# @param timeout [nil, Numeric, NONE] seconds to wait for active threads;
#   passed through unchanged to the capsule's #shutdown
# @return [void]
def shutdown(timeout: NONE)
  @capsule.shutdown(timeout: timeout) unless @capsule.nil?
  @_async_started = false
end

#start_async ⇒ void

This method returns an undefined value.

Start async executors



243
244
245
246
247
248
# File 'lib/good_job/adapter.rb', line 243

# Start async executors.
#
# Does nothing unless #execute_async? is truthy; otherwise starts the capsule
# and records that the async executors have been started.
#
# @return [void]
def start_async
  if execute_async?
    @capsule.start
    @_async_started = true
  end
end