Module: RocketJob::Plugins::Job::Worker

Extended by:
ActiveSupport::Concern
Included in:
Job
Defined in:
lib/rocket_job/plugins/job/worker.rb

Defined Under Namespace

Modules: ClassMethods

Instance Method Summary collapse

Instance Method Details

#performObject

Raises:

  • (NotImplementedError)


118
119
120
# File 'lib/rocket_job/plugins/job/worker.rb', line 118

def perform(*)
  raise NotImplementedError
end

#perform_nowObject

Runs the job now in the current thread.

Validations are called prior to running the job.

The job is not saved and therefore the following callbacks are not called:

  • before_save

  • after_save

  • before_create

  • after_create

Exceptions are not suppressed and should be handled by the caller.

Raises:

  • (Mongoid::Errors::Validations)


108
109
110
111
112
113
114
115
116
# File 'lib/rocket_job/plugins/job/worker.rb', line 108

def perform_now
  raise(Mongoid::Errors::Validations, self) unless valid?

  worker = RocketJob::Worker.new(inline: true)
  start if may_start?
  # Re-Raise exceptions
  rocket_job_work(worker, true) if running?
  result
end

#rocket_job_active_workers(server_name = nil) ⇒ Object

Returns [Hash<String:>] All servers actively working on this job



185
186
187
188
# File 'lib/rocket_job/plugins/job/worker.rb', line 185

def rocket_job_active_workers(server_name = nil)
  return [] if !running? || (server_name && !worker_on_server?(server_name))
  [ActiveWorker.new(worker_name, started_at, self)]
end

#rocket_job_fail_on_exception!(worker_name, re_raise_exceptions = false) ⇒ Object

Fail this job in the event of an exception.

The job is automatically saved only if an exception is raised in the supplied block.

worker_name: [String]

Name of the server on which the exception has occurred

re_raise_exceptions: [true|false]

Re-raise the exception after updating the job
Default: false


132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
# File 'lib/rocket_job/plugins/job/worker.rb', line 132

def rocket_job_fail_on_exception!(worker_name, re_raise_exceptions = false)
  yield
rescue Exception => exc
  if failed? || !may_fail?
    self.exception        = JobException.from_exception(exc)
    exception.worker_name = worker_name
    save! unless new_record? || destroyed?
  else
    if new_record? || destroyed?
      fail(worker_name, exc)
    else
      fail!(worker_name, exc)
    end
  end
  raise exc if re_raise_exceptions
end

#rocket_job_work(worker, re_raise_exceptions = false, filter = nil) ⇒ Object

Works on this job

Returns [true|false] whether this job should be excluded from the next lookup

If an exception is thrown the job is marked as failed and the exception is set in the job itself.

Thread-safe, can be called by multiple threads at the same time

Raises:

  • (ArgumentError)


157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
# File 'lib/rocket_job/plugins/job/worker.rb', line 157

def rocket_job_work(worker, re_raise_exceptions = false, filter = nil)
  raise(ArgumentError, 'Job must be started before calling #rocket_job_work') unless running?
  rocket_job_fail_on_exception!(worker.name, re_raise_exceptions) do
    if _perform_callbacks.empty?
      @rocket_job_output = perform
    else
      # Allows @rocket_job_output to be modified by after/around callbacks
      run_callbacks(:perform) do
        # Allow callbacks to fail, complete or abort the job
        @rocket_job_output = perform if running?
      end
    end

    if collect_output?
      # Result must be a Hash, if not put it in a Hash
      self.result = @rocket_job_output.is_a?(Hash) ? @rocket_job_output : {'result' => @rocket_job_output}
    end

    if new_record? || destroyed?
      complete if may_complete?
    else
      may_complete? ? complete! : save!
    end
  end
  false
end