Module: RocketJob::Plugins::Job::Worker

Extended by:
ActiveSupport::Concern
Included in:
Job
Defined in:
lib/rocket_job/plugins/job/worker.rb

Defined Under Namespace

Modules: ClassMethods

Instance Method Summary

Instance Method Details

#fail_on_exception!(re_raise_exceptions = false, &block) ⇒ Object

Fail this job in the event of an exception.

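The job is automatically saved only if an exception is raised in the supplied block, and only when the job has already been persisted (it is neither a new record nor destroyed). The save skips validations so that a validation failure cannot prevent the failure from being recorded.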

re_raise_exceptions: [true|false]

Re-raise the exception after updating the job
Default: false


69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
# File 'lib/rocket_job/plugins/job/worker.rb', line 69

# Runs the supplied block and records any exception it raises against this job.
#
# The block is executed inside SemanticLogger.named_tagged with the job id as a tag.
# When the block raises:
#   * if the job is already failed, or may not transition to failed, only the
#     exception details (including the worker name) are stored on the job;
#   * otherwise the job is failed via #fail with the worker name and the exception.
# The job is then saved without validations, unless it is a new or destroyed record.
#
# re_raise_exceptions: [true|false]
#   Re-raise the exception after the job has been updated.
#   Default: false
#
# Returns whatever SemanticLogger.named_tagged returns for the block that was run.
#
# NOTE(review): rescues Exception rather than StandardError, so non-standard errors
# (e.g. ScriptError subclasses) are captured as well -- presumably intentional so that
# every failure is recorded on the job; confirm before narrowing.
def fail_on_exception!(re_raise_exceptions = false, &block)
  SemanticLogger.named_tagged(job: id.to_s, &block)
rescue Exception => error
  SemanticLogger.named_tagged(job: id.to_s) do
    if !failed? && may_fail?
      # Normal path: transition the job to failed, recording worker and exception.
      fail(worker_name, error)
    else
      # The fail transition is not available, so just capture the exception details.
      self.exception = JobException.from_exception(error)
      exception.worker_name = worker_name
    end

    # Skip validations so that a validation failure cannot block recording the failure.
    persisted = !new_record? && !destroyed?
    save(validate: false) if persisted

    raise error if re_raise_exceptions
  end
end

#perform(*) ⇒ Object

Raises:

  • (NotImplementedError)


58
59
60
# File 'lib/rocket_job/plugins/job/worker.rb', line 58

# Placeholder for the job's work: accepts and ignores any arguments and
# always raises NotImplementedError.
def perform(*)
  raise(NotImplementedError)
end

#perform_now ⇒ Object

Runs the job now in the current thread.

Validations are called prior to running the job.

The job is not saved and therefore the following callbacks are not called:

  • before_save

  • after_save

  • before_create

  • after_create

Exceptions are not suppressed and should be handled by the caller.

Raises:

  • (::Mongoid::Errors::Validations)


48
49
50
51
52
53
54
55
56
# File 'lib/rocket_job/plugins/job/worker.rb', line 48

# Runs this job immediately in the calling thread and returns its output.
#
# Raises ::Mongoid::Errors::Validations (built from this job) when the job is not valid.
# The job is started if it may be started, and worked only if it is then running.
# Exceptions raised while working are re-raised to the caller.
#
# Returns the value held in @rocket_job_output (nil if no work was performed here
# and nothing set it earlier).
def perform_now
  raise ::Mongoid::Errors::Validations, self unless valid?

  in_thread_worker = RocketJob::Worker.new
  start if may_start?

  # `true` => propagate exceptions to the caller instead of swallowing them.
  rocket_job_work(in_thread_worker, true) if running?

  @rocket_job_output
end

#rocket_job_active_workers(server_name = nil) ⇒ Object

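Returns [Array<ActiveWorker>] the worker actively running this job: a single-element array while the job is running (and, when server_name is supplied, only if its worker is on that server), otherwise an empty array.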



119
120
121
122
123
# File 'lib/rocket_job/plugins/job/worker.rb', line 119

# Lists the worker currently running this job.
#
# server_name: optional; when supplied (truthy), the job is only reported if
#              worker_on_server?(server_name) is true.
#
# Returns an empty Array when the job is not running or is filtered out by
# server_name; otherwise a one-element Array holding an ActiveWorker built from
# the worker name, the start time and this job.
def rocket_job_active_workers(server_name = nil)
  return [] unless running?
  return [] if server_name && !worker_on_server?(server_name)

  active = ActiveWorker.new(worker_name, started_at, self)
  [active]
end

#rocket_job_work(_worker, re_raise_exceptions = false) ⇒ Object

Works on this job

Returns [true|false] whether any work was performed. Note that the implementation listed below always returns false once the job has been worked.

If an exception is thrown the job is marked as failed and the exception is set in the job itself.

Thread-safe, can be called by multiple threads at the same time

Raises:

  • (ArgumentError)


95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
# File 'lib/rocket_job/plugins/job/worker.rb', line 95

# Performs this job's work, routing any exception through #fail_on_exception!.
#
# _worker:             accepted but not used by this implementation.
# re_raise_exceptions: passed through to #fail_on_exception!.
#                      Default: false
#
# Raises ArgumentError when the job is not running.
#
# After #perform (run inside the :perform callbacks when any are defined), the job
# is completed if it may complete. A job that is a new or destroyed record is only
# changed in memory; otherwise it is written with complete!, or save! when it may
# not complete.
#
# Always returns false.
def rocket_job_work(_worker, re_raise_exceptions = false)
  unless running?
    raise ArgumentError, "Job must be started before calling #rocket_job_work"
  end

  fail_on_exception!(re_raise_exceptions) do
    if _perform_callbacks.empty?
      @rocket_job_output = perform
    else
      # Before callbacks may fail, complete or abort the job, so #perform only runs
      # if the job is still running; after/around callbacks may alter the output.
      run_callbacks(:perform) { @rocket_job_output = perform if running? }
    end

    in_memory_only = new_record? || destroyed?
    if may_complete?
      in_memory_only ? complete : complete!
    elsif !in_memory_only
      save!
    end
  end

  false
end