Class: Delayed::JobGroups::JobGroup
- Inherits:
-
ActiveRecord::Base
- Object
- ActiveRecord::Base
- Delayed::JobGroups::JobGroup
- Defined in:
- lib/delayed/job_groups/job_group.rb
Class Method Summary collapse
- .check_for_completion(job_group_id, skip_pending_jobs_check: false) ⇒ Object
-
.has_pending_jobs?(job_group_ids) ⇒ Boolean
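Returns whether any job that has not failed (failed_at is nil) exists for the given job group id(s). The source line carries an inline `rubocop:disable Naming/PredicateName` directive.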
Instance Method Summary collapse
- #cancel ⇒ Object
- #check_for_completion(skip_pending_jobs_check: false) ⇒ Object
- #enqueue(job, options = {}) ⇒ Object
- #mark_queueing_complete ⇒ Object
- #unblock ⇒ Object
Class Method Details
.check_for_completion(job_group_id, skip_pending_jobs_check: false) ⇒ Object
73 74 75 76 77 78 79 80 81 82 83 84 85 |
# File 'lib/delayed/job_groups/job_group.rb', line 73
#
# Looks up the job group with the given id under a row lock and completes it
# when it reports itself ready for completion.
#
# @param job_group_id [Object] id used to look up the job group
# @param skip_pending_jobs_check [Boolean] when true, the has_pending_jobs?
#   pre-check is skipped and the group is always loaded and locked
# @return [Object, nil] nil when the pre-check finds pending jobs; otherwise
#   whatever the transaction call returns
def self.check_for_completion(job_group_id, skip_pending_jobs_check: false)
  # Optimization to avoid loading and locking the JobGroup when the group
  # still has pending jobs.
  unless skip_pending_jobs_check
    return if has_pending_jobs?(job_group_id)
  end

  transaction do
    # The first completed job to notice the job group's queue count has dropped to
    # zero will queue the job group's completion job and destroy the job group so
    # other jobs need to handle the job group having been destroyed already.
    locked_group = where(id: job_group_id).lock(true).first
    locked_group.send(:complete) if locked_group && locked_group.send(:ready_for_completion?)
  end
end
.has_pending_jobs?(job_group_ids) ⇒ Boolean
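Returns false when no ids are given; otherwise returns whether any Delayed::Job with one of the given job_group_id values and a nil failed_at exists. (Source carries an inline `rubocop:disable Naming/PredicateName` directive.)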
87 88 89 90 91 92 |
# File 'lib/delayed/job_groups/job_group.rb', line 87
#
# Reports whether any job that has not failed still exists for the given
# job group id(s).
#
# @param job_group_ids [Object, Array] a single id or a list of ids; coerced
#   with Array(), so nil behaves like an empty list
# @return [Boolean] false when no ids are given, otherwise the result of the
#   existence query on Delayed::Job
def self.has_pending_jobs?(job_group_ids) # rubocop:disable Naming/PredicateName
  ids = Array(job_group_ids)

  if ids.empty?
    # Nothing to look up, so skip the query entirely.
    false
  else
    Delayed::Job.where(job_group_id: ids, failed_at: nil).exists?
  end
end
Instance Method Details
#cancel ⇒ Object
64 65 66 67 |
# File 'lib/delayed/job_groups/job_group.rb', line 64
#
# Cancels the job group: enqueues the on-cancellation job when one is set,
# then destroys the group.
#
# NOTE(review): the rendered listing lost the identifier in front of `|| {}`
# (it read `enqueue(on_cancellation_job, || {})`, which is not valid Ruby).
# `on_cancellation_job_options` is the presumed attribute name - confirm
# against lib/delayed/job_groups/job_group.rb.
#
# @return [Object] the result of destroy
def cancel
  # Fall back to an empty options hash when no cancellation options are stored.
  Delayed::Job.enqueue(on_cancellation_job, on_cancellation_job_options || {}) if on_cancellation_job
  destroy
end
#check_for_completion(skip_pending_jobs_check: false) ⇒ Object
69 70 71 |
# File 'lib/delayed/job_groups/job_group.rb', line 69
#
# Instance-level convenience that delegates to the class-level
# check_for_completion using this record's id.
#
# @param skip_pending_jobs_check [Boolean] forwarded unchanged to the class method
# @return [Object, nil] whatever the class-level check_for_completion returns
def check_for_completion(skip_pending_jobs_check: false)
  group_class = self.class
  group_class.check_for_completion(id, skip_pending_jobs_check: skip_pending_jobs_check)
end
#enqueue(job, options = {}) ⇒ Object
48 49 50 51 52 |
# File 'lib/delayed/job_groups/job_group.rb', line 48
#
# Enqueues a job as part of this job group.
#
# @param job [Object] the job passed through to Delayed::Job.enqueue
# @param options [Hash] enqueue options; :job_group_id and :blocked are
#   overwritten with this group's id and blocked? state
# @return [Object] the result of Delayed::Job.enqueue
def enqueue(job, options = {})
  # merge returns a new hash, so the caller's options hash is not mutated.
  options = options.merge(job_group_id: id)
  options[:blocked] = blocked?
  Delayed::Job.enqueue(job, options)
end
#mark_queueing_complete ⇒ Object
39 40 41 42 43 44 45 46 |
# File 'lib/delayed/job_groups/job_group.rb', line 39
#
# Flags the group as having finished queueing and completes it right away if
# it is already ready for completion. The whole sequence runs inside with_lock.
#
# @raise [RuntimeError] if queueing_complete? is already true
# @return [Object, nil] whatever with_lock returns
def mark_queueing_complete
  with_lock do
    if queueing_complete?
      raise 'JobGroup has already completed queueing'
    end

    update_column(:queueing_complete, true)

    if ready_for_completion?
      complete
    end
  end
end
#unblock ⇒ Object
54 55 56 57 58 59 60 61 62 |
# File 'lib/delayed/job_groups/job_group.rb', line 54
#
# Unblocks the group and its active jobs. Does nothing when the group is not
# blocked. Inside with_lock it clears the group's blocked flag, clears the
# flag on active_jobs while setting their run_at to Delayed::Job.db_time_now,
# and completes the group if it is ready for completion.
#
# @return [Object, nil] nil when the group is not blocked; otherwise whatever
#   with_lock returns
def unblock
  if blocked?
    with_lock do
      update_column(:blocked, false)
      jobs_to_release = active_jobs
      jobs_to_release.update_all(blocked: false, run_at: Delayed::Job.db_time_now)

      if ready_for_completion?
        complete
      end
    end
  end
end