24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
|
# File 'lib/soter/job_worker.rb', line 24
# Works through the queue until it is empty, then closes the worker's log.
#
# Each iteration locks the next job with +@queue.lock_next+, resolves the
# handler class named in <tt>job['job']['class']</tt> through
# +Soter.recursive_const_get+, instantiates it with
# <tt>job['job']['params']</tt> and calls +perform+ on it. Depending on
# +success?+ the job is either completed or rescheduled
# (<tt>job['active_at']</tt> is pushed forward by +Soter.retry_offset+) and
# reported through +@queue.error+.
#
# Any exception raised while handling a job is logged with its backtrace and
# the loop moves on to the next job. Process-control exceptions
# (+SignalException+, +SystemExit+) are re-raised so the worker can be
# stopped. +@log+ is closed on every exit path.
#
# @return [nil]
def perform
  process_id = Digest::MD5.hexdigest(
    "#{Socket.gethostname}-#{Process.pid}-#{Thread.current}"
  )
  log "#{process_id}: Spawning"

  # Assignment in the condition is intentional: loop until lock_next
  # returns nil/false.
  while (job = @queue.lock_next(process_id))
    log "#{process_id}: Starting work on job #{job['_id']}"
    log "#{process_id}: Job info =>"
    job.each { |key, value| log "#{process_id}: {#{key} : #{value}}" }

    begin
      handler_class = Soter.recursive_const_get(job['job']['class'])
      job_handler = handler_class.new(job['job']['params'])
      job_handler.perform
      # Interpolation (rather than String#+) so a nil message does not raise
      # TypeError and turn a successful job into a failed one.
      log "#{process_id}: #{job_handler.message}"

      if job_handler.success?
        @queue.complete(job, process_id)
        log "#{process_id}: Completed job #{job['_id']}"
      else
        offset = Soter.retry_offset(job['attempts'] + 1)
        job['active_at'] = Time.now.utc + offset
        @queue.error(job, job_handler.message)
        log "#{process_id}: Failed job #{job['_id']}"
      end

      sleep(1) if fork?
    rescue SignalException, SystemExit
      # Shutdown requests must not be mistaken for job failures.
      raise
    rescue Exception => e
      # Deliberately broad: a misbehaving job (including non-StandardError
      # failures) must not take the worker loop down.
      #
      # NOTE(review): the success path passes process_id as the second
      # argument to @queue.complete, while here the error message is passed.
      # Possibly this was meant to be @queue.error(job, e.message) - confirm
      # against the queue API before changing.
      @queue.complete(job, e.message)
      log "#{process_id}: Failed job #{job['_id']} with error #{e.message}"
      log "#{process_id}: Backtrace =>"
      e.backtrace.each { |line| log "#{process_id}: #{line}" }
    end
  end

  log "#{process_id}: Harakiri"
  nil
ensure
  # Close the log even when the loop is left through an exception.
  @log.close
end
|