Module: RocketJob::Batch::Worker
Extended by: ActiveSupport::Concern
Included in: RocketJob::Batch
Defined in: lib/rocket_job/batch/worker.rb
Instance Method Summary
- #rocket_job_active_workers(server_name = nil) ⇒ Object
  Returns [Array<ActiveWorker>] all workers actively working on this job.
- #rocket_job_batch_callbacks(worker) ⇒ Object
  Runs the batch before and after callbacks.
- #rocket_job_batch_complete?(worker_name) ⇒ Boolean
  Checks for completion and runs after_batch if defined. Returns true if the job is now complete, aborted, or failed.
- #rocket_job_batch_fail!(worker_name) ⇒ Object
  Fails the job.
- #rocket_job_batch_perform(slice, record) ⇒ Object
  Performs a single record within the current slice.
- #rocket_job_batch_run_after_callbacks(save_before = true) ⇒ Object
  Runs the after_batch callbacks. Saves the current state before and after running the callbacks, if any are present.
- #rocket_job_batch_run_before_callbacks ⇒ Object
  Runs the before_batch callbacks. Saves the current state before and after running the callbacks, if any are present.
- #rocket_job_batch_throttled?(slice, worker) ⇒ Boolean
  Returns true if a batch throttle applies to the slice, in which case the slice is returned to the queue.
- #rocket_job_perform_slice(slice, &block) ⇒ Object
  Performs an individual slice without running the slice callbacks.
- #rocket_job_process_slice(slice, &block) ⇒ Object
  Processes a single slice from MongoDB. Once the slice has been successfully processed it is removed from the input collection. Returns [Integer] the number of records successfully processed.
- #rocket_job_work(worker, re_raise_exceptions = false) ⇒ Object
  Processes records in each available slice for this job.
- #work_first_slice(&block) ⇒ Object
  Deprecated. Please open a ticket if you need this behavior.
Instance Method Details
#rocket_job_active_workers(server_name = nil) ⇒ Object
Returns [Array<ActiveWorker>] all workers actively working on this job. When server_name is given, only workers on that server are returned.
# File 'lib/rocket_job/batch/worker.rb', line 96
def rocket_job_active_workers(server_name = nil)
  servers = []
  case sub_state
  when :before, :after
    if running? && (server_name.nil? || worker_on_server?(server_name))
      servers << ActiveWorker.new(worker_name, started_at, self)
    end
  when :processing
    query = input.running
    query = query.where(worker_name: /\A#{server_name}/) if server_name
    query.each do |slice|
      servers << ActiveWorker.new(slice.worker_name, slice.started_at, self)
    end
  end
  servers
end
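In the :processing branch, workers are matched by a worker-name prefix built with `/\A#{server_name}/`. The plain-Ruby sketch below mimics that filter. `RunningSlice` and the sample worker names are hypothetical stand-ins for the Mongo slice documents and RocketJob's naming. Unlike the original, the sketch wraps server_name in `Regexp.escape` so that a dot in a host name matches only a literal dot.

```ruby
# Hypothetical stand-in for a running input slice document.
RunningSlice = Struct.new(:worker_name, :started_at)

# Keeps slices whose worker_name starts with server_name (all slices when nil).
# Regexp.escape makes "." match literally; the original interpolates as-is.
def active_worker_names(slices, server_name = nil)
  return slices.map(&:worker_name) if server_name.nil?

  prefix = /\A#{Regexp.escape(server_name)}/
  slices.select { |s| s.worker_name.match?(prefix) }.map(&:worker_name)
end

now = Time.now
slices = [
  RunningSlice.new("host-a.example:1234:1", now),
  RunningSlice.new("host-b.example:5678:2", now),
  RunningSlice.new("host-a.example:1234:3", now)
]
p active_worker_names(slices, "host-a.example")
# => ["host-a.example:1234:1", "host-a.example:1234:3"]
```

Without the escape, a server name such as "host.a" would also match a worker named "hostXa:1".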
#rocket_job_batch_callbacks(worker) ⇒ Object
Runs the before_batch or after_batch callbacks, depending on the job's current sub_state.
# File 'lib/rocket_job/batch/worker.rb', line 310
def rocket_job_batch_callbacks(worker)
  # If this is the first worker to pickup this job
  case sub_state
  when :before
    rocket_job_batch_run_before_callbacks
    # Check for 0 record jobs
    rocket_job_batch_complete?(worker.name) if running?
  # The case subject is already sub_state, so compare against the symbol itself.
  when :after
    rocket_job_batch_run_after_callbacks
  end
end
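A Ruby `case` expression tests each `when` value against the subject with `===`. For that reason the :after branch has to name the symbol itself. Writing `when sub_state == :after` compares `true` or `false` against the symbol, and that comparison never matches. The standalone methods below are hypothetical and show the difference:

```ruby
# `case x; when y` evaluates `y === x`.
def branch_buggy(sub_state)
  case sub_state
  when :before then :before_callbacks
  when sub_state == :after then :after_callbacks # becomes `true === :after`
  else :nothing
  end
end

def branch_fixed(sub_state)
  case sub_state
  when :before then :before_callbacks
  when :after then :after_callbacks
  else :nothing
  end
end

p branch_buggy(:after) # => :nothing
p branch_fixed(:after) # => :after_callbacks
```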
#rocket_job_batch_complete?(worker_name) ⇒ Boolean
Checks for completion and runs the after_batch callbacks if defined. Returns true if the job is now complete, aborted, or failed.
# File 'lib/rocket_job/batch/worker.rb', line 201
def rocket_job_batch_complete?(worker_name)
  return true unless running?
  return false unless record_count

  # Only failed slices left?
  input_count  = input.count
  failed_count = input.failed.count
  if failed_count.positive? && (input_count == failed_count)
    # Reload to pull in any counters or other data that was modified.
    reload unless new_record?
    rocket_job_batch_fail!(worker_name) if may_fail?
    return true
  end

  # Any work left?
  return false if input_count.positive?

  # If the job was not saved to the queue, do not save any changes
  if new_record?
    rocket_job_batch_run_after_callbacks(false)
    return true
  end

  # Complete job iff no other worker has already completed it
  # Must set write concern to at least 1 since we need the nModified back
  result = self.class.with(write: {w: 1}) do |query|
    query.
      where(id: id, state: :running, sub_state: :processing).
      update("$set" => {sub_state: "after", worker_name: worker_name})
  end

  # Reload to pull in any counters or other data that was modified.
  reload
  if result.modified_count.positive?
    rocket_job_batch_run_after_callbacks(false)
  elsif aborted?
    # Repeat cleanup in case this worker was still running when the job was aborted
    cleanup!
  end
  true
end
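The count checks in rocket_job_batch_complete? reduce to a small decision table. Failed slices stay in the input collection, so "only failed slices left" means the two counts are equal. The function below is a hypothetical pure-Ruby summary for a persisted, running job with record_count already set. It is not part of RocketJob.

```ruby
# Hypothetical summary of the count checks in rocket_job_batch_complete?.
# input_count includes failed slices, which remain in the input collection.
def completion_decision(input_count, failed_count)
  # Every remaining slice has failed: the job should be failed.
  return :fail if failed_count.positive? && input_count == failed_count
  # Slices are still queued or running: more work to do.
  return :keep_working if input_count.positive?

  # Input is empty: try to atomically move the job to sub_state :after.
  :try_complete
end

p completion_decision(3, 3) # => :fail
p completion_decision(5, 2) # => :keep_working
p completion_decision(0, 0) # => :try_complete
```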
#rocket_job_batch_fail!(worker_name) ⇒ Object
Fails the job, unless another worker has already finished it.
# File 'lib/rocket_job/batch/worker.rb', line 245
def rocket_job_batch_fail!(worker_name)
  fail_job = true

  unless new_record?
    # Fail job iff no other worker has already finished it
    # Must set write concern to at least 1 since we need the nModified back
    result = self.class.with(write: {w: 1}) do |query|
      query.
        where(id: id, state: :running, sub_state: :processing).
        update({"$set" => {state: "failed", worker_name: worker_name}})
    end
    fail_job = false unless result.modified_count.positive?
  end

  return unless fail_job

  message        = "#{input.failed.count} slices failed to process"
  self.exception = JobException.new(message: message)
  new_record? ? fail(worker_name, message) : fail!(worker_name, message)
end
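Both rocket_job_batch_fail! and rocket_job_batch_complete? depend on a conditional update. The update applies only while the job is still running/processing, and the modified count shows whether this worker won the race. The sketch below imitates that compare-and-set in memory. `JobDoc` and `update_if` are hypothetical, and a Mutex stands in for MongoDB's single-document atomicity.

```ruby
# Hypothetical in-memory stand-in for the job document.
class JobDoc
  attr_reader :state, :sub_state, :worker_name

  def initialize
    @state = :running
    @sub_state = :processing
    @worker_name = nil
    @lock = Mutex.new
  end

  # Applies changes only if the document still matches the filter and
  # returns the number of documents modified (0 or 1), like modified_count.
  def update_if(expected_state:, expected_sub_state:, **changes)
    @lock.synchronize do
      return 0 unless @state == expected_state && @sub_state == expected_sub_state

      @state = changes.fetch(:state, @state)
      @sub_state = changes.fetch(:sub_state, @sub_state)
      @worker_name = changes.fetch(:worker_name, @worker_name)
      1
    end
  end
end

doc = JobDoc.new
threads = Array.new(8) do |i|
  Thread.new do
    doc.update_if(expected_state: :running, expected_sub_state: :processing,
                  state: :failed, worker_name: "worker-#{i}")
  end
end
winners = threads.map(&:value)
p winners.sum # => 1 (exactly one worker fails the job)
p doc.state   # => :failed
```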
#rocket_job_batch_perform(slice, record) ⇒ Object
Performs a single record within the current slice.
# File 'lib/rocket_job/batch/worker.rb', line 175
def rocket_job_batch_perform(slice, record)
  @rocket_job_record_number = slice.current_record_number

  return block_given? ? yield(record) : perform(record) if _perform_callbacks.empty?

  # @rocket_job_input and @rocket_job_output can be modified by before/around callbacks
  @rocket_job_input  = record
  @rocket_job_output = nil

  run_callbacks(:perform) do
    @rocket_job_output =
      if block_given?
        yield(@rocket_job_input)
      else
        perform(@rocket_job_input)
      end
  end

  @rocket_job_input  = nil
  result             = @rocket_job_output
  @rocket_job_output = nil
  result
end
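Callbacks can replace the input before perform runs and the output after it. For that reason, rocket_job_batch_perform passes values through instance variables instead of locals, and it skips that machinery entirely when no perform callbacks are defined. The sketch below reproduces the hand-off in plain Ruby. `MiniJob` and its lambda hooks are hypothetical; RocketJob itself uses ActiveSupport callbacks.

```ruby
# Hypothetical job mimicking the instance-variable hand-off; hooks are lambdas
# evaluated against the job so they can read and rewrite @input / @output.
class MiniJob
  def initialize(before_hooks: [], after_hooks: [])
    @before_hooks = before_hooks
    @after_hooks = after_hooks
  end

  def perform(record)
    record.upcase
  end

  def perform_record(record)
    # Fast path when there are no callbacks, as in the original.
    return perform(record) if @before_hooks.empty? && @after_hooks.empty?

    @input = record
    @output = nil
    @before_hooks.each { |hook| instance_exec(&hook) } # may rewrite @input
    @output = perform(@input)
    @after_hooks.each { |hook| instance_exec(&hook) }  # may rewrite @output
    result = @output
    @input = @output = nil
    result
  end
end

job = MiniJob.new(
  before_hooks: [-> { @input = @input.strip }],
  after_hooks: [-> { @output = "#{@output}!" }]
)
p job.perform_record("  hello ")   # => "HELLO!"
p MiniJob.new.perform_record("hi") # => "HI"
```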
#rocket_job_batch_run_after_callbacks(save_before = true) ⇒ Object
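Runs the after_batch callbacks. If any are present, the job moves to sub_state :after and, when save_before is true, is saved before the callbacks run. Afterwards the job is completed if it may complete; otherwise a persisted job is saved.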
# File 'lib/rocket_job/batch/worker.rb', line 288
def rocket_job_batch_run_after_callbacks(save_before = true)
  unless _after_batch_callbacks.empty?
    self.sub_state = :after
    save! if save_before && !new_record? && !destroyed?
    logger.measure_info(
      "after_batch",
      metric:             "#{self.class.name}/after_batch",
      log_exception:      :full,
      on_exception_level: :error,
      silence:            log_level
    ) do
      run_callbacks(:after_batch)
    end
  end

  if new_record? || destroyed?
    complete if may_complete?
  else
    may_complete? ? complete! : save!
  end
end
#rocket_job_batch_run_before_callbacks ⇒ Object
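Runs the before_batch callbacks. If any are present, the job is saved with sub_state :before before they run. Afterwards sub_state is set to :processing and the job is saved again. Saves are skipped for new or destroyed records.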
# File 'lib/rocket_job/batch/worker.rb', line 268
def rocket_job_batch_run_before_callbacks
  unless _before_batch_callbacks.empty?
    self.sub_state = :before
    save! unless new_record? || destroyed?
    logger.measure_info(
      "before_batch",
      metric:             "#{self.class.name}/before_batch",
      log_exception:      :full,
      on_exception_level: :error,
      silence:            log_level
    ) do
      run_callbacks(:before_batch)
    end
  end
  self.sub_state = :processing
  save! unless new_record? || destroyed?
end
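The two callback runners move a job through the sub_states :before, :processing, and :after. The class below is a hypothetical in-memory trace of those transitions. It records each save as a [state, sub_state] pair instead of persisting anything, and it assumes the job is always allowed to complete, so `@state = :completed` stands in for complete!.

```ruby
# Hypothetical trace of sub_state transitions made by the batch callback runners.
class SubStateTrace
  attr_reader :saves

  def initialize(before_callbacks:, after_callbacks:)
    @before_callbacks = before_callbacks
    @after_callbacks = after_callbacks
    @state = :running
    @sub_state = nil
    @saves = []
  end

  def run_before
    unless @before_callbacks.empty?
      @sub_state = :before
      save
      @before_callbacks.each(&:call)
    end
    @sub_state = :processing
    save
  end

  def run_after(save_before = true)
    unless @after_callbacks.empty?
      @sub_state = :after
      save if save_before
      @after_callbacks.each(&:call)
    end
    @state = :completed # stands in for complete!
    save
  end

  private

  def save
    @saves << [@state, @sub_state]
  end
end

job = SubStateTrace.new(before_callbacks: [-> {}], after_callbacks: [-> {}])
job.run_before
job.run_after
p job.saves
# => [[:running, :before], [:running, :processing], [:running, :after], [:completed, :after]]
```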
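#rocket_job_batch_throttled?(slice, worker) ⇒ Boolean
Returns true if a batch throttle matches this slice. In that case the slice is returned to the queue and the throttle's filter is added to the worker's current filter.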
# File 'lib/rocket_job/batch/worker.rb', line 113
def rocket_job_batch_throttled?(slice, worker)
  filter = self.class.rocket_job_batch_throttles.matching_filter(self, slice)
  return false unless filter

  # Restore retrieved slice so that other workers can process it later.
  slice.set(worker_name: nil, state: :queued, started_at: nil)
  worker.add_to_current_filter(filter)
  true
end
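The throttle check runs after a slice has already been claimed. When a throttle applies, the slice is reset to :queued so that another worker can pick it up later. A minimal sketch follows. `Slice`, `FakeWorker`, the lambda throttles, and the filter hash contents are all hypothetical. In particular, merging filters into a hash only assumes what add_to_current_filter does.

```ruby
# Hypothetical stand-ins for a slice document and a worker.
Slice = Struct.new(:id, :state, :worker_name, :started_at)

class FakeWorker
  attr_reader :current_filter

  def initialize
    @current_filter = {}
  end

  # Assumed behavior: accumulate filters so later slice lookups skip this work.
  def add_to_current_filter(filter)
    @current_filter.merge!(filter)
  end
end

# Each throttle is a lambda returning a filter hash when it applies, else nil.
def first_matching_filter(throttles, slice)
  throttles.each do |throttle|
    filter = throttle.call(slice)
    return filter if filter
  end
  nil
end

def throttled?(slice, worker, throttles)
  filter = first_matching_filter(throttles, slice)
  return false unless filter

  # Put the slice back so another worker can process it later.
  slice.state = :queued
  slice.worker_name = nil
  slice.started_at = nil
  worker.add_to_current_filter(filter)
  true
end

worker = FakeWorker.new
slice = Slice.new(1, :running, "worker-1", Time.now)
throttles = [->(_s) { nil }, ->(_s) { { job_id_not_in: [42] } }]
p throttled?(slice, worker, throttles) # => true
p slice.state                          # => :queued
```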
#rocket_job_perform_slice(slice, &block) ⇒ Object
Performs an individual slice without running the slice callbacks. Per-record perform callbacks still run through #rocket_job_batch_perform.
# File 'lib/rocket_job/batch/worker.rb', line 145
def rocket_job_perform_slice(slice, &block)
  slice.processing_record_number ||= 0
  append = false

  # Skip processed records in this slice if it has no output categories.
  records =
    if slice.processing_record_number.to_i > 1
      append = true
      logger.info("Resuming previously incomplete slice from record number #{slice.processing_record_number}")
      slice.records[slice.processing_record_number - 1..-1]
    else
      # Reprocess all records in this slice.
      slice.processing_record_number = 0
      slice.records
    end

  count = 0
  RocketJob::Sliced::Writer::Output.collect(self, input_slice: slice, append: append) do |writer|
    records.each do |record|
      slice.processing_record_number += 1
      SemanticLogger.named_tagged(record: slice.current_record_number) do
        writer << rocket_job_batch_perform(slice, record, &block)
        count += 1
      end
    end
  end
  count
end
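The resume rule treats processing_record_number as a 1-based position. Any value above 1 restarts at that record, so the record that was in progress runs again, and output is appended to what was already written. A value of 0, 1, or nil reprocesses the whole slice. The hypothetical helper below isolates that arithmetic.

```ruby
# Mirrors the record-skipping rule; returns [records_to_run, append_output?].
def resume_plan(records, processing_record_number)
  n = processing_record_number.to_i # nil.to_i == 0
  return [records[(n - 1)..-1], true] if n > 1

  [records, false]
end

records = %w[a b c d e]
p resume_plan(records, 3)   # => [["c", "d", "e"], true]
p resume_plan(records, 1)   # => [["a", "b", "c", "d", "e"], false]
p resume_plan(records, nil) # => [["a", "b", "c", "d", "e"], false]
```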
#rocket_job_process_slice(slice, &block) ⇒ Object
Processes a single slice from MongoDB. Once the slice has been successfully processed, it is removed from the input collection. Returns [Integer] the number of records successfully processed.
# File 'lib/rocket_job/batch/worker.rb', line 126
def rocket_job_process_slice(slice, &block)
  @rocket_job_slice = slice
  count             = 0
  run_callbacks(:slice) do
    # Allow before_slice callbacks to fail, complete or abort this slice.
    return 0 unless running?

    count = rocket_job_perform_slice(slice, &block)
  end
  @rocket_job_slice = nil

  # On successful completion remove the slice from the input queue
  # TODO: Add option to complete slice instead of destroying it to retain input data.
  slice.destroy
  count
end
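Inside rocket_job_process_slice, `return 0 unless running?` sits inside the callback block. A `return` in a block exits the enclosing method, so the later `slice.destroy` is skipped and the slice is kept. The sketch below demonstrates that control flow with hypothetical hash-based stand-ins. `with_slice_callbacks` replaces run_callbacks(:slice).

```ruby
# Hypothetical stand-in for run_callbacks(:slice): run before hooks, then the block.
def with_slice_callbacks(before_hooks)
  before_hooks.each(&:call)
  yield
end

def process_slice(slice, job)
  count = 0
  with_slice_callbacks(job[:before_slice]) do
    # A before_slice hook may have stopped the job; leave the slice in place.
    return 0 unless job[:state] == :running

    count = slice[:records].size
  end
  slice[:destroyed] = true # only reached when the block finished normally
  count
end

running_job = { state: :running, before_slice: [] }
slice = { records: [1, 2, 3] }
p process_slice(slice, running_job) # => 3
p slice[:destroyed]                 # => true

aborting_job = { state: :running }
aborting_job[:before_slice] = [-> { aborting_job[:state] = :aborted }]
kept = { records: [1, 2, 3] }
p process_slice(kept, aborting_job) # => 0
p kept[:destroyed]                  # => nil
```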
#rocket_job_work(worker, re_raise_exceptions = false) ⇒ Object
Processes records in each available slice for this job. Slices are processed one at a time so that concurrent calls to this method can increase throughput. Processing continues until no more slices are available for this job, the worker is shutting down, or Config.re_check_seconds have elapsed (so that higher-priority jobs can interrupt this one).
Returns [true|false] whether any work was performed.
Slices are destroyed after their records are successfully processed.
If an exception is raised, the entire slice of records is marked as failed.
Thread-safe: can be called by multiple threads at the same time.
# File 'lib/rocket_job/batch/worker.rb', line 29
def rocket_job_work(worker, re_raise_exceptions = false)
  raise "Job must be started before calling #rocket_job_work" unless running?

  start_time = Time.now
  if sub_state != :processing
    fail_on_exception!(re_raise_exceptions) { rocket_job_batch_callbacks(worker) }
    return false unless running?
  end

  SemanticLogger.named_tagged(job: id.to_s) do
    until worker.shutdown?
      slice = input.next_slice(worker.name)
      if slice
        # Grab a slice before checking the throttle to reduce concurrency race condition.
        return true if slice.fail_on_exception!(re_raise_exceptions) { rocket_job_batch_throttled?(slice, worker) }
        next if slice.failed?

        slice.fail_on_exception!(re_raise_exceptions) { rocket_job_process_slice(slice) }
      elsif record_count && fail_on_exception!(re_raise_exceptions) { rocket_job_batch_complete?(worker.name) }
        return false
      else
        logger.debug "No more work available for this job"
        worker.add_to_current_filter(throttle_filter_id)
        return true
      end

      # Allow new jobs with a higher priority to interrupt this job
      break if (Time.now - start_time) >= Config.re_check_seconds
    end
  end
  false
end
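Stripped of throttling, callbacks, and failure handling, the work loop pulls slices from a shared source until none are left or the re-check time budget runs out. The sketch below keeps only that skeleton. A thread-safe `Queue` stands in for the MongoDB input collection, which lets several threads run the loop concurrently. The injectable clock is a hypothetical testing aid, and it defaults to a monotonic clock where the original uses `Time.now`.

```ruby
# Hypothetical skeleton of the rocket_job_work loop.
def work_loop(queue, re_check_seconds:, clock: -> { Process.clock_gettime(Process::CLOCK_MONOTONIC) })
  start = clock.call
  processed = []
  loop do
    slice =
      begin
        queue.pop(true) # non-blocking; raises ThreadError when the queue is empty
      rescue ThreadError
        return [processed, :no_more_work]
      end
    processed << slice
    # Stop after the time budget so higher-priority jobs can be checked.
    return [processed, :re_check] if clock.call - start >= re_check_seconds
  end
end

queue = Queue.new
(1..5).each { |i| queue << i }
ticks = 0
fake_clock = -> { ticks += 1 } # advances one "second" per call
p work_loop(queue, re_check_seconds: 3, clock: fake_clock) # => [[1, 2, 3], :re_check]
p work_loop(queue, re_check_seconds: 60)                   # => [[4, 5], :no_more_work]
```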
#work_first_slice(&block) ⇒ Object
Deprecated. Please open a ticket if you need this behavior.
Before a job is made available for processing, its first slice can be processed on its own, for example to extract a header row from the first slice. This method can only be called from within before_batch callbacks.
Returns [Integer] the number of records processed in the slice.
Note: the slice is removed from processing when this method completes.
# File 'lib/rocket_job/batch/worker.rb', line 72
def work_first_slice(&block)
  raise "#work_first_slice can only be called from within before_batch callbacks" unless sub_state == :before

  # TODO: Make these settings configurable
  count        = 0
  wait_seconds = 5
  while input.first.nil?
    break if count > 10

    logger.info "First slice has not arrived yet, sleeping for #{wait_seconds} seconds"
    sleep wait_seconds
    count += 1
  end

  slice = input.first
  # No records processed
  return 0 unless slice

  # TODO: Persist that the first slice is being processed by this worker
  slice.start
  rocket_job_process_slice(slice, &block)
end
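The wait loop checks `count > 10` before each sleep, so it sleeps at most 11 times. With 5-second waits, that is up to 55 seconds before the method gives up and returns 0. The hypothetical sketch below isolates the loop. `fetch` stands in for `input.first` and `sleeper` for `Kernel#sleep`, so the loop can be exercised without actually waiting.

```ruby
# Hypothetical sketch of the polling loop in work_first_slice.
def wait_for_first(fetch:, wait_seconds: 5, max_count: 10, sleeper: ->(seconds) { sleep(seconds) })
  count = 0
  while fetch.call.nil?
    break if count > max_count

    sleeper.call(wait_seconds)
    count += 1
  end
  fetch.call
end

naps = []
p wait_for_first(fetch: -> { nil }, sleeper: ->(s) { naps << s }) # => nil
p naps.sum # => 55 (11 naps of 5 seconds)
```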