Class: Sidekiq::Processor
- Inherits:
-
Object
- Object
- Sidekiq::Processor
show all
- Includes:
- Component
- Defined in:
- lib/sidekiq/processor.rb
Overview
The Processor is a standalone thread which:
-
fetches a job from Redis
-
executes the job
a. instantiate the job class
b. run the middleware chain
c. call #perform
A Processor can exit due to shutdown or due to an error during job execution.
If an error occurs in the job execution, the Processor calls the Manager to create a new one to replace itself and exits.
Defined Under Namespace
Classes: Counter, SharedWorkState
Constant Summary
collapse
- PROCESSED =
Counter.new
- FAILURE =
Counter.new
- WORK_STATE =
SharedWorkState.new
Instance Attribute Summary collapse
Attributes included from Component
#config
Instance Method Summary
collapse
Methods included from Component
#fire_event, #handle_exception, #hostname, #identity, #inspect, #logger, #process_nonce, #redis, #safe_thread, #tid, #watchdog
Constructor Details
#initialize(capsule, &block) ⇒ Processor
Returns a new instance of Processor.
31
32
33
34
35
36
37
38
39
40
41
|
# File 'lib/sidekiq/processor.rb', line 31
def initialize(capsule, &block)
@config = @capsule = capsule
@callback = block
@down = false
@done = false
@job = nil
@thread = nil
@reloader = Sidekiq.default_configuration[:reloader]
@job_logger = (capsule.config[:job_logger] || Sidekiq::JobLogger).new(capsule.config)
@retrier = Sidekiq::JobRetry.new(capsule)
end
|
Instance Attribute Details
#capsule ⇒ Object
Returns the value of attribute capsule.
29
30
31
|
# File 'lib/sidekiq/processor.rb', line 29
def capsule
@capsule
end
|
#job ⇒ Object
Returns the value of attribute job.
28
29
30
|
# File 'lib/sidekiq/processor.rb', line 28
def job
@job
end
|
#thread ⇒ Object
Returns the value of attribute thread.
27
28
29
|
# File 'lib/sidekiq/processor.rb', line 27
def thread
@thread
end
|
Instance Method Details
#dispatch(job_hash, queue, jobstr) ⇒ Object
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
|
# File 'lib/sidekiq/processor.rb', line 122
def dispatch(job_hash, queue, jobstr)
@job_logger.prepare(job_hash) do
@retrier.global(jobstr, queue) do
@job_logger.call(job_hash, queue) do
stats(jobstr, queue) do
@reloader.call do
klass = Object.const_get(job_hash["class"])
instance = klass.new
instance.jid = job_hash["jid"]
instance._context = self
@retrier.local(instance, jobstr, queue) do
yield instance
end
end
end
end
end
end
end
|
#execute_job(instance, cloned_args) ⇒ Object
219
220
221
|
# File 'lib/sidekiq/processor.rb', line 219
def execute_job(instance, cloned_args)
instance.perform(*cloned_args)
end
|
#fetch ⇒ Object
102
103
104
105
106
107
108
109
110
|
# File 'lib/sidekiq/processor.rb', line 102
def fetch
j = get_one
if j && @done
j.requeue
nil
else
j
end
end
|
#get_one ⇒ Object
90
91
92
93
94
95
96
97
98
99
100
|
# File 'lib/sidekiq/processor.rb', line 90
def get_one
uow = capsule.fetcher.retrieve_work
if @down
logger.info { "Redis is online, #{::Process.clock_gettime(::Process::CLOCK_MONOTONIC) - @down} sec downtime" }
@down = nil
end
uow
rescue Sidekiq::Shutdown
rescue => ex
handle_fetch_exception(ex)
end
|
#handle_fetch_exception(ex) ⇒ Object
112
113
114
115
116
117
118
119
120
|
# File 'lib/sidekiq/processor.rb', line 112
def handle_fetch_exception(ex)
unless @down
@down = ::Process.clock_gettime(::Process::CLOCK_MONOTONIC)
logger.error("Error fetching job: #{ex}")
handle_exception(ex)
end
sleep(1)
nil
end
|
#kill(wait = false) ⇒ Object
49
50
51
52
53
54
55
56
57
58
59
|
# File 'lib/sidekiq/processor.rb', line 49
def kill(wait = false)
@done = true
return unless @thread
@thread.raise ::Sidekiq::Shutdown
@thread.value if wait
end
|
#process(uow) ⇒ Object
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
|
# File 'lib/sidekiq/processor.rb', line 159
def process(uow)
jobstr = uow.job
queue = uow.queue_name
job_hash = nil
begin
job_hash = Sidekiq.load_json(jobstr)
rescue => ex
handle_exception(ex, {context: "Invalid JSON for job", jobstr: jobstr})
now = Time.now.to_f
redis do |conn|
conn.multi do |xa|
xa.zadd("dead", now.to_s, jobstr)
xa.zremrangebyscore("dead", "-inf", now - @capsule.config[:dead_timeout_in_seconds])
xa.zremrangebyrank("dead", 0, - @capsule.config[:dead_max_jobs])
end
end
return uow.acknowledge
end
ack = false
Thread.handle_interrupt(IGNORE_SHUTDOWN_INTERRUPTS) do
Thread.handle_interrupt(ALLOW_SHUTDOWN_INTERRUPTS) do
dispatch(job_hash, queue, jobstr) do |instance|
config.server_middleware.invoke(instance, job_hash, queue) do
execute_job(instance, job_hash["args"])
end
end
ack = true
rescue Sidekiq::Shutdown
rescue Sidekiq::JobRetry::Skip => s
ack = true
raise s
rescue Sidekiq::JobRetry::Handled => h
ack = true
e = h.cause || h
handle_exception(e, {context: "Job raised exception", job: job_hash})
raise e
rescue Exception => ex
handle_exception(ex, {context: "Internal exception!", job: job_hash, jobstr: jobstr})
raise ex
end
ensure
if ack
uow.acknowledge
end
end
end
|
#process_one(&block) ⇒ Object
84
85
86
87
88
|
# File 'lib/sidekiq/processor.rb', line 84
def process_one(&block)
@job = fetch
process(@job) if @job
@job = nil
end
|
#run ⇒ Object
71
72
73
74
75
76
77
78
79
80
81
82
|
# File 'lib/sidekiq/processor.rb', line 71
def run
Thread.current[:sidekiq_capsule] = @capsule
process_one until @done
@callback.call(self)
rescue Sidekiq::Shutdown
@callback.call(self)
rescue Exception => ex
@callback.call(self, ex)
end
|
#start ⇒ Object
65
66
67
|
# File 'lib/sidekiq/processor.rb', line 65
def start
@thread ||= safe_thread("#{config.name}/processor", &method(:run))
end
|
#stats(jobstr, queue) ⇒ Object
277
278
279
280
281
282
283
284
285
286
287
288
289
|
# File 'lib/sidekiq/processor.rb', line 277
def stats(jobstr, queue)
WORK_STATE.set(tid, {queue: queue, payload: jobstr, run_at: Time.now.to_i})
begin
yield
rescue Exception
FAILURE.incr
raise
ensure
WORK_STATE.delete(tid)
PROCESSED.incr
end
end
|
#stopping? ⇒ Boolean
61
62
63
|
# File 'lib/sidekiq/processor.rb', line 61
def stopping?
@done
end
|
#terminate(wait = false) ⇒ Object
43
44
45
46
47
|
# File 'lib/sidekiq/processor.rb', line 43
def terminate(wait = false)
@done = true
return unless @thread
@thread.value if wait
end
|