Class: Litejob::Processor
- Inherits:
-
Object
- Object
- Litejob::Processor
- Defined in:
- lib/litejob/processor.rb
Overview
Litejob::Processor is responsible for processing job payloads
Instance Method Summary collapse
-
#initialize(queue, id, serialized_job) ⇒ Processor
constructor
A new instance of Processor.
- #process! ⇒ Object
- #repush(id, job, delay = 0, queue = nil) ⇒ Object
Constructor Details
#initialize(queue, id, serialized_job) ⇒ Processor
Returns a new instance of Processor.
9 10 11 12 13 14 15 16 17 |
# File 'lib/litejob/processor.rb', line 9
#
# Builds a processor for a single serialized job.
#
# Stores the arguments, parses the payload as JSON, grabs the
# Litequeue singleton and sets the logging context from the queue,
# the payload's "class" entry and the job id.
#
# @param queue the queue the job was taken from (stored in @queue)
# @param id the job identifier (stored in @id)
# @param serialized_job [String] JSON text describing the job
# @raise [JSON::ParserError] if serialized_job is not valid JSON
def initialize(queue, id, serialized_job)
  @queue, @id, @serialized_job = queue, id, serialized_job
  @job_hash = JSON.parse(serialized_job)
  @litequeue = Litequeue.instance
  job_class = @job_hash["class"]
  set_log_context!(queue: queue, class: job_class, job: id)
end
Instance Method Details
#process! ⇒ Object
23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 |
# File 'lib/litejob/processor.rb', line 23
#
# Runs the job described by @job_hash.
#
# Resolves the class named in the payload's "class" entry, instantiates
# it and calls #perform with the payload's "params". When #perform (or the
# completion log call) raises, the job is re-queued through #repush:
#
# * with a delay of 0.1s per attempt already consumed, onto the payload's
#   "queue", while retries remain;
# * onto the "_dead" queue once retries are exhausted.
#
# The retry counter lives in the payload under "retries_left" and is
# seeded from "attempts" the first time the job fails.
#
# @raise [StandardError] any error raised outside the #perform call
#   (e.g. an unknown job class) or while handling a failure is logged
#   through #err and re-raised unchanged.
def process!
  log(:deq)
  # NOTE(review): the class name is taken straight from the payload;
  # confirm payloads only ever come from trusted producers.
  klass = Object.const_get(@job_hash["class"])
  instance = klass.new
  begin
    instance.perform(*@job_hash["params"])
    log(:end)
  rescue => e
    # Resolve the effective counter *before* testing for exhaustion, and
    # test with <= rather than ==: a job created with zero attempts, or a
    # payload whose counter is already negative, would otherwise never hit
    # exactly 0 and be retried forever.
    retries_left = @job_hash["retries_left"] || @job_hash["attempts"]
    if retries_left <= 0
      err(e, "retries exhausted, moving to _dead queue")
      repush(@id, @job_hash, 0, "_dead")
    else
      @job_hash["retries_left"] = retries_left - 1
      # Linear backoff: 0.1s for every attempt used so far.
      retry_delay = (@job_hash["attempts"] - @job_hash["retries_left"]) * 0.1
      err(e, "retrying in #{retry_delay} seconds")
      repush(@id, @job_hash, retry_delay, @job_hash["queue"])
    end
  end
rescue => e
  # this is an error in the extraction of job info, retrying here will not be useful
  err(e, "while processing job=#{@serialized_job}")
  raise
end
#repush(id, job, delay = 0, queue = nil) ⇒ Object
19 20 21 |
# File 'lib/litejob/processor.rb', line 19
#
# Serializes +job+ with JSON.dump and hands it to @litequeue.repush
# under the given id.
#
# @param id the job identifier passed through to the queue
# @param job the job payload to serialize
# @param delay delay forwarded as the +delay:+ keyword (default 0)
# @param queue queue name forwarded as the +queue:+ keyword (default nil)
# @return whatever @litequeue.repush returns
def repush(id, job, delay = 0, queue = nil)
  payload = JSON.dump(job)
  @litequeue.repush(id, payload, queue: queue, delay: delay)
end