Class: GoodJob::Batch
- Inherits:
-
Object
- Object
- GoodJob::Batch
- Includes:
- GlobalID::Identification
- Defined in:
- app/models/good_job/batch.rb
Overview
NOTE: This class delegates to BatchRecord and is intended to be the public interface for Batches.
Constant Summary collapse
- PROTECTED_PROPERTIES =
%i[ on_finish on_success on_discard callback_queue_name callback_priority description properties ].freeze
Class Method Summary collapse
-
.enqueue(active_jobs = [], **properties, &block) ⇒ GoodJob::Batch
Create a new batch and enqueue it.
- .find(id) ⇒ Object
- .primary_key ⇒ Object
-
.within_thread(batch_id: nil, batch_callback_id: nil) ⇒ Object
Helper method to enqueue jobs and assign them to a batch.
Instance Method Summary collapse
- #_record ⇒ Object
- #active_jobs ⇒ Object
-
#add(active_jobs = nil, &block) ⇒ Array<ActiveJob::Base>
Enqueue jobs and add them to the batch.
- #assign_properties(properties) ⇒ Object
- #callback_active_jobs ⇒ Object
-
#enqueue(active_jobs = [], **properties, &block) ⇒ Array<ActiveJob::Base>
Saves the batch, enqueues the given jobs into it, and returns the Active Jobs that were added.
-
#initialize(_record: nil, **properties) ⇒ Batch
constructor
Wraps the given BatchRecord (or a new BatchRecord when none is given) and assigns the given properties.
- #retry ⇒ Object
Constructor Details
#initialize(_record: nil, **properties) ⇒ Batch
Wraps the given BatchRecord (or a new BatchRecord when none is given) and assigns the given properties.
86 87 88 89 |
# File 'app/models/good_job/batch.rb', line 86

def initialize(_record: nil, **properties) # rubocop:disable Lint/UnderscorePrefixedVariableName
  self.record = _record || BatchRecord.new
  assign_properties(properties)
end
Class Method Details
.enqueue(active_jobs = [], **properties, &block) ⇒ GoodJob::Batch
Create a new batch and enqueue it
58 59 60 61 62 |
# File 'app/models/good_job/batch.rb', line 58

def self.enqueue(active_jobs = [], **properties, &block)
  new.tap do |batch|
    batch.enqueue(active_jobs, **properties, &block)
  end
end
.find(id) ⇒ Object
68 69 70 |
# File 'app/models/good_job/batch.rb', line 68

def self.find(id)
  new _record: BatchRecord.find(id)
end
.primary_key ⇒ Object
64 65 66 |
# File 'app/models/good_job/batch.rb', line 64

def self.primary_key
  :id
end
.within_thread(batch_id: nil, batch_callback_id: nil) ⇒ Object
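Helper method to enqueue jobs and assign them to a batch. Sets the current batch ID and batch callback ID for the duration of the block, then restores the previous values, even if the block raises.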
73 74 75 76 77 78 79 80 81 82 83 84 |
# File 'app/models/good_job/batch.rb', line 73

def self.within_thread(batch_id: nil, batch_callback_id: nil)
  original_batch_id = current_batch_id
  original_batch_callback_id = current_batch_callback_id

  self.current_batch_id = batch_id
  self.current_batch_callback_id = batch_callback_id

  yield
ensure
  self.current_batch_id = original_batch_id
  self.current_batch_callback_id = original_batch_callback_id
end
Instance Method Details
#_record ⇒ Object
178 179 180 |
# File 'app/models/good_job/batch.rb', line 178

def _record
  record
end
#active_jobs ⇒ Object
163 164 165 |
# File 'app/models/good_job/batch.rb', line 163

def active_jobs
  record.jobs.map(&:active_job)
end
#add(active_jobs = nil, &block) ⇒ Array<ActiveJob::Base>
Enqueue jobs and add them to the batch
132 133 134 135 136 137 138 139 140 141 142 143 144 |
# File 'app/models/good_job/batch.rb', line 132

def add(active_jobs = nil, &block)
  record.save if record.new_record?

  buffer = Bulk::Buffer.new
  buffer.add(active_jobs)
  buffer.capture(&block) if block

  self.class.within_thread(batch_id: id) do
    buffer.enqueue
  end

  buffer.active_jobs
end
#assign_properties(properties) ⇒ Object
171 172 173 174 175 176 |
# File 'app/models/good_job/batch.rb', line 171

def assign_properties(properties)
  properties = properties.dup
  batch_attrs = PROTECTED_PROPERTIES.index_with { |key| properties.delete(key) }.compact
  record.assign_attributes(batch_attrs)
  self.properties.merge!(properties)
end
#callback_active_jobs ⇒ Object
167 168 169 |
# File 'app/models/good_job/batch.rb', line 167

def callback_active_jobs
  record.callback_jobs.map(&:active_job)
end
#enqueue(active_jobs = [], **properties, &block) ⇒ Array<ActiveJob::Base>
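Assigns the given properties, saves the batch (or clears its finished and discarded timestamps if it was already saved), and enqueues the given jobs. Returns the Active Jobs added to the batch.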
92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 |
# File 'app/models/good_job/batch.rb', line 92

def enqueue(active_jobs = [], **properties, &block)
  assign_properties(properties)
  if record.new_record?
    record.save!
  else
    record.transaction do
      record.with_advisory_lock(function: "pg_advisory_xact_lock") do
        record.enqueued_at_will_change!
        record.jobs_finished_at_will_change! if GoodJob::BatchRecord.jobs_finished_at_migrated?
        record.finished_at_will_change!

        update_attributes = { discarded_at: nil, finished_at: nil }
        update_attributes[:jobs_finished_at] = nil if GoodJob::BatchRecord.jobs_finished_at_migrated?
        record.update!(**update_attributes)
      end
    end
  end

  active_jobs = add(active_jobs, &block)

  Rails.application.executor.wrap do
    buffer = GoodJob::Adapter::InlineBuffer.capture do
      record.transaction do
        record.with_advisory_lock(function: "pg_advisory_xact_lock") do
          record.update!(enqueued_at: Time.current)

          # During inline execution, this could enqueue and execute further jobs
          record._continue_discard_or_finish(lock: false)
        end
      end
    end
    buffer.call
  end

  active_jobs
end
#retry ⇒ Object
146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 |
# File 'app/models/good_job/batch.rb', line 146

def retry
  Rails.application.executor.wrap do
    buffer = GoodJob::Adapter::InlineBuffer.capture do
      record.transaction do
        record.with_advisory_lock(function: "pg_advisory_xact_lock") do
          update_attributes = { discarded_at: nil, finished_at: nil }
          update_attributes[:jobs_finished_at] = nil if GoodJob::BatchRecord.jobs_finished_at_migrated?
          record.update!(update_attributes)
          record.jobs.discarded.each(&:retry_job)
          record._continue_discard_or_finish(lock: false)
        end
      end
    end
    buffer.call
  end
end