Class: Sidekiq::Manager
Overview
The main router in the system. This manages the processor state and accepts messages from Redis to be dispatched to an idle processor.
Constant Summary
collapse
- SPIN_TIME_FOR_GRACEFUL_SHUTDOWN =
1
- JVM_RESERVED_SIGNALS =
Don’t Process#kill if we get these signals via the API
['USR1', 'USR2']
- PROCTITLES =
[
# Each proc is called with (manager, data) by #heartbeat; the non-nil results
# are joined with single spaces to form the process title ($0).
# Fixed program name.
proc { 'sidekiq'.freeze },
# Sidekiq version string.
proc { Sidekiq::VERSION },
# Value of data['tag']; dropped from the title when nil.
proc { |mgr, data| data['tag'] },
# Number of busy processors against data['concurrency'].
proc { |mgr, data| "[#{mgr.busy.size} of #{data['concurrency']} busy]" },
# "stopping" marker, present only once the manager reports stopped?.
proc { |mgr, data| "stopping" if mgr.stopped? },
]
Constants included
from Util
Util::EXPIRY
Instance Attribute Summary collapse
Instance Method Summary
collapse
Methods included from Actor
included
Methods included from Util
#fire_event, #hostname, #identity, #logger, #process_nonce, #redis, #want_a_hertz_donut?, #watchdog
#handle_exception
Constructor Details
#initialize(condvar, options = {}) ⇒ Manager
Returns a new instance of Manager.
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
|
# File 'lib/sidekiq/manager.rb', line 26
# Sets up the manager's bookkeeping tables and creates one linked Processor
# per unit of concurrency.
#
# @param condvar stored in @finished; not otherwise used by this method
# @param options [Hash] kept in @options; :concurrency sets the number of
#   processors to create and falls back to 25 when absent
# @raise [ArgumentError] if the resolved concurrency is below 1
def initialize(condvar, options={})
  logger.debug { options.inspect }

  @options = options
  @count = options[:concurrency] || 25
  if @count < 1
    raise ArgumentError, "Concurrency of #{@count} is not supported"
  end

  @done_callback = nil
  @finished = condvar
  @in_progress = {}
  @threads = {}
  @done = false
  @busy = []

  idle_processors = @count.times.map do
    processor = Processor.new_link(current_actor)
    # Hand the processor the id it is being created under (its object_id as
    # seen from here).
    processor.proxy_id = processor.object_id
    processor
  end
  @ready = idle_processors
end
|
Instance Attribute Details
#busy ⇒ Object
Returns the value of attribute busy.
20
21
22
|
# File 'lib/sidekiq/manager.rb', line 20
# Reader for @busy.
#
# @return [Array] the list held in @busy (processors that have been handed
#   work and have not yet been removed)
def busy
  @busy
end
|
#fetcher ⇒ Object
Returns the value of attribute fetcher.
21
22
23
|
# File 'lib/sidekiq/manager.rb', line 21
# Reader for @fetcher.
#
# @return [Object] whatever is currently stored in @fetcher
def fetcher
  @fetcher
end
|
#ready ⇒ Object
Returns the value of attribute ready.
19
20
21
|
# File 'lib/sidekiq/manager.rb', line 19
# Reader for @ready.
#
# @return [Array] the list held in @ready (idle processors available to be
#   handed work)
def ready
  @ready
end
|
Instance Method Details
#assign(work) ⇒ Object
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
|
# File 'lib/sidekiq/manager.rb', line 113
# Hands a unit of work to an idle processor.
#
# The work is pushed back with work.requeue instead of being processed when
# the manager has been stopped, or when no idle processor is available.
#
# @param work object responding to #requeue; passed on to the processor's
#   #process
def assign(work)
  watchdog("Manager#assign died") do
    processor = @ready.pop unless stopped?

    if processor
      @in_progress[processor.object_id] = work
      @busy << processor
      processor.async.process(work)
    else
      # Covers both the stopped case and an empty @ready list. Without the
      # second guard, a nil from @ready.pop would be recorded in @in_progress
      # and @busy before failing on nil.async, and the stray nil would keep
      # @busy from ever becoming empty.
      work.requeue
    end
  end
end
|
#clean_up_for_graceful_shutdown ⇒ Object
62
63
64
65
66
67
68
69
70
|
# File 'lib/sidekiq/manager.rb', line 62
# Finishes shutdown once no processor is busy; otherwise schedules itself to
# run again after SPIN_TIME_FOR_GRACEFUL_SHUTDOWN.
#
# @return [Boolean] true when shutdown was invoked on this call, false when
#   a re-check was scheduled instead
def clean_up_for_graceful_shutdown
  unless @busy.empty?
    # Work is still in flight: check again later.
    after(SPIN_TIME_FOR_GRACEFUL_SHUTDOWN) { clean_up_for_graceful_shutdown }
    return false
  end

  shutdown
  true
end
|
#heartbeat(key, data, json) ⇒ Object
146
147
148
149
150
151
152
153
154
155
|
# File 'lib/sidekiq/manager.rb', line 146
# Refreshes the process title from PROCTITLES, invokes ❤ with the key and
# JSON payload, and schedules the next heartbeat five seconds later.
#
# @param key passed through to ❤
# @param data passed to every PROCTITLES proc along with this manager
# @param json passed through to ❤
def heartbeat(key, data, json)
  # Procs returning nil contribute nothing to the title.
  title_parts = PROCTITLES.map { |segment| segment.call(self, data) }.compact
  $0 = title_parts.join(' ')

  ❤(key, json)

  after(5) { heartbeat(key, data, json) }
end
|
#processor_died(processor, reason) ⇒ Object
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
|
# File 'lib/sidekiq/manager.rb', line 96
# Handles the death of a processor: drops it from the bookkeeping tables,
# then either replaces it and dispatches again (while running) or shuts down
# once nothing is busy (after stop).
#
# @param processor the processor that died
# @param reason not used by this method
def processor_died(processor, reason)
  watchdog("Manager#processor_died died") do
    dead_id = processor.object_id
    @in_progress.delete(dead_id)
    @threads.delete(dead_id)
    @busy.delete(processor)

    if stopped?
      shutdown if @busy.empty?
    else
      # Keep the pool at full strength with a fresh linked processor.
      replacement = Processor.new_link(current_actor)
      replacement.proxy_id = replacement.object_id
      @ready << replacement
      dispatch
    end
  end
end
|
#processor_done(processor) ⇒ Object
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
|
# File 'lib/sidekiq/manager.rb', line 80
# Handles a processor finishing its work: runs the when_done callback (if
# any), clears the processor from the bookkeeping tables, then either returns
# it to the idle list (while running) or terminates it and shuts down once
# nothing is busy (after stop). Always ends by calling dispatch.
#
# @param processor the processor that finished
def processor_done(processor)
  watchdog('Manager#processor_done died') do
    callback = @done_callback
    callback.call(processor) if callback

    [@in_progress, @threads].each { |table| table.delete(processor.object_id) }
    @busy.delete(processor)

    halting = stopped?
    if processor.alive?
      if halting
        processor.terminate
      else
        @ready << processor
      end
    end
    shutdown if halting && @busy.empty?

    dispatch
  end
end
|
#real_thread(proxy_id, thr) ⇒ Object
A hack worthy of Rube Goldberg. We need to be able to hard stop a working thread, but there’s no way for us to get a handle to the underlying thread performing work for a processor, so we have it call us and tell us.
134
135
136
|
# File 'lib/sidekiq/manager.rb', line 134
# Records the thread reported for a processor, keyed by the processor's
# proxy id, in @threads.
#
# @param proxy_id key under which the thread is stored
# @param thr the thread object to remember
# @return the stored thr
def real_thread(proxy_id, thr)
  @threads.store(proxy_id, thr)
end
|
#start ⇒ Object
72
73
74
|
# File 'lib/sidekiq/manager.rb', line 72
# Calls dispatch once for every processor currently in @ready.
#
# @return [Array] @ready
def start
  @ready.each { |_idle| dispatch }
end
|
#stop(options = {}) ⇒ Object
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
|
# File 'lib/sidekiq/manager.rb', line 45
# Marks the manager as stopped, terminates every idle processor, and then
# attempts a graceful shutdown. If busy processors remain and
# options[:shutdown] is truthy, a hard shutdown is requested with
# options[:timeout].
#
# @param options [Hash] :shutdown (truthy to request hard shutdown when
#   work is still in flight) and :timeout (passed to hard_shutdown_in)
def stop(options={})
  watchdog('Manager#stop died') do
    force = options[:shutdown]
    deadline = options[:timeout]
    @done = true

    logger.info { "Terminating #{@ready.size} quiet workers" }
    @ready.each do |idle|
      next unless idle.alive?
      idle.terminate
    end
    @ready.clear

    # Nothing was busy, so shutdown has already been carried out.
    return if clean_up_for_graceful_shutdown

    hard_shutdown_in(deadline) if force
  end
end
|
#stopped? ⇒ Boolean
157
158
159
|
# File 'lib/sidekiq/manager.rb', line 157
# Reports whether the manager has been marked as stopped.
#
# @return the value of @done
def stopped?
  @done
end
|
#when_done(&blk) ⇒ Object
76
77
78
|
# File 'lib/sidekiq/manager.rb', line 76
# Registers the block to be stored in @done_callback, replacing any block
# stored there before.
#
# @yield the callback to remember
# @return [Proc, nil] the stored block (nil when called without one)
def when_done(&blk)
  @done_callback = blk
end
|