Module: RocketJob::Plugins::Job::Worker::ClassMethods

Defined in:
lib/rocket_job/plugins/job/worker.rb

Instance Method Summary collapse

Instance Method Details

#perform_later(args, &block) ⇒ Object

DEPRECATED



71
72
73
74
75
76
77
78
79
80
# File 'lib/rocket_job/plugins/job/worker.rb', line 71

def perform_later(args, &block)
  if RocketJob::Config.inline_mode
    perform_now(args, &block)
  else
    job = new(args)
    block.call(job) if block
    job.save!
    job
  end
end

#perform_now(args, &block) ⇒ Object

Run this job now.

The job is not saved to the database since it is processed entriely in memory As a result before_save and before_destroy callbacks will not be called. Validations are still called however prior to calling #perform

Note:

  • Only batch throttles are checked when perform_now is called.



19
20
21
22
23
24
# File 'lib/rocket_job/plugins/job/worker.rb', line 19

def perform_now(args, &block)
  job = new(args)
  block.call(job) if block
  job.perform_now
  job
end

#requeue_dead_server(server_name) ⇒ Object

Requeues all jobs that were running on a server that died



63
64
65
66
67
68
# File 'lib/rocket_job/plugins/job/worker.rb', line 63

def requeue_dead_server(server_name)
  # Need to requeue paused, failed since user may have transitioned job before it finished
  where(:state.in => [:running, :paused, :failed]).each do |job|
    job.requeue!(server_name) if job.may_requeue?(server_name)
  end
end

#rocket_job_next_job(worker_name, filter = {}) ⇒ Object

Returns the next job to work on in priority based order Returns nil if there are currently no queued jobs, or processing batch jobs

with records that require processing

Parameters

worker_name [String]
  Name of the worker that will be processing this job

skip_job_ids [Array<BSON::ObjectId>]
  Job ids to exclude when looking for the next job

Note:

If a job is in queued state it will be started


39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
# File 'lib/rocket_job/plugins/job/worker.rb', line 39

def rocket_job_next_job(worker_name, filter = {})
  while (job = rocket_job_retrieve(worker_name, filter))
    case
    when job.running?
      # Batch Job
      return job
    when job.expired?
      job.rocket_job_fail_on_exception!(worker_name) { job.destroy }
      logger.info "Destroyed expired job #{job.class.name}, id:#{job.id}"
    when new_filter = job.send(:rocket_job_evaluate_throttles)
      rocket_job_merge_filter(filter, new_filter)
      # Restore retrieved job so that other workers can process it later
      job.set(worker_name: nil, state: :queued)
    else
      job.worker_name = worker_name
      job.rocket_job_fail_on_exception!(worker_name) do
        defined?(RocketJobPro) ? job.start! : job.start
      end
      return job if job.running?
    end
  end
end