Module: RocketJob::Plugins::Job::Worker
- Extended by: ActiveSupport::Concern
- Included in: Job
- Defined in: lib/rocket_job/plugins/job/worker.rb
Defined Under Namespace
Modules: ClassMethods
Instance Method Summary
- #fail_on_exception!(re_raise_exceptions = false, &block) ⇒ Object
  Fail this job in the event of an exception.
- #perform ⇒ Object
- #perform_now ⇒ Object
  Runs the job now in the current thread.
- #rocket_job_active_workers(server_name = nil) ⇒ Object
  Returns [Array<ActiveWorker>] the worker actively working on this job.
- #rocket_job_work(_worker, re_raise_exceptions = false) ⇒ Object
  Works on this job.
Instance Method Details
#fail_on_exception!(re_raise_exceptions = false, &block) ⇒ Object
Fail this job in the event of an exception.
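The job is saved automatically only when the supplied block raises an exception, and then only if the job has already been persisted and has not been destroyed. That save skips validations.
re_raise_exceptions: [true|false]
  Re-raise the exception after updating the job.
  Default: false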
# File 'lib/rocket_job/plugins/job/worker.rb', line 69
def fail_on_exception!(re_raise_exceptions = false, &block)
  SemanticLogger.named_tagged(job: id.to_s, &block)
rescue Exception => e
  SemanticLogger.named_tagged(job: id.to_s) do
    if failed? || !may_fail?
      self.exception        = JobException.from_exception(e)
      exception.worker_name = worker_name
    else
      fail(worker_name, e)
    end

    # Prevent validation failures from failing the job
    save(validate: false) unless new_record? || destroyed?

    raise e if re_raise_exceptions
  end
end
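The rescue-and-record behaviour described above can be illustrated without MongoDB or the rocketjob gem. What follows is a minimal stand-alone sketch, not the library's implementation: `RecordingJob` and its `state` and `error` attributes are hypothetical stand-ins for the job's state machine and its `exception` field, and the sketch rescues `StandardError` where the method above rescues `Exception`.

```ruby
# Hypothetical stand-in for a job; not part of RocketJob.
class RecordingJob
  attr_reader :state, :error

  def initialize
    @state = :running
    @error = nil
  end

  # Runs the block. On any StandardError, records the error and marks the
  # job as failed. Re-raises only when re_raise_exceptions is true.
  def fail_on_exception!(re_raise_exceptions = false)
    yield
  rescue StandardError => e
    @error = e
    @state = :failed
    raise e if re_raise_exceptions
  end
end

job = RecordingJob.new
job.fail_on_exception! { raise ArgumentError, "bad input" }
puts job.state          # failed
puts job.error.message  # bad input
```

With the default `re_raise_exceptions = false`, the caller sees no exception and has to inspect the job to learn that it failed; passing `true` both records the failure and propagates the error.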
#perform ⇒ Object
# File 'lib/rocket_job/plugins/job/worker.rb', line 58
def perform(*)
  raise NotImplementedError
end
#perform_now ⇒ Object
Runs the job now in the current thread.
Validations are called prior to running the job.
The job is not saved and therefore the following callbacks are not called:
- before_save
- after_save
- before_create
- after_create
Exceptions are not suppressed and should be handled by the caller.
# File 'lib/rocket_job/plugins/job/worker.rb', line 48
def perform_now
  raise(::Mongoid::Errors::Validations, self) unless valid?

  worker = RocketJob::Worker.new
  start if may_start?
  # Re-Raise exceptions
  rocket_job_work(worker, true) if running?
  @rocket_job_output
end
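The validate, start, work, return sequence of `perform_now` can be sketched in plain Ruby. This is an illustration under stated assumptions rather than RocketJob code: `InlineJob` is a hypothetical class, its `valid?` check is invented for the example, and `ArgumentError` stands in for `Mongoid::Errors::Validations`.

```ruby
# Hypothetical stand-in; a real job would subclass RocketJob::Job.
class InlineJob
  attr_reader :state

  def initialize(count)
    @count = count
    @state = :queued
  end

  # Invented validation rule, for illustration only.
  def valid?
    @count.is_a?(Integer) && @count >= 0
  end

  def perform
    @count * 2
  end

  # Validates, starts, runs in the current thread and returns the result.
  # Nothing is persisted, and exceptions propagate to the caller.
  def perform_now
    raise ArgumentError, "job is invalid" unless valid?

    @state = :running if @state == :queued
    output = perform
    @state = :completed
    output
  end
end

job = InlineJob.new(21)
puts job.perform_now  # 42
puts job.state        # completed
```

Because the call runs in the caller's thread and returns the value of `perform`, this style is convenient in tests and consoles, where the caller can rescue any error directly.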
#rocket_job_active_workers(server_name = nil) ⇒ Object
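Returns [Array<ActiveWorker>] the worker actively working on this job, or an empty array if the job is not running or, when server_name is given, is not running on that server.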
# File 'lib/rocket_job/plugins/job/worker.rb', line 119
def rocket_job_active_workers(server_name = nil)
  return [] if !running? || (server_name && !worker_on_server?(server_name))

  [ActiveWorker.new(worker_name, started_at, self)]
end
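The filtering logic is small enough to sketch stand-alone. Everything named here is hypothetical: `ActiveWorkerSketch` and `TrackedJob` are stand-ins, and the sketch assumes a worker name begins with its server's name, which this page does not confirm.

```ruby
# Hypothetical stand-ins for ActiveWorker and a job; not RocketJob classes.
ActiveWorkerSketch = Struct.new(:name, :started_at)

class TrackedJob
  def initialize(worker_name: nil, started_at: nil)
    @worker_name = worker_name
    @started_at  = started_at
  end

  # In this sketch a job is running whenever a worker has claimed it.
  def running?
    !@worker_name.nil?
  end

  # Assumption: a worker name starts with the name of its server.
  def worker_on_server?(server_name)
    @worker_name.start_with?(server_name)
  end

  # Returns a one-element array for a matching running job, otherwise [].
  def active_workers(server_name = nil)
    return [] if !running? || (server_name && !worker_on_server?(server_name))

    [ActiveWorkerSketch.new(@worker_name, @started_at)]
  end
end

job = TrackedJob.new(worker_name: "host1:100:1", started_at: Time.now)
puts job.active_workers.size           # 1
puts job.active_workers("host2").size  # 0
```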
#rocket_job_work(_worker, re_raise_exceptions = false) ⇒ Object
Works on this job.
Returns [true|false] whether any work was performed; the implementation listed here always returns false.
If an exception is raised, the job is marked as failed and the exception is stored in the job itself.
Thread-safe: can be called by multiple threads at the same time.
# File 'lib/rocket_job/plugins/job/worker.rb', line 95
def rocket_job_work(_worker, re_raise_exceptions = false)
  raise(ArgumentError, "Job must be started before calling #rocket_job_work") unless running?

  fail_on_exception!(re_raise_exceptions) do
    if _perform_callbacks.empty?
      @rocket_job_output = perform
    else
      # Allows @rocket_job_output to be modified by after/around callbacks
      run_callbacks(:perform) do
        # Allow callbacks to fail, complete or abort the job
        @rocket_job_output = perform if running?
      end
    end

    if new_record? || destroyed?
      complete if may_complete?
    else
      may_complete? ? complete! : save!
    end
  end
  false
end
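Two properties of this method are easy to miss: it refuses to run a job that has not been started, and it returns `false` even after a successful run. A minimal stand-alone sketch of both, using a hypothetical `WorkJob` class with a hand-rolled state field in place of RocketJob's state machine, callbacks and persistence:

```ruby
# Hypothetical stand-in; not RocketJob's implementation.
class WorkJob
  attr_reader :state, :output

  def initialize
    @state = :queued
  end

  def start
    @state = :running
  end

  def perform
    "done"
  end

  # Requires a running job, stores the result of #perform, then completes.
  # Returns false, mirroring the method documented above.
  def work
    raise ArgumentError, "Job must be started before calling #work" unless @state == :running

    @output = perform
    @state  = :completed
    false
  end
end

job = WorkJob.new
begin
  job.work
rescue ArgumentError => e
  puts e.message  # Job must be started before calling #work
end

job.start
job.work
puts job.state   # completed
puts job.output  # done
```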