Class: GoodJob::Job
- Inherits: BaseRecord
- Inheritance chain: Object, ActiveRecord::Base, BaseRecord, GoodJob::Job
- Includes: AdvisoryLockable, ErrorEvents, Filterable, Reportable
- Defined in: app/models/good_job/job.rb
Overview
Active Record model that represents an ActiveJob job.
Constant Summary
- PreviouslyPerformedError = Class.new(StandardError)
Raised if something attempts to execute a previously completed Execution again.
- ERROR_MESSAGE_SEPARATOR = ": "
String separating Error Class from Error Message.
- DEFAULT_QUEUE_NAME = 'default'
ActiveJob jobs without a queue_name attribute are placed on this queue.
- DEFAULT_PRIORITY = 0
ActiveJob jobs without a priority attribute are given this priority.
- ActionForStateMismatchError = Class.new(StandardError)
Raised when an inappropriate action is applied to a Job based on its state.
- AdapterNotGoodJobError = Class.new(StandardError)
Raised when GoodJob is not configured as the Active Job Queue Adapter.
- DiscardJobError = Class.new(StandardError)
Attached to a Job’s Execution when the Job is discarded.
- ActiveJobDeserializationError = Class.new(StandardError)
Raised when Active Job data cannot be deserialized.
Constants included from AdvisoryLockable
AdvisoryLockable::RecordAlreadyAdvisoryLockedError
Class Method Summary
-
.active_job_id(active_job_id) ⇒ ActiveRecord::Relation
Get jobs with given ActiveJob ID.
- .build_for_enqueue(active_job, scheduled_at: nil) ⇒ Object
- .coalesce_scheduled_at_created_at ⇒ Object
-
.creation_ordered ⇒ ActiveRecord::Relation
Order jobs by created_at, for first-in first-out.
-
.dequeueing_ordered(parsed_queues) ⇒ ActiveRecord::Relation
Order jobs for de-queueing.
-
.enqueue(active_job, scheduled_at: nil, create_with_advisory_lock: false) ⇒ Execution
Places an ActiveJob job on a queue by creating a new Execution record.
-
.enqueue_args(active_job, scheduled_at: nil) ⇒ Object
Construct arguments for GoodJob::Execution from an ActiveJob instance.
-
.finished_before(timestamp) ⇒ ActiveRecord::Relation
Get Jobs finished before the given timestamp.
- .format_error(error) ⇒ Object
-
.job_class(name) ⇒ ActiveRecord::Relation
Get jobs with the given job class name.
- .json_string(json, attr) ⇒ Object
-
.next_scheduled_at(after: nil, limit: 100, now_limit: nil) ⇒ Array<DateTime>
Fetches the scheduled execution time of the next eligible Execution(s).
-
.only_scheduled ⇒ ActiveRecord::Relation
Get jobs that are not scheduled for a later time than now (i.e. jobs that are not scheduled or scheduled for earlier than the current time).
- .params_execution_count ⇒ Object
- .params_job_class ⇒ Object
-
.perform_with_advisory_lock(lock_id:, parsed_queues: nil, queue_select_limit: nil) {|Execution, nil| ... } ⇒ ExecutionResult?
Finds the next eligible Execution, acquires an advisory lock related to it, and executes the job.
-
.priority_ordered ⇒ ActiveRecord::Relation
Order jobs by priority (highest priority first).
-
.queue_ordered(queues) ⇒ ActiveRecord::Relation
Order jobs in order of queues in array param.
-
.queue_parser(string) ⇒ Hash
Parse a string representing a group of queues into a more readable data structure.
-
.queue_string(string) ⇒ ActiveRecord::Relation
Get Jobs on queues that match the given queue string.
-
.schedule_ordered ⇒ ActiveRecord::Relation
Order jobs by scheduled or created (oldest first).
-
.unfinished ⇒ ActiveRecord::Relation
Get jobs that have not yet finished (succeeded or discarded).
Instance Method Summary
-
#active_job(ignore_deserialization_errors: false) ⇒ Object
Build an ActiveJob instance and deserialize the arguments, using `#active_job_data`.
-
#destroy_job ⇒ void
Destroy all of a discarded or finished job’s executions from the database so that it will no longer appear on the dashboard.
-
#discard_job(message) ⇒ void
Discard a job so that it will not be executed further.
-
#discarded? ⇒ Boolean
Tests whether the job has finished but with an error.
-
#display_error ⇒ String
Errors for the job to be displayed in the Dashboard.
-
#display_name ⇒ String
Used when displaying this job in the GoodJob dashboard.
-
#display_serialized_params ⇒ Hash
Return formatted serialized_params for display in the dashboard.
-
#executable? ⇒ Boolean
Tests whether this job is safe to be executed by this thread.
- #executions_count ⇒ Object
-
#finished? ⇒ Boolean
Tests whether the job has finished (succeeded or discarded).
-
#force_discard_job(message) ⇒ Object
Force discard a job so that it will not be executed further.
- #job_state ⇒ Object
- #number ⇒ Object
-
#perform(lock_id:) ⇒ ExecutionResult
Execute the ActiveJob job this Execution represents.
-
#queue_latency ⇒ Object
Time between when this job was expected to run and when it started running.
-
#recent_error ⇒ String
The most recent error message.
-
#reschedule_job(scheduled_at = Time.current) ⇒ void
Reschedule a scheduled job so that it executes immediately (or later) by the next available execution thread.
-
#retry_job ⇒ ActiveJob::Base
Retry a job that has errored and been discarded.
-
#running? ⇒ Boolean
Tests whether the job is being executed right now.
-
#runtime_latency ⇒ Object
Time between when this job started and finished.
-
#succeeded? ⇒ Boolean
Tests whether the job has finished without error.
Methods included from Reportable
Methods included from AdvisoryLockable
#advisory_lock, #advisory_lock!, #advisory_locked?, #advisory_unlock, #advisory_unlock!, #advisory_unlocked?, #lockable_column_key, #lockable_key, #owns_advisory_lock?, #with_advisory_lock
Methods inherited from BaseRecord
bind_value, migrated?, migration_pending_warning!, with_logger_silenced
Class Method Details
.active_job_id(active_job_id) ⇒ ActiveRecord::Relation
Get jobs with given ActiveJob ID
    # File 'app/models/good_job/job.rb', line 84
    scope :active_job_id, ->(active_job_id) { where(active_job_id: active_job_id) }
.build_for_enqueue(active_job, scheduled_at: nil) ⇒ Object
    # File 'app/models/good_job/job.rb', line 231
    def self.build_for_enqueue(active_job, scheduled_at: nil)
      new(**enqueue_args(active_job, scheduled_at: scheduled_at))
    end
.coalesce_scheduled_at_created_at ⇒ Object
    # File 'app/models/good_job/job.rb', line 226
    def coalesce_scheduled_at_created_at
      arel_table.coalesce(arel_table['scheduled_at'], arel_table['created_at'])
    end
.creation_ordered ⇒ ActiveRecord::Relation
Order jobs by created_at, for first-in first-out.
    # File 'app/models/good_job/job.rb', line 109
    scope :creation_ordered, -> { order(created_at: :asc) }
.dequeueing_ordered(parsed_queues) ⇒ ActiveRecord::Relation
Order jobs for de-queueing
    # File 'app/models/good_job/job.rb', line 118
    scope :dequeueing_ordered, (lambda do |parsed_queues|
      relation = self
      relation = relation.queue_ordered(parsed_queues[:include]) if parsed_queues && parsed_queues[:ordered_queues] && parsed_queues[:include]
      relation = relation.priority_ordered.creation_ordered

      relation
    end)
.enqueue(active_job, scheduled_at: nil, create_with_advisory_lock: false) ⇒ Execution
Places an ActiveJob job on a queue by creating a new Execution record.
    # File 'app/models/good_job/job.rb', line 340
    def self.enqueue(active_job, scheduled_at: nil, create_with_advisory_lock: false)
      ActiveSupport::Notifications.instrument("enqueue_job.good_job", { active_job: active_job, scheduled_at: scheduled_at, create_with_advisory_lock: create_with_advisory_lock }) do |instrument_payload|
        current_job = CurrentThread.job

        retried = current_job && current_job.active_job_id == active_job.job_id
        if retried
          job = current_job
          job.assign_attributes(enqueue_args(active_job, scheduled_at: scheduled_at))
          job.scheduled_at ||= Time.current
          # TODO: these values ideally shouldn't be persisted until the current_job is finished
          #   which will require handling `retry_job` being called from outside the job context.
          job.performed_at = nil
          job.finished_at = nil
        else
          job = build_for_enqueue(active_job, scheduled_at: scheduled_at)
        end

        if create_with_advisory_lock
          if job.persisted?
            job.advisory_lock
          else
            job.create_with_advisory_lock = true
          end
        end

        instrument_payload[:job] = job
        job.save!

        CurrentThread.retried_job = job if retried

        active_job.provider_job_id = job.id
        raise "These should be equal" if active_job.provider_job_id != active_job.job_id

        job
      end
    end
.enqueue_args(active_job, scheduled_at: nil) ⇒ Object
Construct arguments for GoodJob::Execution from an ActiveJob instance.
    # File 'app/models/good_job/job.rb', line 236
    def self.enqueue_args(active_job, scheduled_at: nil)
      execution_args = {
        id: active_job.job_id,
        active_job_id: active_job.job_id,
        job_class: active_job.class.name,
        queue_name: active_job.queue_name.presence || DEFAULT_QUEUE_NAME,
        priority: active_job.priority || DEFAULT_PRIORITY,
        serialized_params: active_job.serialize,
        created_at: Time.current,
      }

      execution_args[:scheduled_at] = if scheduled_at
                                        scheduled_at
                                      elsif active_job.scheduled_at
                                        Time.zone.at(active_job.scheduled_at)
                                      else
                                        execution_args[:created_at]
                                      end

      execution_args[:concurrency_key] = active_job.good_job_concurrency_key if active_job.respond_to?(:good_job_concurrency_key)

      if active_job.respond_to?(:good_job_labels) && active_job.good_job_labels.any?
        labels = active_job.good_job_labels.dup
        labels.map! { |label| label.to_s.strip.presence }
        labels.tap(&:compact!).tap(&:uniq!)
        execution_args[:labels] = labels
      end

      reenqueued_current_job = CurrentThread.active_job_id && CurrentThread.active_job_id == active_job.job_id
      current_job = CurrentThread.job

      if reenqueued_current_job
        execution_args[:batch_id] = current_job.batch_id
        execution_args[:batch_callback_id] = current_job.batch_callback_id
        execution_args[:cron_key] = current_job.cron_key
      else
        execution_args[:batch_id] = GoodJob::Batch.current_batch_id
        execution_args[:batch_callback_id] = GoodJob::Batch.current_batch_callback_id
        execution_args[:cron_key] = CurrentThread.cron_key
        execution_args[:cron_at] = CurrentThread.cron_at
      end

      execution_args
    end
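The label clean-up step inside .enqueue_args can be tried in isolation. The following is a minimal sketch in plain Ruby, assuming no Active Support is loaded; `normalize_labels` is a hypothetical helper name, and `reject(&:empty?)` stands in for the `presence` plus `compact!` combination used above.

```ruby
# Mirrors the label clean-up in .enqueue_args without Active Support:
# stringify, strip whitespace, drop blanks, and de-duplicate in order.
# normalize_labels is a hypothetical name, not part of GoodJob.
def normalize_labels(labels)
  labels.map { |label| label.to_s.strip }
        .reject(&:empty?)
        .uniq
end

cleaned = normalize_labels([:billing, " billing ", "", nil, "urgent"])
# => ["billing", "urgent"]
```

Symbols, padded strings, and nil all collapse to the same trimmed string form, so a label is stored at most once.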
.finished_before(timestamp) ⇒ ActiveRecord::Relation
Get Jobs finished before the given timestamp.
    # File 'app/models/good_job/job.rb', line 54
    scope :finished_before, ->(timestamp) { where(arel_table['finished_at'].lteq(bind_value('finished_at', timestamp, ActiveRecord::Type::DateTime))) }
.format_error(error) ⇒ Object
    # File 'app/models/good_job/job.rb', line 377
    def self.format_error(error)
      raise ArgumentError unless error.is_a?(Exception)

      [error.class.to_s, ERROR_MESSAGE_SEPARATOR, error.message].join
    end
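A minimal sketch of how the stored error string round-trips, assuming plain Ruby without Active Support. `format_error` follows the method above; `parse_error` is a hypothetical helper that approximates what #retry_job does when it rebuilds an exception, with `Object.const_get` standing in for `constantize`. Unlike #retry_job, this sketch splits on the first separator only.

```ruby
ERROR_MESSAGE_SEPARATOR = ": "

# Mirrors .format_error: "<ErrorClass>: <message>".
def format_error(error)
  raise ArgumentError unless error.is_a?(Exception)

  [error.class.to_s, ERROR_MESSAGE_SEPARATOR, error.message].join
end

# Hypothetical helper (not part of GoodJob): rebuild an exception from
# the stored string. The split limit of 2 keeps messages that contain
# ": " intact; unknown class names fall back to StandardError.
def parse_error(string)
  error_class, error_message = string.split(ERROR_MESSAGE_SEPARATOR, 2)
  Object.const_get(error_class).new(error_message)
rescue StandardError
  StandardError.new(string)
end

formatted = format_error(ArgumentError.new("bad input: 42"))
rebuilt = parse_error(formatted)
fallback = parse_error("NoSuchErrorClass: boom")
```

Here `formatted` is "ArgumentError: bad input: 42", `rebuilt` is an ArgumentError carrying the original message, and `fallback` is a StandardError whose message is the whole stored string.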
.job_class(name) ⇒ ActiveRecord::Relation
Get jobs with the given job class name.
    # File 'app/models/good_job/job.rb', line 76
    scope :job_class, ->(name) { where(params_job_class.eq(name)) }
.json_string(json, attr) ⇒ Object
    # File 'app/models/good_job/job.rb', line 210
    def json_string(json, attr)
      Arel::Nodes::Grouping.new(Arel::Nodes::InfixOperation.new('->>', json, Arel::Nodes.build_quoted(attr)))
    end
.next_scheduled_at(after: nil, limit: 100, now_limit: nil) ⇒ Array<DateTime>
Fetches the scheduled execution time of the next eligible Execution(s).
    # File 'app/models/good_job/job.rb', line 315
    def self.next_scheduled_at(after: nil, limit: 100, now_limit: nil)
      query = advisory_unlocked.unfinished.schedule_ordered

      after ||= Time.current
      after_bind = bind_value('scheduled_at', after, ActiveRecord::Type::DateTime)
      after_query = query.where(arel_table['scheduled_at'].gt(after_bind)).or query.where(scheduled_at: nil).where(arel_table['created_at'].gt(after_bind))
      after_at = after_query.limit(limit).pluck(:scheduled_at, :created_at).map { |timestamps| timestamps.compact.first }

      if now_limit&.positive?
        now_query = query.where(arel_table['scheduled_at'].lt(bind_value('scheduled_at', Time.current, ActiveRecord::Type::DateTime))).or query.where(scheduled_at: nil)
        now_at = now_query.limit(now_limit).pluck(:scheduled_at, :created_at).map { |timestamps| timestamps.compact.first }
      end

      Array(now_at) + after_at
    end
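The `compact.first` step above picks scheduled_at when it is set and falls back to created_at otherwise. A small sketch with made-up sample rows, assuming each plucked row is a `[scheduled_at, created_at]` pair:

```ruby
# Illustrative rows shaped like pluck(:scheduled_at, :created_at) output;
# the timestamps are sample values, not real data.
rows = [
  [Time.at(300), Time.at(100)], # scheduled job: scheduled_at wins
  [nil,          Time.at(200)], # unscheduled job: falls back to created_at
]

# Ruby-side equivalent of COALESCE(scheduled_at, created_at).
effective_times = rows.map { |timestamps| timestamps.compact.first }
# => [Time.at(300), Time.at(200)]
```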
.only_scheduled ⇒ ActiveRecord::Relation
Get jobs that are not scheduled for a later time than now (i.e. jobs that are not scheduled or scheduled for earlier than the current time).
    # File 'app/models/good_job/job.rb', line 97
    scope :only_scheduled, -> { where(arel_table['scheduled_at'].lteq(bind_value('scheduled_at', DateTime.current, ActiveRecord::Type::DateTime))).or(where(scheduled_at: nil)) }
.params_execution_count ⇒ Object
    # File 'app/models/good_job/job.rb', line 218
    def params_execution_count
      Arel::Nodes::InfixOperation.new(
        '::',
        json_string(arel_table['serialized_params'], 'executions'),
        Arel.sql('integer')
      )
    end
.params_job_class ⇒ Object
    # File 'app/models/good_job/job.rb', line 214
    def params_job_class
      json_string(arel_table['serialized_params'], 'job_class')
    end
.perform_with_advisory_lock(lock_id:, parsed_queues: nil, queue_select_limit: nil) {|Execution, nil| ... } ⇒ ExecutionResult?
Finds the next eligible Execution, acquires an advisory lock related to it, and executes the job.
    # File 'app/models/good_job/job.rb', line 289
    def self.perform_with_advisory_lock(lock_id:, parsed_queues: nil, queue_select_limit: nil)
      job = nil
      result = nil

      unfinished.dequeueing_ordered(parsed_queues).only_scheduled.limit(1).with_advisory_lock(select_limit: queue_select_limit) do |jobs|
        job = jobs.first

        if job&.executable?
          yield(job) if block_given?

          result = job.perform(lock_id: lock_id)
        else
          job = nil
          yield(nil) if block_given?
        end
      end

      job&.run_callbacks(:perform_unlocked)
      result
    end
.priority_ordered ⇒ ActiveRecord::Relation
Order jobs by priority (highest priority first). The query sorts priority ascending with NULLs last, so lower numbers are dequeued first.
    # File 'app/models/good_job/job.rb', line 103
    scope :priority_ordered, -> { order('priority ASC NULLS LAST') }
.queue_ordered(queues) ⇒ ActiveRecord::Relation
Order jobs in order of queues in array param
    # File 'app/models/good_job/job.rb', line 131
    scope :queue_ordered, (lambda do |queues|
      clauses = queues.map.with_index do |queue_name, index|
        sanitize_sql_array(["WHEN queue_name = ? THEN ?", queue_name, index])
      end
      order(Arel.sql("(CASE #{clauses.join(' ')} ELSE #{queues.size} END)"))
    end)
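To see the ORDER BY expression this scope produces, the string-building step can be reproduced outside Rails. This is a sketch only: `queue_order_sql` is a hypothetical helper, and its `quote` lambda is a simplified stand-in for `sanitize_sql_array` that should not be used on untrusted input.

```ruby
# Builds the CASE expression that ranks rows by their queue's position
# in the given array; unknown queues sort after all listed ones.
# queue_order_sql is a hypothetical name, not part of GoodJob.
def queue_order_sql(queues)
  # Simplified quoting for illustration only (assumption, not Rails' sanitizer).
  quote = ->(value) { "'#{value.gsub("'", "''")}'" }

  clauses = queues.each_with_index.map do |queue_name, index|
    "WHEN queue_name = #{quote.call(queue_name)} THEN #{index}"
  end
  "(CASE #{clauses.join(' ')} ELSE #{queues.size} END)"
end

sql = queue_order_sql(%w[high low])
# => "(CASE WHEN queue_name = 'high' THEN 0 WHEN queue_name = 'low' THEN 1 ELSE 2 END)"
```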
.queue_parser(string) ⇒ Hash
Parse a string representing a group of queues into a more readable data structure.
    # File 'app/models/good_job/job.rb', line 182
    def queue_parser(string)
      string = string.strip.presence || '*'

      case string.first
      when '-'
        exclude_queues = true
        string = string[1..]
      when '+'
        ordered_queues = true
        string = string[1..]
      end

      queues = string.split(',').map(&:strip)

      if queues.include?('*')
        { all: true }
      elsif exclude_queues
        { exclude: queues }
      elsif ordered_queues
        {
          include: queues,
          ordered_queues: true,
        }
      else
        { include: queues }
      end
    end
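The parsing rules are easier to follow with concrete inputs. Below is a plain-Ruby port of the method above, assuming no Active Support (so `presence` and `String#first` are replaced with core equivalents); the queue names are arbitrary examples.

```ruby
# Plain-Ruby port of .queue_parser for experimentation.
def queue_parser(string)
  string = string.strip
  string = "*" if string.empty?

  exclude_queues = false
  ordered_queues = false
  case string[0]
  when "-" # leading "-" means: every queue except these
    exclude_queues = true
    string = string[1..]
  when "+" # leading "+" means: these queues, in this order
    ordered_queues = true
    string = string[1..]
  end

  queues = string.split(",").map(&:strip)

  if queues.include?("*")
    { all: true }
  elsif exclude_queues
    { exclude: queues }
  elsif ordered_queues
    { include: queues, ordered_queues: true }
  else
    { include: queues }
  end
end

queue_parser("*")                 # => { all: true }
queue_parser("-mailers, reports") # => { exclude: ["mailers", "reports"] }
queue_parser("+high,low")         # => { include: ["high", "low"], ordered_queues: true }
queue_parser("default")           # => { include: ["default"] }
```

A blank string is treated like "*", and a "*" anywhere in the list takes precedence over the "-" and "+" prefixes.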
.queue_string(string) ⇒ ActiveRecord::Relation
Get Jobs on queues that match the given queue string.
    # File 'app/models/good_job/job.rb', line 154
    scope :queue_string, (lambda do |string|
      parsed = queue_parser(string)

      if parsed[:all]
        all
      elsif parsed[:exclude]
        where.not(queue_name: parsed[:exclude]).or where(queue_name: nil)
      elsif parsed[:include]
        where(queue_name: parsed[:include])
      end
    end)
.schedule_ordered ⇒ ActiveRecord::Relation
Order jobs by scheduled or created (oldest first).
    # File 'app/models/good_job/job.rb', line 142
    scope :schedule_ordered, -> { order(coalesce_scheduled_at_created_at.asc) }
.unfinished ⇒ ActiveRecord::Relation
Get jobs that have not yet finished (succeeded or discarded).
    # File 'app/models/good_job/job.rb', line 90
    scope :unfinished, -> { where(finished_at: nil) }
Instance Method Details
#active_job(ignore_deserialization_errors: false) ⇒ Object
Build an ActiveJob instance and deserialize the arguments, using `#active_job_data`.
    # File 'app/models/good_job/job.rb', line 535
    def active_job(ignore_deserialization_errors: false)
      ActiveJob::Base.deserialize(active_job_data).tap do |aj|
        aj.send(:deserialize_arguments_if_needed)
      rescue ActiveJob::DeserializationError
        raise unless ignore_deserialization_errors
      end
    rescue NameError
      raise unless ignore_deserialization_errors
    end
#destroy_job ⇒ void
This method returns an undefined value.
Destroy all of a discarded or finished job’s executions from the database so that it will no longer appear on the dashboard.
    # File 'app/models/good_job/job.rb', line 522
    def destroy_job
      with_advisory_lock do
        raise ActionForStateMismatchError if finished_at.blank?

        destroy
      end
    end
#discard_job(message) ⇒ void
This method returns an undefined value.
Discard a job so that it will not be executed further. This action will add a DiscardJobError to the job’s Execution and mark it as finished.
    # File 'app/models/good_job/job.rb', line 495
    def discard_job(message)
      with_advisory_lock do
        _discard_job(message)
      end
    end
#discarded? ⇒ Boolean
Tests whether the job has finished but with an error.
    # File 'app/models/good_job/job.rb', line 443
    def discarded?
      finished? && error.present?
    end
#display_error ⇒ String
Errors for the job to be displayed in the Dashboard.
    # File 'app/models/good_job/job.rb', line 398
    def display_error
      return error if error.present?

      serialized_params.fetch('exception_executions', {}).map do |exception, count|
        "#{exception}: #{count}"
      end.join(', ')
    end
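When no error is stored on the job, the method above falls back to summarizing the retry counters kept in the serialized params. A standalone sketch of that fallback, assuming plain Ruby; the free function signature and the sample `exception_executions` hash are illustrative, not taken from a real job.

```ruby
# Standalone version of #display_error that takes its inputs as arguments.
def display_error(error, serialized_params)
  return error unless error.to_s.empty?

  serialized_params.fetch("exception_executions", {}).map do |exception, count|
    "#{exception}: #{count}"
  end.join(", ")
end

# Sample params (made up for illustration).
params = { "exception_executions" => { "[Timeout::Error]" => 2, "[IOError]" => 1 } }

with_error    = display_error("RuntimeError: boom", params) # => "RuntimeError: boom"
without_error = display_error(nil, params)                  # => "[Timeout::Error]: 2, [IOError]: 1"
no_history    = display_error(nil, {})                      # => ""
```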
#display_name ⇒ String
Used when displaying this job in the GoodJob dashboard.
    # File 'app/models/good_job/job.rb', line 416
    def display_name
      job_class
    end
#display_serialized_params ⇒ Hash
Return formatted serialized_params for display in the dashboard
    # File 'app/models/good_job/job.rb', line 408
    def display_serialized_params
      serialized_params.merge({
                                _good_job: attributes.except('serialized_params', 'locktype', 'owns_advisory_lock'),
                              })
    end
#executable? ⇒ Boolean
Tests whether this job is safe to be executed by this thread.
    # File 'app/models/good_job/job.rb', line 687
    def executable?
      reload.finished_at.blank?
    rescue ActiveRecord::RecordNotFound
      false
    end
#executions_count ⇒ Object
    # File 'app/models/good_job/job.rb', line 420
    def executions_count
      super || 0
    end
#finished? ⇒ Boolean
Tests whether the job has finished (succeeded or discarded).
    # File 'app/models/good_job/job.rb', line 437
    def finished?
      finished_at.present?
    end
#force_discard_job(message) ⇒ Object
Force discard a job so that it will not be executed further. Force discard allows discarding a running job. This action will add a DiscardJobError to the job’s Execution and mark it as finished.
    # File 'app/models/good_job/job.rb', line 504
    def force_discard_job(message)
      _discard_job(message)
    end
#job_state ⇒ Object
    # File 'app/models/good_job/job.rb', line 711
    def job_state
      state = { queue_name: queue_name }
      state[:scheduled_at] = scheduled_at if scheduled_at
      state
    end
#number ⇒ Object
    # File 'app/models/good_job/job.rb', line 693
    def number
      serialized_params.fetch('executions', 0) + 1
    end
#perform(lock_id:) ⇒ ExecutionResult
Execute the ActiveJob job this Execution represents.
    # File 'app/models/good_job/job.rb', line 550
    def perform(lock_id:)
      run_callbacks(:perform) do
        raise PreviouslyPerformedError, 'Cannot perform a job that has already been performed' if finished_at

        job_performed_at = Time.current
        monotonic_start = ::Process.clock_gettime(::Process::CLOCK_MONOTONIC)

        execution = nil
        result = GoodJob::CurrentThread.within do |current_thread|
          current_thread.reset
          current_thread.job = self

          existing_performed_at = performed_at
          if existing_performed_at
            current_thread.execution_interrupted = existing_performed_at

            interrupt_error_string = self.class.format_error(GoodJob::InterruptError.new("Interrupted after starting perform at '#{existing_performed_at}'"))
            self.error = interrupt_error_string
            self.error_event = :interrupted
            monotonic_duration = (::Process.clock_gettime(::Process::CLOCK_MONOTONIC) - monotonic_start).seconds

            execution_attrs = {
              error: interrupt_error_string,
              finished_at: job_performed_at,
              error_event: :interrupted,
              duration: monotonic_duration,
            }
            executions.where(finished_at: nil).where.not(performed_at: nil).update_all(execution_attrs) # rubocop:disable Rails/SkipsModelValidations
          end

          transaction do
            execution_attrs = {
              job_class: job_class,
              queue_name: queue_name,
              serialized_params: serialized_params,
              scheduled_at: scheduled_at || created_at,
              created_at: job_performed_at,
              process_id: lock_id,
            }
            job_attrs = {
              performed_at: job_performed_at,
              executions_count: ((executions_count || 0) + 1),
              locked_by_id: lock_id,
              locked_at: Time.current,
            }

            execution = executions.create!(execution_attrs)
            update!(job_attrs)
          end

          ActiveSupport::Notifications.instrument("perform_job.good_job", { job: self, execution: execution, process_id: current_thread.process_id, thread_name: current_thread.thread_name }) do |instrument_payload|
            value = ActiveJob::Base.execute(active_job_data)

            if value.is_a?(Exception)
              handled_error = value
              value = nil
            end
            handled_error ||= current_thread.error_on_retry || current_thread.error_on_discard

            error_event = if handled_error == current_thread.error_on_discard
                            :discarded
                          elsif handled_error == current_thread.error_on_retry
                            :retried
                          elsif handled_error == current_thread.error_on_retry_stopped
                            :retry_stopped
                          elsif handled_error
                            :handled
                          end

            instrument_payload.merge!(
              value: value,
              handled_error: handled_error,
              retried: current_thread.retried_job.present?,
              error_event: error_event
            )
            ExecutionResult.new(value: value, handled_error: handled_error, error_event: error_event, retried_job: current_thread.retried_job)
          rescue StandardError => e
            error_event = if e.is_a?(GoodJob::InterruptError)
                            :interrupted
                          elsif e == current_thread.error_on_retry_stopped
                            :retry_stopped
                          else
                            :unhandled
                          end

            instrument_payload[:unhandled_error] = e
            ExecutionResult.new(value: nil, unhandled_error: e, error_event: error_event)
          end
        end

        job_attributes = { locked_by_id: nil, locked_at: nil }

        job_error = result.handled_error || result.unhandled_error
        if job_error
          error_string = self.class.format_error(job_error)

          job_attributes[:error] = error_string
          job_attributes[:error_event] = result.error_event

          execution.error = error_string
          execution.error_event = result.error_event
          execution.error_backtrace = job_error.backtrace
        else
          job_attributes[:error] = nil
          job_attributes[:error_event] = nil
        end

        job_finished_at = Time.current
        monotonic_duration = (::Process.clock_gettime(::Process::CLOCK_MONOTONIC) - monotonic_start).seconds
        job_attributes[:finished_at] = job_finished_at

        execution.finished_at = job_finished_at
        execution.duration = monotonic_duration

        retry_unhandled_error = result.unhandled_error && GoodJob.retry_on_unhandled_error
        reenqueued = result.retried? || retry_unhandled_error
        if reenqueued
          job_attributes[:performed_at] = nil
          job_attributes[:finished_at] = nil
        end

        assign_attributes(job_attributes)

        preserve_unhandled = result.unhandled_error && (GoodJob.retry_on_unhandled_error || GoodJob.preserve_job_records == :on_unhandled_error)
        if finished_at.blank? || GoodJob.preserve_job_records == true || reenqueued || preserve_unhandled || cron_key.present?
          transaction do
            execution.save!
            save!
          end
        else
          destroy!
        end

        result
      end
    end
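The last branch of #perform decides whether the job row is saved or destroyed. The sketch below restates that condition as a pure function so each input can be varied independently. `keep_record?` is a hypothetical name, and its keyword arguments stand in for the record state and the GoodJob configuration values referenced in the code above.

```ruby
# Hypothetical pure-function version of the save-or-destroy decision.
# Returns a truthy value when the record would be saved, falsy when it
# would be destroyed.
def keep_record?(finished:, reenqueued:, unhandled_error:, cron_key:,
                 preserve_job_records:, retry_on_unhandled_error:)
  preserve_unhandled = unhandled_error &&
                       (retry_on_unhandled_error || preserve_job_records == :on_unhandled_error)

  !finished ||
    preserve_job_records == true ||
    reenqueued ||
    preserve_unhandled ||
    !cron_key.to_s.empty?
end

# A finished, successful, non-cron job with record preservation turned off.
base = { finished: true, reenqueued: false, unhandled_error: false, cron_key: nil,
         preserve_job_records: false, retry_on_unhandled_error: false }

destroyed     = keep_record?(**base)
kept_on_error = keep_record?(**base, unhandled_error: true, preserve_job_records: :on_unhandled_error)
kept_for_cron = keep_record?(**base, cron_key: "nightly_report")
```

With these inputs `destroyed` is falsy, while `kept_on_error` and `kept_for_cron` are truthy; the cron key "nightly_report" is an arbitrary example value.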
#queue_latency ⇒ Object
Time between when this job was expected to run and when it started running
    # File 'app/models/good_job/job.rb', line 698
    def queue_latency
      now = Time.zone.now
      expected_start = scheduled_at || created_at
      actual_start = performed_at || finished_at || now

      actual_start - expected_start unless expected_start >= now
    end
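A worked example of the latency arithmetic, as a sketch in plain Ruby. `JobTimes` is a hypothetical stand-in for the timestamp columns on a job record, `now` is passed in so the result is deterministic, and the epoch-second timestamps are sample values.

```ruby
# Hypothetical stand-in for a job's timestamp columns.
JobTimes = Struct.new(:created_at, :scheduled_at, :performed_at, :finished_at, keyword_init: true)

# Mirrors #queue_latency: seconds between the expected start and the
# actual start, or nil when the job is scheduled in the future.
def queue_latency(job, now: Time.now)
  expected_start = job.scheduled_at || job.created_at
  actual_start = job.performed_at || job.finished_at || now

  actual_start - expected_start unless expected_start >= now
end

now = Time.at(1_000)
waiting = JobTimes.new(created_at: Time.at(940))
started = JobTimes.new(created_at: Time.at(900), scheduled_at: Time.at(950), performed_at: Time.at(980))
future  = JobTimes.new(created_at: Time.at(900), scheduled_at: Time.at(2_000))

queue_latency(waiting, now: now) # => 60.0 (still waiting, measured up to now)
queue_latency(started, now: now) # => 30.0 (scheduled at 950, started at 980)
queue_latency(future, now: now)  # => nil  (not yet due)
```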
#recent_error ⇒ String
The most recent error message. If the job has been retried, the error will be fetched from the previous Execution record.
    # File 'app/models/good_job/job.rb', line 392
    def recent_error
      error || executions[-2]&.error
    end
#reschedule_job(scheduled_at = Time.current) ⇒ void
This method returns an undefined value.
Reschedule a scheduled job so that it executes immediately (or later) by the next available execution thread.
    # File 'app/models/good_job/job.rb', line 511
    def reschedule_job(scheduled_at = Time.current)
      with_advisory_lock do
        reload
        raise ActionForStateMismatchError if finished_at.present?

        update(scheduled_at: scheduled_at)
      end
    end
#retry_job ⇒ ActiveJob::Base
Retry a job that has errored and been discarded. This action will create a new Execution record for the job.
    # File 'app/models/good_job/job.rb', line 456
    def retry_job
      with_advisory_lock do
        reload
        active_job = self.active_job(ignore_deserialization_errors: true)

        raise ActiveJobDeserializationError if active_job.nil?
        raise AdapterNotGoodJobError unless active_job.class.queue_adapter.is_a? GoodJob::Adapter
        raise ActionForStateMismatchError if finished_at.blank? || error.blank?

        # Update the executions count because the previous execution will not have been preserved
        # Do not update `exception_executions` because that comes from rescue_from's arguments
        active_job.executions = (active_job.executions || 0) + 1

        begin
          error_class, error_message = error.split(ERROR_MESSAGE_SEPARATOR).map(&:strip)
          error = error_class.constantize.new(error_message)
        rescue StandardError
          error = StandardError.new(error)
        end

        new_active_job = nil
        GoodJob::CurrentThread.within do |current_thread|
          current_thread.job = self
          current_thread.retry_now = true

          transaction do
            new_active_job = active_job.retry_job(wait: 0, error: error)
            self.error_event = :retried if error
            save!
          end
        end

        new_active_job
      end
    end
#running? ⇒ Boolean
Tests whether the job is being executed right now.
    # File 'app/models/good_job/job.rb', line 426
    def running?
      # Avoid N+1 Query: `.includes_advisory_locks`
      if has_attribute?(:locktype)
        self['locktype'].present?
      else
        advisory_locked?
      end
    end
#runtime_latency ⇒ Object
Time between when this job started and finished
    # File 'app/models/good_job/job.rb', line 707
    def runtime_latency
      (finished_at || Time.zone.now) - performed_at if performed_at
    end
#succeeded? ⇒ Boolean
Tests whether the job has finished without error
    # File 'app/models/good_job/job.rb', line 449
    def succeeded?
      finished? && !discarded?
    end
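The three state predicates (#finished?, #discarded?, #succeeded?) depend only on finished_at and error. A minimal sketch of how they relate, assuming plain Ruby; `JobRecord` is a hypothetical struct, not the Active Record model, and the sample values are illustrative.

```ruby
# Hypothetical in-memory stand-in carrying only the two relevant columns.
JobRecord = Struct.new(:finished_at, :error, keyword_init: true) do
  # Finished means a finish time was recorded, with or without an error.
  def finished?
    !finished_at.nil?
  end

  # Discarded means finished and an error string is stored.
  def discarded?
    finished? && !error.to_s.empty?
  end

  # Succeeded means finished with no stored error.
  def succeeded?
    finished? && !discarded?
  end
end

pending = JobRecord.new
done    = JobRecord.new(finished_at: Time.at(0))
failed  = JobRecord.new(finished_at: Time.at(0), error: "RuntimeError: boom")

[pending.finished?, pending.succeeded?]                # => [false, false]
[done.finished?, done.succeeded?, done.discarded?]     # => [true, true, false]
[failed.finished?, failed.succeeded?, failed.discarded?] # => [true, false, true]
```

A job that errored but is being retried has no finished_at, so it reports neither succeeded nor discarded.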