Class: Creeper::Processor
- Inherits:
- Object (hierarchy: Object > Creeper::Processor)
- Includes:
- Celluloid, Util
- Defined in:
- lib/creeper/processor.rb
Overview
The Processor receives a message from the Manager and actually processes it. It instantiates the worker, runs the middleware chain and then calls Creeper::Worker#perform.
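The flow described above (instantiate the worker, run the middleware chain, then call perform) can be sketched in isolation. This is a minimal illustration and not Creeper's actual implementation: SketchChain, SketchLogging, SketchWorker and the SKETCH_* constants are hypothetical names invented for this example, and the real chain also receives the job and connection.

```ruby
# Hypothetical stand-ins; none of these classes are part of Creeper itself.
class SketchChain
  def initialize(*middlewares)
    @middlewares = middlewares
  end

  # Calls each middleware in order; the innermost step is the given block.
  def invoke(worker, msg, queue, &final)
    chain = @middlewares.dup
    step = lambda do
      if chain.empty?
        final.call
      else
        chain.shift.call(worker, msg, queue, &step)
      end
    end
    step.call
  end
end

# A middleware wraps the rest of the chain and yields to continue.
class SketchLogging
  def initialize(log)
    @log = log
  end

  def call(worker, msg, queue)
    @log << "start #{queue}"
    yield
    @log << "done #{queue}"
  end
end

class SketchWorker
  attr_reader :seen

  def perform(*args)
    @seen = args
  end
end

SKETCH_LOG = []
SKETCH_WORKER = SketchWorker.new
SKETCH_MSG = { 'class' => 'SketchWorker', 'args' => [1, 2] }

SketchChain.new(SketchLogging.new(SKETCH_LOG)).invoke(SKETCH_WORKER, SKETCH_MSG, 'default') do
  SKETCH_WORKER.perform(*SKETCH_MSG['args'])
end
```

After the run, SKETCH_LOG holds the "start" and "done" entries around the perform call, which shows how a middleware wraps the work rather than replacing it.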
Constant Summary
Constants included from Util
Util::EXPIRY
Class Method Summary
Instance Method Summary
Methods included from Util
#beanstalk, #constantize, #logger, #process_id, #redis, #watchdog
#handle_exception
Constructor Details
#initialize(boss) ⇒ Processor
Returns a new instance of Processor.
# File 'lib/creeper/processor.rb', line 29
def initialize(boss)
  @boss = boss
end
Class Method Details
.default_middleware ⇒ Object
Instance Method Details
#inspect ⇒ Object
# File 'lib/creeper/processor.rb', line 57
def inspect
  "#<Processor #{to_s}>"
end
#process(msgstr, queue, job, conn) ⇒ Object
# File 'lib/creeper/processor.rb', line 33
def process(msgstr, queue, job, conn)
  msg = Creeper.load_json(msgstr) rescue msgstr
  klass = Creeper.job_descriptions[queue]
  klass ||= constantize(msg['class'])
  worker = klass.new
  stats(worker, msg, queue) do
    Creeper.server_middleware.invoke(worker, msg, queue, job, conn) do
      args = msg['args']
      args ||= [msg]
      worker.perform(*cloned(args))
    end
  end
  job.delete rescue nil
  @boss.processor_done!(current_actor)
rescue => ex
  job.bury rescue nil
  handle_exception(ex, msg || { :message => msgstr })
  raise
ensure
  conn.close rescue nil
end
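The argument handling in #process can be tried on its own. The sketch below assumes that Creeper.load_json behaves like the standard library's JSON.parse (the source does not show its definition), and sketch_args is a hypothetical helper that exists only for this example.

```ruby
require 'json'

# Hypothetical helper mirroring how #process derives the arguments
# passed to the worker. JSON.parse stands in for Creeper.load_json.
def sketch_args(msgstr)
  # Fall back to the raw string when the payload is not valid JSON.
  msg = JSON.parse(msgstr) rescue msgstr
  # Use the 'args' entry when present, otherwise pass the whole message.
  args = msg['args']
  args ||= [msg]
  args
end

sketch_args('{"class":"Mailer","args":[1,"a"]}') # => [1, "a"]
sketch_args('{"id":7}')                           # one-element array wrapping the parsed hash
sketch_args('plain text')                         # => ["plain text"]
```

A payload without an 'args' key is handed to perform as a single argument, and a payload that is not JSON at all is handed over as the raw string.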
#to_s ⇒ Object
# File 'lib/creeper/processor.rb', line 61
def to_s
  @str ||= "#{hostname}:#{process_id}-#{Thread.current.object_id}:default"
end