Module: RocketJob::Plugins::Job::Worker::ClassMethods
- Defined in:
- lib/rocket_job/plugins/job/worker.rb
Instance Method Summary collapse
-
#perform_later(args, &block) ⇒ Object
DEPRECATED.
-
#perform_now(args, &block) ⇒ Object
Run this job now.
-
#requeue_dead_server(server_name) ⇒ Object
Requeues all jobs that were running on a server that died.
-
#rocket_job_next_job(worker_name, filter = {}) ⇒ Object
Returns the next job to work on in priority based order Returns nil if there are currently no queued jobs, or processing batch jobs with records that require processing.
Instance Method Details
#perform_later(args, &block) ⇒ Object
DEPRECATED
71 72 73 74 75 76 77 78 79 80 |
# File 'lib/rocket_job/plugins/job/worker.rb', line 71 def perform_later(args, &block) if RocketJob::Config.inline_mode perform_now(args, &block) else job = new(args) block.call(job) if block job.save! job end end |
#perform_now(args, &block) ⇒ Object
Run this job now.
The job is not saved to the database since it is processed entriely in memory As a result before_save and before_destroy callbacks will not be called. Validations are still called however prior to calling #perform
Note:
-
Only batch throttles are checked when perform_now is called.
19 20 21 22 23 24 |
# File 'lib/rocket_job/plugins/job/worker.rb', line 19 def perform_now(args, &block) job = new(args) block.call(job) if block job.perform_now job end |
#requeue_dead_server(server_name) ⇒ Object
Requeues all jobs that were running on a server that died
63 64 65 66 67 68 |
# File 'lib/rocket_job/plugins/job/worker.rb', line 63 def requeue_dead_server(server_name) # Need to requeue paused, failed since user may have transitioned job before it finished where(:state.in => [:running, :paused, :failed]).each do |job| job.requeue!(server_name) if job.may_requeue?(server_name) end end |
#rocket_job_next_job(worker_name, filter = {}) ⇒ Object
Returns the next job to work on in priority based order Returns nil if there are currently no queued jobs, or processing batch jobs
with records that require processing
Parameters
worker_name [String]
Name of the worker that will be processing this job
skip_job_ids [Array<BSON::ObjectId>]
Job ids to exclude when looking for the next job
Note:
If a job is in queued state it will be started
39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 |
# File 'lib/rocket_job/plugins/job/worker.rb', line 39 def rocket_job_next_job(worker_name, filter = {}) while (job = rocket_job_retrieve(worker_name, filter)) case when job.running? # Batch Job return job when job.expired? job.rocket_job_fail_on_exception!(worker_name) { job.destroy } logger.info "Destroyed expired job #{job.class.name}, id:#{job.id}" when new_filter = job.send(:rocket_job_evaluate_throttles) rocket_job_merge_filter(filter, new_filter) # Restore retrieved job so that other workers can process it later job.set(worker_name: nil, state: :queued) else job.worker_name = worker_name job.rocket_job_fail_on_exception!(worker_name) do defined?(RocketJobPro) ? job.start! : job.start end return job if job.running? end end end |