Class: Sidekiq::Manager
Overview
The main router in the system. This manages the processor state and accepts messages from Redis to be dispatched to an idle processor.
Constant Summary
collapse
- SPIN_TIME_FOR_GRACEFUL_SHUTDOWN =
1
Constants included
from Util
Util::EXPIRY
Instance Attribute Summary collapse
Instance Method Summary
collapse
Methods included from Actor
included
Methods included from Util
#hostname, #logger, #process_id, #redis, #watchdog
#handle_exception
Constructor Details
#initialize(options = {}) ⇒ Manager
Returns a new instance of Manager.
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
|
# File 'lib/sidekiq/manager.rb', line 24
# Sets up the manager's bookkeeping and the initial pool of idle processors.
#
# options - Hash of settings, stored as-is in @options. :concurrency gives
#           the number of processors to create (25 when missing or nil).
def initialize(options={})
  logger.debug { options.inspect }

  @options = options
  @count = options[:concurrency] || 25
  @done = false
  @done_callback = nil
  @in_progress = {}
  @threads = {}
  @busy = []

  # One processor per concurrency slot, created via Processor.new_link and
  # tagged with its own object_id as proxy_id.
  @ready = @count.times.each_with_object([]) do |_slot, pool|
    processor = Processor.new_link(current_actor)
    processor.proxy_id = processor.object_id
    pool << processor
  end
end
|
Instance Attribute Details
#busy ⇒ Object
Returns the value of attribute busy.
19
20
21
|
# File 'lib/sidekiq/manager.rb', line 19
# Reader for @busy, the list of processors currently holding work.
# Processors are appended in #assign and removed again in #processor_done
# and #processor_died.
def busy
@busy
end
|
#fetcher ⇒ Object
Returns the value of attribute fetcher.
20
21
22
|
# File 'lib/sidekiq/manager.rb', line 20
# Reader for @fetcher.
# NOTE(review): @fetcher is not assigned anywhere in the code shown on this
# page - presumably it is set through a writer elsewhere; confirm.
def fetcher
@fetcher
end
|
#ready ⇒ Object
Returns the value of attribute ready.
18
19
20
|
# File 'lib/sidekiq/manager.rb', line 18
# Reader for @ready, the pool of idle processors. The pool is built in
# #initialize; #assign pops from it and #processor_done / #processor_died
# push processors back onto it.
def ready
@ready
end
|
Instance Method Details
#assign(work) ⇒ Object
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
|
# File 'lib/sidekiq/manager.rb', line 110
# Hands a fetched unit of work to an idle processor.
#
# work - the job to run; must respond to #requeue.
#
# The work is pushed back with work.requeue when the manager is stopped or
# when no idle processor is available. Otherwise a processor is moved from
# @ready to @busy, the work is recorded in @in_progress under the
# processor's object_id, and processing is started asynchronously.
def assign(work)
  watchdog("Manager#assign died") do
    if stopped?
      work.requeue
    elsif (processor = @ready.pop)
      @in_progress[processor.object_id] = work
      @busy << processor
      processor.async.process(work)
    else
      # With an empty pool, nil used to be registered in @in_progress and
      # @busy before nil.async raised, which dropped the job and left @busy
      # permanently non-empty. Give the job back instead.
      logger.warn { "No ready processor available, requeueing work" }
      work.requeue
    end
  end
end
|
#clean_up_for_graceful_shutdown ⇒ Object
59
60
61
62
63
64
65
66
67
|
# File 'lib/sidekiq/manager.rb', line 59
# Attempts to finish a graceful shutdown.
#
# While processors are still busy, schedules another attempt after
# SPIN_TIME_FOR_GRACEFUL_SHUTDOWN and returns false. Once @busy is empty,
# calls shutdown and returns true.
def clean_up_for_graceful_shutdown
  unless @busy.empty?
    after(SPIN_TIME_FOR_GRACEFUL_SHUTDOWN) { clean_up_for_graceful_shutdown }
    return false
  end

  shutdown
  true
end
|
#processor_died(processor, reason) ⇒ Object
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
|
# File 'lib/sidekiq/manager.rb', line 93
# Handles the death of a processor.
#
# processor - the processor that died.
# reason    - why it died (not used by this method).
#
# Bookkeeping for the dead processor is removed from @in_progress, @threads
# and @busy. Once the manager is stopped, shutdown is called as soon as no
# processor is busy; otherwise a replacement processor is created, added to
# @ready, and dispatch is called.
def processor_died(processor, reason)
  watchdog("Manager#processor_died died") do
    @in_progress.delete(processor.object_id)
    @threads.delete(processor.object_id)
    @busy.delete(processor)

    if stopped?
      shutdown if @busy.empty?
    else
      replacement = Processor.new_link(current_actor)
      replacement.proxy_id = replacement.object_id
      @ready << replacement
      dispatch
    end
  end
end
|
#processor_done(processor) ⇒ Object
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
|
# File 'lib/sidekiq/manager.rb', line 77
# Called when a processor has finished its unit of work.
#
# processor - the processor reporting completion.
#
# Invokes the when_done callback if one is registered, removes the
# processor from @in_progress, @threads and @busy, then either terminates
# it (manager stopped) or puts it back into @ready, and finally calls
# dispatch.
def processor_done(processor)
  watchdog('Manager#processor_done died') do
    callback = @done_callback
    callback.call(processor) if callback

    [@in_progress, @threads].each do |registry|
      registry.delete(processor.object_id)
    end
    @busy.delete(processor)

    if stopped?
      processor.terminate if processor.alive?
      shutdown if @busy.empty?
    else
      @ready.push(processor) if processor.alive?
    end

    dispatch
  end
end
|
#procline(tag) ⇒ Object
135
136
137
|
# File 'lib/sidekiq/manager.rb', line 135
# Builds the process title string.
#
# tag - text placed between the version and the busy counter.
#
# Returns a String containing the Sidekiq version, the tag, the number of
# busy processors out of @count, and a " stopping" suffix once stopped.
def procline(tag)
  suffix = stopped? ? ' stopping' : ''
  "sidekiq #{Sidekiq::VERSION} #{tag}[#{@busy.size} of #{@count} busy]#{suffix}"
end
|
#real_thread(proxy_id, thr) ⇒ Object
A hack worthy of Rube Goldberg. We need to be able to hard stop a working thread. But there’s no way for us to get a handle to the underlying thread performing work for a processor, so we have it call us and tell us.
131
132
133
|
# File 'lib/sidekiq/manager.rb', line 131
# Records the thread that is doing the work for a processor.
#
# proxy_id - key identifying the processor (the same ids are removed from
#            @threads in #processor_done / #processor_died).
# thr      - the thread object to remember.
#
# Returns thr.
def real_thread(proxy_id, thr)
  @threads.store(proxy_id, thr)
end
|
#start ⇒ Object
69
70
71
|
# File 'lib/sidekiq/manager.rb', line 69
# Kicks off work by calling dispatch once for every processor in @ready.
#
# Returns @ready.
def start
  @ready.each do |_processor|
    dispatch
  end
end
|
#stop(options = {}) ⇒ Object
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
|
# File 'lib/sidekiq/manager.rb', line 41
# Stops the manager.
#
# options - Hash; :shutdown requests a hard shutdown when a graceful one is
#           not immediately possible, :timeout is handed to hard_shutdown_in.
#
# Marks the manager as done, terminates and discards every idle processor,
# clears the worker set, and then tries a graceful shutdown. If that does
# not complete right away and :shutdown is truthy, a hard shutdown is
# scheduled with the given timeout.
def stop(options={})
  watchdog('Manager#stop died') do
    should_shutdown, timeout = options.values_at(:shutdown, :timeout)
    @done = true

    logger.info { "Shutting down #{@ready.size} quiet workers" }
    @ready.each do |idle|
      idle.terminate if idle.alive?
    end
    @ready.clear

    clear_worker_set

    # Leaves #stop entirely when the graceful path already finished.
    graceful = clean_up_for_graceful_shutdown
    return if graceful

    hard_shutdown_in(timeout) if should_shutdown
  end
end
|
#when_done(&blk) ⇒ Object
73
74
75
|
# File 'lib/sidekiq/manager.rb', line 73
# Registers a callback block in @done_callback. #processor_done calls it
# with the finished processor before doing its own bookkeeping. A later
# call replaces the previously stored block.
def when_done(&blk)
@done_callback = blk
end
|