Class: Litejob::Processor

Inherits:
Object
  • Object
show all
Defined in:
lib/litejob/processor.rb

Overview

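Litejob::Processor processes a single serialized job payload: it runs the job and, if the job fails, either re-enqueues it for a retry or moves it to the `_dead` queue.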

Instance Method Summary collapse

Constructor Details

#initialize(queue, id, serialized_job) ⇒ Processor

Returns a new instance of Processor.



9
10
11
12
13
14
15
16
17
# File 'lib/litejob/processor.rb', line 9

# Builds a processor for one dequeued job.
#
# The serialized payload is parsed as JSON right away, so a malformed payload
# raises from the constructor, before the queue singleton is looked up.
#
# @param queue [Object] queue identifier, stored and used as log context
# @param id [Object] job identifier, stored and used as log context
# @param serialized_job [String] JSON document describing the job; its
#   "class" entry is added to the log context
def initialize(queue, id, serialized_job)
  @queue, @id, @serialized_job = queue, id, serialized_job
  @job_hash = JSON.parse(serialized_job)
  @litequeue = Litequeue.instance

  set_log_context!(queue: queue, class: @job_hash["class"], job: id)
end

Instance Method Details

#process! ⇒ Object



23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
# File 'lib/litejob/processor.rb', line 23

# Runs the job described by the parsed payload and handles failures.
#
# The job class named by the payload's "class" entry is looked up and
# instantiated, then +perform+ is called with the payload's "params".
#
# When +perform+ raises a StandardError the job is pushed back:
# * to the "_dead" queue (delay 0) once the retry budget is used up;
# * otherwise to the payload's "queue", after decrementing "retries_left",
#   with a delay of 0.1 for the first retry, 0.2 for the second, and so on.
#
# The budget lives in the payload under "retries_left" and is seeded from
# "attempts" on the first failure. It is resolved *before* it is tested, so a
# payload with "attempts" of 0 (or with no "attempts" at all) goes to the
# "_dead" queue on its first failure instead of being retried without bound.
#
# @raise [StandardError] any error raised outside the +perform+ call (class
#   lookup, instantiation, or while handling a failure) is logged together
#   with the raw payload and re-raised
def process!
  log(:deq)
  # NOTE(review): the class name is taken straight from the payload; this
  # assumes only trusted producers can enqueue jobs — confirm.
  klass = Object.const_get(@job_hash["class"])
  instance = klass.new

  begin
    instance.perform(*@job_hash["params"])
    log(:end)
  rescue => e
    # Seed the budget first: testing it while it is still nil let a budget of
    # zero slip past the check and count down through negative numbers forever.
    @job_hash["retries_left"] ||= @job_hash["attempts"].to_i

    if @job_hash["retries_left"] <= 0
      err(e, "retries exhausted, moving to _dead queue")
      repush(@id, @job_hash, 0, "_dead")
    else
      @job_hash["retries_left"] -= 1
      # The delay grows linearly with the number of retries already used.
      retry_delay = (@job_hash["attempts"] - @job_hash["retries_left"]) * 0.1
      err(e, "retrying in #{retry_delay} seconds")
      repush(@id, @job_hash, retry_delay, @job_hash["queue"])
    end
  end
rescue => e
  # Reached for failures outside the job's own perform call (e.g. the job class
  # cannot be resolved); retrying here would not be useful, so log and re-raise.
  err(e, "while processing job=#{@serialized_job}")
  raise
end

#repush(id, job, delay = 0, queue = nil) ⇒ Object



19
20
21
# File 'lib/litejob/processor.rb', line 19

# Re-enqueues a job under its existing id.
#
# The job is dumped to JSON and forwarded to the queue object's own +repush+.
#
# @param id [Object] identifier of the job being pushed back
# @param job [Object] job data; serialized with JSON.dump before forwarding
# @param delay [Numeric] forwarded as the +delay:+ option (presumably
#   seconds, going by the retry log message — confirm against Litequeue)
# @param queue [String, nil] forwarded as the +queue:+ option
# @return [Object] whatever the queue object's +repush+ returns
def repush(id, job, delay = 0, queue = nil)
  payload = JSON.dump(job)
  @litequeue.repush(id, payload, queue: queue, delay: delay)
end