Class: Faktory::Processor
- Inherits:
-
Object
- Object
- Faktory::Processor
show all
- Includes:
- Util
- Defined in:
- lib/faktory/processor.rb
Overview
The Processor is a standalone thread which:
- fetches a job
- executes the job
  a. instantiate the Worker
  b. run the middleware chain
  c. call #perform
A Processor can exit due to shutdown (processor_stopped) or due to an error during job execution (processor_died).
If an error occurs in the job execution, the Processor calls the Manager to create a new one to replace itself and exits.
Constant Summary
collapse
- @@busy_lock =
Mutex.new
- @@busy_count =
0
Constants included
from Util
Util::EXPIRY
Instance Attribute Summary collapse
Class Method Summary
collapse
Instance Method Summary
collapse
Methods included from Util
#fire_event, #hostname, #identity, #logger, #process_nonce, #safe_thread, #server, #watchdog
#handle_exception
Constructor Details
#initialize(mgr) ⇒ Processor
Returns a new instance of Processor.
35
36
37
38
39
40
41
42
43
|
# File 'lib/faktory/processor.rb', line 35
# Builds a processor owned by the given manager.
#
# Reads the :reloader and :job_logger entries from mgr.options; when no
# :job_logger is configured, Faktory::JobLogger is instantiated instead.
# A Faktory::Fetcher is created from the same options.
#
# @param mgr the manager; must respond to #options
def initialize(mgr)
  @mgr = mgr
  @thread = nil
  # Neither "server down" nor "finished" when the processor is created.
  @down = @done = false

  @reloader = mgr.options[:reloader]
  job_logger_class = mgr.options[:job_logger] || Faktory::JobLogger
  @logging = job_logger_class.new
  @fetcher = Faktory::Fetcher.new(mgr.options)
end
|
Instance Attribute Details
#job ⇒ Object
Returns the value of attribute job.
27
28
29
|
# File 'lib/faktory/processor.rb', line 27
# Reader for the @job instance variable, which process_one assigns from
# work.job just before the job is processed.
def job
@job
end
|
#thread ⇒ Object
Returns the value of attribute thread.
26
27
28
|
# File 'lib/faktory/processor.rb', line 26
# Reader for the @thread instance variable: nil after initialize, and set to
# the result of safe_thread once #start has been called.
def thread
@thread
end
|
Class Method Details
.busy_count ⇒ Object
31
32
33
|
# File 'lib/faktory/processor.rb', line 31
# Returns the current value of the class-level @@busy_count counter, which
# process_one increments before processing a job and decrements afterwards.
# The value is read here without taking @@busy_lock.
def self.busy_count
@@busy_count
end
|
Instance Method Details
#constantize(str) ⇒ Object
166
167
168
169
170
171
172
173
|
# File 'lib/faktory/processor.rb', line 166
# Resolves a "::"-separated constant path (e.g. "Foo::Bar" or "::Foo::Bar")
# to the constant it names, walking down from Object one segment at a time.
#
# A segment that is not defined on the current scope is handed to that
# scope's const_missing hook.
#
# @param str [String] the constant path
# @return [Object] the resolved constant (Object itself for an empty path)
def constantize(str)
  segments = str.split("::")
  # A leading "::" produces an empty first segment; discard it.
  segments.shift if segments.empty? || segments.first.empty?

  segments.reduce(Object) do |scope, segment|
    if scope.const_defined?(segment)
      scope.const_get(segment)
    else
      scope.const_missing(segment)
    end
  end
end
|
#dispatch(payload) ⇒ Object
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
|
# File 'lib/faktory/processor.rb', line 123
# Instantiates the job class named by payload["jobtype"] and yields the
# instance to the caller's block.
#
# The instantiation and the yield run inside three nested wrappers, outermost
# first: Faktory::Logging.with_job_hash_context, the job logger (@logging)
# and the reloader (@reloader).
#
# @param payload [Hash] job hash; reads the "jobtype", "jid" and "custom" keys
# @yield [worker] the freshly built job instance with jid and _custom assigned
def dispatch(payload)
  Faktory::Logging.with_job_hash_context(payload) do
    @logging.call(payload) do
      @reloader.call do
        worker = constantize(payload["jobtype"]).new
        worker.jid = payload["jid"]
        worker._custom = payload["custom"]
        yield worker
      end
    end
  end
end
|
#fetch ⇒ Object
99
100
101
102
103
104
105
106
107
108
109
|
# File 'lib/faktory/processor.rb', line 99
# Asks the fetcher for the next unit of work.
#
# If @down is set (a previous fetch failed), logs how long the outage lasted
# and clears the marker. Returns nil when Faktory::Shutdown is raised; any
# other StandardError is passed to handle_fetch_exception and that method's
# result is returned.
#
# @return the fetched work, or nil
def fetch
  job = @fetcher.retrieve_work
  if @down
    logger.info { "Faktory is online, #{Time.now - @down} sec downtime" }
    @down = nil
  end
  job
rescue Faktory::Shutdown
  # Shutting down: nothing to hand back.
  nil
rescue => err
  handle_fetch_exception(err)
end
|
#handle_fetch_exception(ex) ⇒ Object
111
112
113
114
115
116
117
118
119
120
121
|
# File 'lib/faktory/processor.rb', line 111
# Records and reports a failure to fetch work, then pauses before the caller
# retries.
#
# Only the first failure of an outage is logged: @down is stamped with the
# current time and the error plus its backtrace are written to the logger.
# While @down stays set, further failures are silent.
#
# @param ex [Exception] the error raised while fetching
# @return [nil] always
def handle_fetch_exception(ex)
  unless @down
    @down = Time.now
    logger.error("Error fetching job: #{ex}")
    # Exception#backtrace is nil for an exception that was never raised, so
    # guard against it instead of calling #each on nil.
    (ex.backtrace || []).each { |frame| logger.error(frame) }
  end
  # Pause for a second so a broken connection is not retried in a tight loop.
  sleep(1)
  nil
end
|
#kill(wait = false) ⇒ Object
51
52
53
54
55
56
57
58
59
60
61
|
# File 'lib/faktory/processor.rb', line 51
# Marks the processor as done and interrupts its thread by raising
# Faktory::Shutdown inside it. Does nothing beyond setting the flag when the
# processor was never started.
#
# @param wait [Boolean] when true, block on the thread's #value after
#   raising and return it
# @return the thread's value when waiting, otherwise nil
def kill(wait = false)
  @done = true
  worker = @thread
  return unless worker

  worker.raise(::Faktory::Shutdown)
  worker.value if wait
end
|
#process(work) ⇒ Object
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
|
# File 'lib/faktory/processor.rb', line 141
# Executes one unit of work and reports the outcome back through it.
#
# The job instance from #dispatch is run through the worker middleware chain
# and its #perform is called with the payload's "args". On success the work
# is acknowledged. On Faktory::Shutdown the work is failed and the shutdown
# is swallowed. Any other exception is passed to handle_exception, the work
# is failed, and the exception is re-raised to the caller.
#
# @param work the fetched work; must respond to #job, #acknowledge and #fail
def process(work)
  payload = work.job
  begin
    dispatch(payload) do |worker|
      Faktory.worker_middleware.invoke(worker, payload) { worker.perform(*payload["args"]) }
    end
    work.acknowledge
  rescue Faktory::Shutdown => shutdown
    work.fail(shutdown)
  rescue Exception => error
    handle_exception(error, {context: "Job raised exception", job: work.job})
    work.fail(error)
    raise error
  end
end
|
#process_one ⇒ Object
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
|
# File 'lib/faktory/processor.rb', line 80
# Fetches and processes a single unit of work.
#
# When nothing was fetched, sleeps for one second and returns. Otherwise the
# shared @@busy_count is incremented under @@busy_lock for the duration of
# the job and decremented again even if processing raises.
def process_one
  work = fetch
  # Nothing to do: back off briefly before the next fetch attempt.
  return sleep(1) unless work

  @@busy_lock.synchronize { @@busy_count += 1 }
  begin
    @job = work.job
    process(work)
  ensure
    @@busy_lock.synchronize { @@busy_count -= 1 }
  end
end
|
#run ⇒ Object
69
70
71
72
73
74
75
76
77
78
|
# File 'lib/faktory/processor.rb', line 69
# Main loop of the processor: keeps calling process_one until @done is set.
#
# The manager is told the processor stopped when the loop ends normally or a
# Faktory::Shutdown is raised; any other exception is reported to the
# manager through processor_died.
def run
  process_one until @done
  @mgr.processor_stopped(self)
rescue Faktory::Shutdown
  @mgr.processor_stopped(self)
rescue Exception => error
  @mgr.processor_died(self, error)
end
|
#start ⇒ Object
63
64
65
|
# File 'lib/faktory/processor.rb', line 63
# Starts the processor by handing #run to safe_thread under the name
# "processor". Calling it again returns the already stored thread instead of
# starting another one.
#
# @return the value stored in @thread
def start
  return @thread if @thread

  @thread = safe_thread("processor", &method(:run))
end
|
#terminate(wait = false) ⇒ Object
45
46
47
48
49
|
# File 'lib/faktory/processor.rb', line 45
# Asks the processor to stop by setting @done, so the run loop exits at its
# next check. Unlike #kill, nothing is raised in the thread.
#
# @param wait [Boolean] when true and a thread exists, block on the thread's
#   #value and return it
# @return the thread's value when waiting on an existing thread, otherwise nil
def terminate(wait = false)
  @done = true
  @thread.value if @thread && wait
end
|
#thread_identity ⇒ Object
162
163
164
|
# File 'lib/faktory/processor.rb', line 162
# Returns a short base-36 identifier derived from the object_id of the
# thread that first calls this method. The result is cached in @str, so later
# calls return the same string regardless of the calling thread.
#
# @return [String]
def thread_identity
  return @str if @str

  @str = Thread.current.object_id.to_s(36)
end
|