Class: GoodJob::Job
- Inherits:
-
BaseRecord
- Object
- ActiveRecord::Base
- BaseRecord
- GoodJob::Job
- Includes:
- AdvisoryLockable, ErrorEvents, Filterable, Reportable
- Defined in:
- app/models/good_job/job.rb
Overview
Active Record model that represents an ActiveJob
job.
Constant Summary collapse
- PreviouslyPerformedError =
Raised if something attempts to execute a previously completed Execution again.
Class.new(StandardError)
- ERROR_MESSAGE_SEPARATOR =
String separating Error Class from Error Message
": "
- DEFAULT_QUEUE_NAME =
ActiveJob jobs without a
queue_name
attribute are placed on this queue. 'default'
- DEFAULT_PRIORITY =
ActiveJob jobs without a
priority
attribute are given this priority. 0
- ActionForStateMismatchError =
Raised when an inappropriate action is applied to a Job based on its state.
Class.new(StandardError)
- AdapterNotGoodJobError =
Raised when GoodJob is not configured as the Active Job Queue Adapter
Class.new(StandardError)
- DiscardJobError =
Attached to a Job’s Execution when the Job is discarded.
Class.new(StandardError)
- ActiveJobDeserializationError =
Raised when Active Job data cannot be deserialized
Class.new(StandardError)
Constants included from AdvisoryLockable
AdvisoryLockable::RecordAlreadyAdvisoryLockedError
Class Method Summary collapse
-
.active_job_id(active_job_id) ⇒ ActiveRecord::Relation
Get jobs with given ActiveJob ID.
- .build_for_enqueue(active_job, scheduled_at: nil) ⇒ Object
- .concurrency_key_created_at_index_migrated? ⇒ Boolean
-
.creation_ordered ⇒ ActiveRecord:Relation
Order jobs by created_at, for first-in first-out.
-
.dequeueing_ordered(parsed_queues) ⇒ ActiveRecord::Relation
Order jobs for de-queueing.
-
.enqueue(active_job, scheduled_at: nil, create_with_advisory_lock: false) ⇒ Execution
Places an ActiveJob job on a queue by creating a new Execution record.
-
.enqueue_args(active_job, scheduled_at: nil) ⇒ Object
Construct arguments for GoodJob::Execution from an ActiveJob instance.
-
.exclude_paused ⇒ ActiveRecord::Relation
Exclude jobs that are paused via queue_name or job_class.
-
.finished_before(timestamp) ⇒ ActiveRecord::Relation
Get Jobs finished before the given timestamp.
- .format_error(error) ⇒ Object
-
.job_class(name) ⇒ ActiveRecord::Relation
With a given class name.
- .json_string(json, attr) ⇒ Object
-
.next_scheduled_at(after: nil, limit: 100, now_limit: nil) ⇒ Array<DateTime>
Fetches the scheduled execution time of the next eligible Execution(s).
-
.only_scheduled ⇒ ActiveRecord::Relation
Get jobs that are not scheduled for a later time than now (i.e. jobs that are not scheduled or scheduled for earlier than the current time).
- .params_execution_count ⇒ Object
- .params_job_class ⇒ Object
-
.perform_with_advisory_lock(lock_id:, parsed_queues: nil, queue_select_limit: nil) {|Execution, nil| ... } ⇒ ExecutionResult?
Finds the next eligible Execution, acquire an advisory lock related to it, and executes the job.
-
.priority_ordered ⇒ ActiveRecord::Relation
Order jobs by priority (highest priority first).
-
.queue_ordered(queues) ⇒ ActiveRecord::Relation
Order jobs in order of queues in array param.
-
.queue_parser(string) ⇒ Hash
Parse a string representing a group of queues into a more readable data structure.
-
.queue_string(string) ⇒ ActiveRecord::Relation
Get Jobs on queues that match the given queue string.
-
.schedule_ordered ⇒ ActiveRecord::Relation
Order jobs by scheduled or created (oldest first).
-
.unfinished ⇒ ActiveRecord::Relation
Get jobs that have not yet finished (succeeded or discarded).
Instance Method Summary collapse
-
#active_job(ignore_deserialization_errors: false) ⇒ Object
Build an ActiveJob instance and deserialize the arguments, using ‘#active_job_data`.
-
#destroy_job ⇒ void
Destroy all of a discarded or finished job’s executions from the database so that it will no longer appear on the dashboard.
-
#discard_job(message) ⇒ void
Discard a job so that it will not be executed further.
-
#discarded? ⇒ Boolean
Tests whether the job has finished but with an error.
-
#display_error ⇒ String
Errors for the job to be displayed in the Dashboard.
-
#display_name ⇒ String
Used when displaying this job in the GoodJob dashboard.
-
#display_serialized_params ⇒ Hash
Return formatted serialized_params for display in the dashboard.
-
#executable? ⇒ Boolean
Tests whether this job is safe to be executed by this thread.
- #executions_count ⇒ Object
-
#finished? ⇒ Boolean
Tests whether the job has finished (succeeded or discarded).
-
#force_discard_job(message) ⇒ Object
Force discard a job so that it will not be executed further.
- #job_state ⇒ Object
- #number ⇒ Object
-
#perform(lock_id:) ⇒ ExecutionResult
Execute the ActiveJob job this Execution represents.
-
#queue_latency ⇒ Object
Time between when this job was expected to run and when it started running.
-
#recent_error ⇒ String
The most recent error message.
-
#reschedule_job(scheduled_at = Time.current) ⇒ void
Reschedule a scheduled job so that it executes immediately (or later) by the next available execution thread.
-
#retry_job ⇒ ActiveJob::Base
Retry a job that has errored and been discarded.
-
#running? ⇒ Boolean
Tests whether the job is being executed right now.
-
#runtime_latency ⇒ Object
Time between when this job started and finished.
-
#succeeded? ⇒ Boolean
Tests whether the job has finished without error.
Methods included from Reportable
Methods included from AdvisoryLockable
#advisory_lock, #advisory_lock!, #advisory_locked?, #advisory_unlock, #advisory_unlock!, #advisory_unlocked?, #lockable_column_key, #lockable_key, #owns_advisory_lock?, #with_advisory_lock
Methods inherited from BaseRecord
bind_value, migrated?, migration_pending_warning!, with_logger_silenced
Class Method Details
.active_job_id(active_job_id) ⇒ ActiveRecord::Relation
Get jobs with given ActiveJob ID
84 |
# File 'app/models/good_job/job.rb', line 84 scope :active_job_id, ->(active_job_id) { where(active_job_id: active_job_id) } |
.build_for_enqueue(active_job, scheduled_at: nil) ⇒ Object
261 262 263 |
# File 'app/models/good_job/job.rb', line 261 def self.build_for_enqueue(active_job, scheduled_at: nil) new(**enqueue_args(active_job, scheduled_at: scheduled_at)) end |
.concurrency_key_created_at_index_migrated? ⇒ Boolean
253 254 255 256 257 258 |
# File 'app/models/good_job/job.rb', line 253 def concurrency_key_created_at_index_migrated? return true if connection.index_name_exists?(:good_jobs, :index_good_jobs_on_concurrency_key_and_created_at) migration_pending_warning! false end |
.creation_ordered ⇒ ActiveRecord:Relation
Order jobs by created_at, for first-in first-out
136 |
# File 'app/models/good_job/job.rb', line 136 scope :creation_ordered, -> { order(created_at: :asc) } |
.dequeueing_ordered(parsed_queues) ⇒ ActiveRecord::Relation
Order jobs for de-queueing
145 146 147 148 149 150 151 |
# File 'app/models/good_job/job.rb', line 145 scope :dequeueing_ordered, (lambda do |parsed_queues| relation = self relation = relation.queue_ordered(parsed_queues[:include]) if parsed_queues && parsed_queues[:ordered_queues] && parsed_queues[:include] relation = relation.priority_ordered.creation_ordered relation end) |
.enqueue(active_job, scheduled_at: nil, create_with_advisory_lock: false) ⇒ Execution
Places an ActiveJob job on a queue by creating a new Execution record.
371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 |
# File 'app/models/good_job/job.rb', line 371 def self.enqueue(active_job, scheduled_at: nil, create_with_advisory_lock: false) ActiveSupport::Notifications.instrument("enqueue_job.good_job", { active_job: active_job, scheduled_at: scheduled_at, create_with_advisory_lock: create_with_advisory_lock }) do |instrument_payload| current_job = CurrentThread.job retried = current_job && current_job.active_job_id == active_job.job_id if retried job = current_job job.assign_attributes(enqueue_args(active_job, scheduled_at: scheduled_at)) job.scheduled_at ||= Time.current # TODO: these values ideally shouldn't be persisted until the current_job is finished # which will require handling `retry_job` being called from outside the job context. job.performed_at = nil job.finished_at = nil else job = build_for_enqueue(active_job, scheduled_at: scheduled_at) end if create_with_advisory_lock if job.persisted? job.advisory_lock else job.create_with_advisory_lock = true end end instrument_payload[:job] = job begin job.save! rescue ActiveRecord::RecordNotUnique raise unless job.cron_key # Active Job doesn't have a clean way to cancel an enqueue for unexceptional reasons # This is a workaround to mark it as having been halted in before_enqueue active_job.send(:halted_callback_hook, "duplicate_cron_key", "good_job") return false end CurrentThread.retried_job = job if retried active_job.provider_job_id = job.id raise "These should be equal" if active_job.provider_job_id != active_job.job_id job end end |
.enqueue_args(active_job, scheduled_at: nil) ⇒ Object
Construct arguments for GoodJob::Execution from an ActiveJob instance.
266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 |
# File 'app/models/good_job/job.rb', line 266 def self.enqueue_args(active_job, scheduled_at: nil) execution_args = { id: active_job.job_id, active_job_id: active_job.job_id, job_class: active_job.class.name, queue_name: active_job.queue_name.presence || DEFAULT_QUEUE_NAME, priority: active_job.priority || DEFAULT_PRIORITY, serialized_params: active_job.serialize, created_at: Time.current, } execution_args[:scheduled_at] = if scheduled_at scheduled_at elsif active_job.scheduled_at Time.zone.at(active_job.scheduled_at) else execution_args[:created_at] end execution_args[:concurrency_key] = active_job.good_job_concurrency_key if active_job.respond_to?(:good_job_concurrency_key) if active_job.respond_to?(:good_job_labels) && active_job.good_job_labels.any? labels = active_job.good_job_labels.dup labels.map! { |label| label.to_s.strip.presence } labels.tap(&:compact!).tap(&:uniq!) execution_args[:labels] = labels end reenqueued_current_job = CurrentThread.active_job_id && CurrentThread.active_job_id == active_job.job_id current_job = CurrentThread.job if reenqueued_current_job execution_args[:batch_id] = current_job.batch_id execution_args[:batch_callback_id] = current_job.batch_callback_id execution_args[:cron_key] = current_job.cron_key else execution_args[:batch_id] = GoodJob::Batch.current_batch_id execution_args[:batch_callback_id] = GoodJob::Batch.current_batch_callback_id execution_args[:cron_key] = CurrentThread.cron_key execution_args[:cron_at] = CurrentThread.cron_at end execution_args end |
.exclude_paused ⇒ ActiveRecord::Relation
Exclude jobs that are paused via queue_name or job_class. Only applies when enable_pauses configuration is true.
104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 |
# File 'app/models/good_job/job.rb', line 104 scope :exclude_paused, lambda { return all unless GoodJob.configuration.enable_pauses paused_query = GoodJob::Setting.where(key: GoodJob::Setting::PAUSES) paused_queues_query = paused_query.select("jsonb_array_elements_text(value->'queues')") paused_job_classes_query = paused_query.select("jsonb_array_elements_text(value->'job_classes')") paused_labels_query = paused_query.select("jsonb_array_elements_text(value->'labels')") where.not(queue_name: paused_queues_query) .where.not(job_class: paused_job_classes_query) .where( Arel::Nodes::Not.new( Arel::Nodes::NamedFunction.new( "COALESCE", [ Arel::Nodes::InfixOperation.new('&&', arel_table['labels'], Arel::Nodes::NamedFunction.new('ARRAY', [paused_labels_query.arel])), Arel::Nodes::SqlLiteral.new('FALSE'), ] ) ) ) } |
.finished_before(timestamp) ⇒ ActiveRecord::Relation
Get Jobs finished before the given timestamp.
54 |
# File 'app/models/good_job/job.rb', line 54 scope :finished_before, ->() { where(arel_table['finished_at'].lteq(bind_value('finished_at', , ActiveRecord::Type::DateTime))) } |
.format_error(error) ⇒ Object
417 418 419 420 421 |
# File 'app/models/good_job/job.rb', line 417 def self.format_error(error) raise ArgumentError unless error.is_a?(Exception) [error.class.to_s, ERROR_MESSAGE_SEPARATOR, error.].join end |
.job_class(name) ⇒ ActiveRecord::Relation
With a given class name
76 |
# File 'app/models/good_job/job.rb', line 76 scope :job_class, ->(name) { where(params_job_class.eq(name)) } |
.json_string(json, attr) ⇒ Object
237 238 239 |
# File 'app/models/good_job/job.rb', line 237 def json_string(json, attr) Arel::Nodes::Grouping.new(Arel::Nodes::InfixOperation.new('->>', json, Arel::Nodes.build_quoted(attr))) end |
.next_scheduled_at(after: nil, limit: 100, now_limit: nil) ⇒ Array<DateTime>
Fetches the scheduled execution time of the next eligible Execution(s).
345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 |
# File 'app/models/good_job/job.rb', line 345 def self.next_scheduled_at(after: nil, limit: 100, now_limit: nil) query = advisory_unlocked.unfinished.schedule_ordered after ||= Time.current after_bind = bind_value('scheduled_at', after, ActiveRecord::Type::DateTime) after_query = query.where(arel_table['scheduled_at'].gt(after_bind)) after_at = after_query.limit(limit).pluck(:scheduled_at) if now_limit&.positive? now_bind = bind_value('scheduled_at', Time.current, ActiveRecord::Type::DateTime) now_query = query.where(arel_table['scheduled_at'].lt(now_bind)) now_at = now_query.limit(now_limit).pluck(:scheduled_at) end Array(now_at) + after_at end |
.only_scheduled ⇒ ActiveRecord::Relation
Get jobs that are not scheduled for a later time than now (i.e. jobs that are not scheduled or scheduled for earlier than the current time).
97 |
# File 'app/models/good_job/job.rb', line 97 scope :only_scheduled, -> { where(arel_table['scheduled_at'].lteq(bind_value('scheduled_at', DateTime.current, ActiveRecord::Type::DateTime))) } |
.params_execution_count ⇒ Object
245 246 247 248 249 250 251 |
# File 'app/models/good_job/job.rb', line 245 def params_execution_count Arel::Nodes::InfixOperation.new( '::', json_string(arel_table['serialized_params'], 'executions'), Arel.sql('integer') ) end |
.params_job_class ⇒ Object
241 242 243 |
# File 'app/models/good_job/job.rb', line 241 def params_job_class json_string(arel_table['serialized_params'], 'job_class') end |
.perform_with_advisory_lock(lock_id:, parsed_queues: nil, queue_select_limit: nil) {|Execution, nil| ... } ⇒ ExecutionResult?
Finds the next eligible Execution, acquire an advisory lock related to it, and executes the job.
319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 |
# File 'app/models/good_job/job.rb', line 319 def self.perform_with_advisory_lock(lock_id:, parsed_queues: nil, queue_select_limit: nil) job = nil result = nil unfinished.dequeueing_ordered(parsed_queues).only_scheduled.exclude_paused.limit(1).with_advisory_lock(select_limit: queue_select_limit) do |jobs| job = jobs.first if job&.executable? yield(job) if block_given? result = job.perform(lock_id: lock_id) else job = nil yield(nil) if block_given? end end job&.run_callbacks(:perform_unlocked) result end |
.priority_ordered ⇒ ActiveRecord::Relation
Order jobs by priority (highest priority first).
130 |
# File 'app/models/good_job/job.rb', line 130 scope :priority_ordered, -> { order('priority ASC NULLS LAST') } |
.queue_ordered(queues) ⇒ ActiveRecord::Relation
Order jobs in order of queues in array param
158 159 160 161 162 163 |
# File 'app/models/good_job/job.rb', line 158 scope :queue_ordered, (lambda do |queues| clauses = queues.map.with_index do |queue_name, index| sanitize_sql_array(["WHEN queue_name = ? THEN ?", queue_name, index]) end order(Arel.sql("(CASE #{clauses.join(' ')} ELSE #{queues.size} END)")) end) |
.queue_parser(string) ⇒ Hash
Parse a string representing a group of queues into a more readable data structure.
209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 |
# File 'app/models/good_job/job.rb', line 209 def queue_parser(string) string = string.strip.presence || '*' case string.first when '-' exclude_queues = true string = string[1..] when '+' ordered_queues = true string = string[1..] end queues = string.split(',').map(&:strip) if queues.include?('*') { all: true } elsif exclude_queues { exclude: queues } elsif ordered_queues { include: queues, ordered_queues: true, } else { include: queues } end end |
.queue_string(string) ⇒ ActiveRecord::Relation
Get Jobs on queues that match the given queue string.
181 182 183 184 185 186 187 188 189 190 191 |
# File 'app/models/good_job/job.rb', line 181 scope :queue_string, (lambda do |string| parsed = queue_parser(string) if parsed[:all] all elsif parsed[:exclude] where.not(queue_name: parsed[:exclude]).or where(queue_name: nil) elsif parsed[:include] where(queue_name: parsed[:include]) end end) |
.schedule_ordered ⇒ ActiveRecord::Relation
Order jobs by scheduled or created (oldest first).
169 |
# File 'app/models/good_job/job.rb', line 169 scope :schedule_ordered, -> { order(scheduled_at: :asc) } |
.unfinished ⇒ ActiveRecord::Relation
Get jobs that have not yet finished (succeeded or discarded).
90 |
# File 'app/models/good_job/job.rb', line 90 scope :unfinished, -> { where(finished_at: nil) } |
Instance Method Details
#active_job(ignore_deserialization_errors: false) ⇒ Object
Build an ActiveJob instance and deserialize the arguments, using ‘#active_job_data`.
583 584 585 586 587 588 589 590 591 |
# File 'app/models/good_job/job.rb', line 583 def active_job(ignore_deserialization_errors: false) ActiveJob::Base.deserialize(active_job_data).tap do |aj| aj.send(:deserialize_arguments_if_needed) rescue ActiveJob::DeserializationError raise unless ignore_deserialization_errors end rescue NameError raise unless ignore_deserialization_errors end |
#destroy_job ⇒ void
This method returns an undefined value.
Destroy all of a discarded or finished job’s executions from the database so that it will no longer appear on the dashboard.
570 571 572 573 574 575 576 |
# File 'app/models/good_job/job.rb', line 570 def destroy_job with_advisory_lock do raise ActionForStateMismatchError if finished_at.blank? destroy end end |
#discard_job(message) ⇒ void
This method returns an undefined value.
Discard a job so that it will not be executed further. This action will add a DiscardJobError to the job’s Execution and mark it as finished.
543 544 545 546 547 |
# File 'app/models/good_job/job.rb', line 543 def discard_job() with_advisory_lock do _discard_job() end end |
#discarded? ⇒ Boolean
Tests whether the job has finished but with an error.
488 489 490 |
# File 'app/models/good_job/job.rb', line 488 def discarded? finished? && error.present? end |
#display_error ⇒ String
Errors for the job to be displayed in the Dashboard.
443 444 445 446 447 448 449 |
# File 'app/models/good_job/job.rb', line 443 def display_error return error if error.present? serialized_params.fetch('exception_executions', {}).map do |exception, count| "#{exception}: #{count}" end.join(', ') end |
#display_name ⇒ String
Used when displaying this job in the GoodJob dashboard.
461 462 463 |
# File 'app/models/good_job/job.rb', line 461 def display_name job_class end |
#display_serialized_params ⇒ Hash
Return formatted serialized_params for display in the dashboard
453 454 455 456 457 |
# File 'app/models/good_job/job.rb', line 453 def display_serialized_params serialized_params.merge({ _good_job: attributes.except('serialized_params', 'locktype', 'owns_advisory_lock'), }) end |
#executable? ⇒ Boolean
Tests whether this job is safe to be executed by this thread.
741 742 743 744 745 |
# File 'app/models/good_job/job.rb', line 741 def executable? reload.finished_at.blank? rescue ActiveRecord::RecordNotFound false end |
#executions_count ⇒ Object
465 466 467 |
# File 'app/models/good_job/job.rb', line 465 def executions_count super || 0 end |
#finished? ⇒ Boolean
Tests whether the job has finished (succeeded or discarded).
482 483 484 |
# File 'app/models/good_job/job.rb', line 482 def finished? finished_at.present? end |
#force_discard_job(message) ⇒ Object
Force discard a job so that it will not be executed further. Force discard allows discarding a running job. This action will add a DiscardJobError to the job’s Execution and mark it as finished.
552 553 554 |
# File 'app/models/good_job/job.rb', line 552 def force_discard_job() _discard_job() end |
#job_state ⇒ Object
765 766 767 768 769 |
# File 'app/models/good_job/job.rb', line 765 def job_state state = { queue_name: queue_name } state[:scheduled_at] = scheduled_at if scheduled_at state end |
#number ⇒ Object
747 748 749 |
# File 'app/models/good_job/job.rb', line 747 def number serialized_params.fetch('executions', 0) + 1 end |
#perform(lock_id:) ⇒ ExecutionResult
Execute the ActiveJob job this Execution represents.
598 599 600 601 602 603 604 605 606 607 608 609 610 611 612 613 614 615 616 617 618 619 620 621 622 623 624 625 626 627 628 629 630 631 632 633 634 635 636 637 638 639 640 641 642 643 644 645 646 647 648 649 650 651 652 653 654 655 656 657 658 659 660 661 662 663 664 665 666 667 668 669 670 671 672 673 674 675 676 677 678 679 680 681 682 683 684 685 686 687 688 689 690 691 692 693 694 695 696 697 698 699 700 701 702 703 704 705 706 707 708 709 710 711 712 713 714 715 716 717 718 719 720 721 722 723 724 725 726 727 728 729 730 731 732 733 734 735 736 737 |
# File 'app/models/good_job/job.rb', line 598 def perform(lock_id:) run_callbacks(:perform) do raise PreviouslyPerformedError, 'Cannot perform a job that has already been performed' if finished_at job_performed_at = Time.current monotonic_start = ::Process.clock_gettime(::Process::CLOCK_MONOTONIC) execution = nil result = GoodJob::CurrentThread.within do |current_thread| current_thread.reset current_thread.job = self existing_performed_at = performed_at if existing_performed_at current_thread.execution_interrupted = existing_performed_at interrupt_error_string = self.class.format_error(GoodJob::InterruptError.new("Interrupted after starting perform at '#{existing_performed_at}'")) self.error = interrupt_error_string self.error_event = :interrupted monotonic_duration = (::Process.clock_gettime(::Process::CLOCK_MONOTONIC) - monotonic_start).seconds execution_attrs = { error: interrupt_error_string, finished_at: job_performed_at, error_event: :interrupted, duration: monotonic_duration, } executions.where(finished_at: nil).where.not(performed_at: nil).update_all(execution_attrs) # rubocop:disable Rails/SkipsModelValidations end transaction do execution_attrs = { job_class: job_class, queue_name: queue_name, serialized_params: serialized_params, scheduled_at: scheduled_at || created_at, created_at: job_performed_at, process_id: lock_id, } job_attrs = { performed_at: job_performed_at, executions_count: ((executions_count || 0) + 1), locked_by_id: lock_id, locked_at: Time.current, } execution = executions.create!(execution_attrs) update!(job_attrs) end ActiveSupport::Notifications.instrument("perform_job.good_job", { job: self, execution: execution, process_id: current_thread.process_id, thread_name: current_thread.thread_name }) do |instrument_payload| value = ActiveJob::Base.execute(active_job_data) if value.is_a?(Exception) handled_error = value value = nil end handled_error ||= current_thread.error_on_retry || current_thread.error_on_discard error_event = if !handled_error nil elsif handled_error == current_thread.error_on_discard :discarded elsif handled_error == current_thread.error_on_retry :retried elsif handled_error == current_thread.error_on_retry_stopped :retry_stopped elsif handled_error :handled end instrument_payload.merge!( value: value, error: handled_error, handled_error: handled_error, retried: current_thread.retried_job.present?, error_event: error_event ) ExecutionResult.new(value: value, handled_error: handled_error, error_event: error_event, retried_job: current_thread.retried_job) rescue StandardError => e error_event = if e.is_a?(GoodJob::InterruptError) :interrupted elsif e == current_thread.error_on_retry_stopped :retry_stopped else :unhandled end instrument_payload.merge!( error: e, unhandled_error: e, error_event: error_event ) ExecutionResult.new(value: nil, unhandled_error: e, error_event: error_event) end end job_attributes = { locked_by_id: nil, locked_at: nil } job_error = result.handled_error || result.unhandled_error if job_error error_string = self.class.format_error(job_error) job_attributes[:error] = error_string job_attributes[:error_event] = result.error_event execution.error = error_string execution.error_event = result.error_event execution.error_backtrace = job_error.backtrace else job_attributes[:error] = nil job_attributes[:error_event] = nil end job_finished_at = Time.current monotonic_duration = (::Process.clock_gettime(::Process::CLOCK_MONOTONIC) - monotonic_start).seconds job_attributes[:finished_at] = job_finished_at execution.finished_at = job_finished_at execution.duration = monotonic_duration retry_unhandled_error = result.unhandled_error && GoodJob.retry_on_unhandled_error reenqueued = result.retried? || retry_unhandled_error if reenqueued job_attributes[:performed_at] = nil job_attributes[:finished_at] = nil end assign_attributes(job_attributes) preserve_unhandled = result.unhandled_error && (GoodJob.retry_on_unhandled_error || GoodJob.preserve_job_records == :on_unhandled_error) if finished_at.blank? || GoodJob.preserve_job_records == true || reenqueued || preserve_unhandled || cron_key.present? transaction do execution.save! save! end else destroy! end result end end |
#queue_latency ⇒ Object
Time between when this job was expected to run and when it started running
752 753 754 755 756 757 758 |
# File 'app/models/good_job/job.rb', line 752 def queue_latency now = Time.zone.now expected_start = scheduled_at || created_at actual_start = performed_at || finished_at || now actual_start - expected_start unless expected_start >= now end |
#recent_error ⇒ String
The most recent error message. If the job has been retried, the error will be fetched from the previous Execution record.
432 433 434 435 436 437 438 439 |
# File 'app/models/good_job/job.rb', line 432 def recent_error GoodJob.deprecator.warn(<<~DEPRECATION) The `GoodJob::Job#recent_error` method is deprecated and will be removed in the next major release. Replace usage of GoodJob::Job#recent_error with `GoodJob::Job#error`. DEPRECATION error end |
#reschedule_job(scheduled_at = Time.current) ⇒ void
This method returns an undefined value.
Reschedule a scheduled job so that it executes immediately (or later) by the next available execution thread.
559 560 561 562 563 564 565 566 |
# File 'app/models/good_job/job.rb', line 559 def reschedule_job(scheduled_at = Time.current) with_advisory_lock do reload raise ActionForStateMismatchError if finished_at.present? update(scheduled_at: scheduled_at) end end |
#retry_job ⇒ ActiveJob::Base
Retry a job that has errored and been discarded. This action will create a new Execution record for the job.
501 502 503 504 505 506 507 508 509 510 511 512 513 514 515 516 517 518 519 520 521 522 523 524 525 526 527 528 529 530 531 532 533 534 535 536 537 538 |
# File 'app/models/good_job/job.rb', line 501 def retry_job with_advisory_lock do reload active_job = self.active_job(ignore_deserialization_errors: true) raise ActiveJobDeserializationError if active_job.nil? raise AdapterNotGoodJobError unless active_job.class.queue_adapter.is_a? GoodJob::Adapter raise ActionForStateMismatchError if finished_at.blank? || error.blank? # Update the executions count because the previous execution will not have been preserved # Do not update `exception_executions` because that comes from rescue_from's arguments active_job.executions = (active_job.executions || 0) + 1 begin error_class, = error.split(ERROR_MESSAGE_SEPARATOR).map(&:strip) error = error_class.constantize.new() rescue StandardError error = StandardError.new(error) end new_active_job = nil GoodJob::CurrentThread.within do |current_thread| current_thread.job = self current_thread.retry_now = true transaction do # NOTE: Required until fixed in rails https://github.com/rails/rails/pull/52121 I18n.with_locale(active_job.locale) do new_active_job = active_job.retry_job(wait: 0, error: error) end self.error_event = :retried if error save! end end new_active_job end end |
#running? ⇒ Boolean
Tests whether the job is being executed right now.
471 472 473 474 475 476 477 478 |
# File 'app/models/good_job/job.rb', line 471 def running? # Avoid N+1 Query: `.includes_advisory_locks` if has_attribute?(:locktype) self['locktype'].present? else advisory_locked? end end |
#runtime_latency ⇒ Object
Time between when this job started and finished
761 762 763 |
# File 'app/models/good_job/job.rb', line 761 def runtime_latency (finished_at || Time.zone.now) - performed_at if performed_at end |
#succeeded? ⇒ Boolean
Tests whether the job has finished without error
494 495 496 |
# File 'app/models/good_job/job.rb', line 494 def succeeded? finished? && !discarded? end |