Module: RocketJob::Plugins::Job::Worker
- Extended by:
- ActiveSupport::Concern
- Included in:
- Job
- Defined in:
- lib/rocket_job/plugins/job/worker.rb
Defined Under Namespace
Modules: ClassMethods
Instance Method Summary collapse
- #perform ⇒ Object
-
#perform_now ⇒ Object
Runs the job now in the current thread.
-
#rocket_job_active_workers(server_name = nil) ⇒ Object
Returns [Hash<String:>] All servers actively working on this job.
-
#rocket_job_fail_on_exception!(worker_name, re_raise_exceptions = false) ⇒ Object
Fail this job in the event of an exception.
-
#rocket_job_work(worker, re_raise_exceptions = false, filter = nil) ⇒ Object
Works on this job.
Instance Method Details
#perform ⇒ Object
118 119 120 |
# File 'lib/rocket_job/plugins/job/worker.rb', line 118 def perform(*) raise NotImplementedError end |
#perform_now ⇒ Object
Runs the job now in the current thread.
Validations are called prior to running the job.
The job is not saved and therefore the following callbacks are not called:
-
before_save
-
after_save
-
before_create
-
after_create
Exceptions are not suppressed and should be handled by the caller.
108 109 110 111 112 113 114 115 116 |
# File 'lib/rocket_job/plugins/job/worker.rb', line 108 def perform_now raise(Mongoid::Errors::Validations, self) unless valid? worker = RocketJob::Worker.new(inline: true) start if may_start? # Re-Raise exceptions rocket_job_work(worker, true) if running? result end |
#rocket_job_active_workers(server_name = nil) ⇒ Object
Returns [Hash<String:>] All servers actively working on this job
185 186 187 188 |
# File 'lib/rocket_job/plugins/job/worker.rb', line 185 def rocket_job_active_workers(server_name = nil) return [] if !running? || (server_name && !worker_on_server?(server_name)) [ActiveWorker.new(worker_name, started_at, self)] end |
#rocket_job_fail_on_exception!(worker_name, re_raise_exceptions = false) ⇒ Object
Fail this job in the event of an exception.
The job is automatically saved only if an exception is raised in the supplied block.
worker_name: [String]
Name of the server on which the exception has occurred
re_raise_exceptions: [true|false]
Re-raise the exception after updating the job
Default: false
132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 |
# File 'lib/rocket_job/plugins/job/worker.rb', line 132 def rocket_job_fail_on_exception!(worker_name, re_raise_exceptions = false) yield rescue Exception => exc if failed? || !may_fail? self.exception = JobException.from_exception(exc) exception.worker_name = worker_name save! unless new_record? || destroyed? else if new_record? || destroyed? fail(worker_name, exc) else fail!(worker_name, exc) end end raise exc if re_raise_exceptions end |
#rocket_job_work(worker, re_raise_exceptions = false, filter = nil) ⇒ Object
Works on this job
Returns [true|false] whether this job should be excluded from the next lookup
If an exception is thrown the job is marked as failed and the exception is set in the job itself.
Thread-safe, can be called by multiple threads at the same time
157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 |
# File 'lib/rocket_job/plugins/job/worker.rb', line 157 def rocket_job_work(worker, re_raise_exceptions = false, filter = nil) raise(ArgumentError, 'Job must be started before calling #rocket_job_work') unless running? rocket_job_fail_on_exception!(worker.name, re_raise_exceptions) do if _perform_callbacks.empty? @rocket_job_output = perform else # Allows @rocket_job_output to be modified by after/around callbacks run_callbacks(:perform) do # Allow callbacks to fail, complete or abort the job @rocket_job_output = perform if running? end end if collect_output? # Result must be a Hash, if not put it in a Hash self.result = @rocket_job_output.is_a?(Hash) ? @rocket_job_output : {'result' => @rocket_job_output} end if new_record? || destroyed? complete if may_complete? else may_complete? ? complete! : save! end end false end |