Class: Sidekiq::Manager
Overview
The main router in the system. This manages the processor state and accepts messages from Redis to be dispatched to an idle processor.
Constant Summary
collapse
- SPIN_TIME_FOR_GRACEFUL_SHUTDOWN =
1
Constants included
from Util
Util::EXPIRY
Instance Attribute Summary collapse
Instance Method Summary
collapse
Methods included from Actor
included
Methods included from Util
#fire_event, #hostname, #identity, #logger, #redis, #watchdog
#handle_exception
Constructor Details
#initialize(condvar, options = {}) ⇒ Manager
Returns a new instance of Manager.
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
|
# File 'lib/sidekiq/manager.rb', line 25
# Sets up the manager's bookkeeping and its initial pool of ready processors.
#
# @param condvar [Object] stored in @finished; not otherwise used here
# @param options [Hash] kept in @options; only :concurrency is read in this
#   method (25 is used when it is absent or falsy)
def initialize(condvar, options = {})
  logger.debug { options.inspect }

  @options       = options
  @finished      = condvar
  @count         = options[:concurrency] || 25
  @done          = false
  @done_callback = nil
  @in_progress   = {}
  @threads       = {}
  @busy          = []

  # One processor per slot, each created through Processor.new_link and
  # tagged with its own object_id as proxy_id.
  @ready = @count.times.map do
    Processor.new_link(current_actor).tap do |worker|
      worker.proxy_id = worker.object_id
    end
  end
end
|
Instance Attribute Details
#busy ⇒ Object
Returns the value of attribute busy.
20
21
22
|
# File 'lib/sidekiq/manager.rb', line 20
# Reader for @busy.
#
# @return [Object] the current value of @busy
def busy
  @busy
end
|
#fetcher ⇒ Object
Returns the value of attribute fetcher.
21
22
23
|
# File 'lib/sidekiq/manager.rb', line 21
# Reader for @fetcher.
#
# @return [Object] the current value of @fetcher
def fetcher
  @fetcher
end
|
#ready ⇒ Object
Returns the value of attribute ready.
19
20
21
|
# File 'lib/sidekiq/manager.rb', line 19
# Reader for @ready.
#
# @return [Object] the current value of @ready
def ready
  @ready
end
|
Instance Method Details
#assign(work) ⇒ Object
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
|
# File 'lib/sidekiq/manager.rb', line 111
# Hands a unit of work to a ready processor, or requeues it when stopped.
#
# @param work [Object] must respond to #requeue; otherwise it is passed on
#   to the chosen processor's async #process
def assign(work)
  watchdog("Manager#assign died") do
    # Once stopped, no processor is taken from the pool; the work goes back.
    next work.requeue if stopped?

    worker = @ready.pop
    @in_progress[worker.object_id] = work
    @busy << worker
    worker.async.process(work)
  end
end
|
#clean_up_for_graceful_shutdown ⇒ Object
60
61
62
63
64
65
66
67
68
|
# File 'lib/sidekiq/manager.rb', line 60
# Calls shutdown as soon as @busy is empty; otherwise schedules another
# attempt via after(SPIN_TIME_FOR_GRACEFUL_SHUTDOWN).
#
# @return [Boolean] true when shutdown was invoked by this call, false when
#   a retry was scheduled instead
def clean_up_for_graceful_shutdown
  unless @busy.empty?
    after(SPIN_TIME_FOR_GRACEFUL_SHUTDOWN) { clean_up_for_graceful_shutdown }
    return false
  end

  shutdown
  true
end
|
#heartbeat(key, data, json) ⇒ Object
136
137
138
139
140
141
142
143
144
145
146
147
|
# File 'lib/sidekiq/manager.rb', line 136
# Refreshes the process title, emits one heartbeat, then re-arms itself.
#
# The title is assembled from 'sidekiq', Sidekiq::VERSION, the optional tag,
# the busy count and, once stopped? is true, the word 'stopping'.
#
# @param key [Object] passed through unchanged to ❤
# @param data [Hash] read for the 'tag' and 'concurrency' keys
# @param json [Object] passed through unchanged to ❤
def heartbeat(key, data, json)
  tag = data['tag']

  proctitle = ['sidekiq', Sidekiq::VERSION]
  # A missing tag is treated like an empty one rather than raising
  # NoMethodError on nil.
  proctitle << tag unless tag.nil? || tag.empty?
  proctitle << "[#{@busy.size} of #{data['concurrency']} busy]"
  proctitle << 'stopping' if stopped?
  $0 = proctitle.join(' ')

  ❤(key, json)

  # Re-schedule with the same arguments; the 5 is presumably seconds — confirm
  # against the timer implementation behind `after`.
  after(5) { heartbeat(key, data, json) }
end
|
#processor_died(processor, reason) ⇒ Object
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
|
# File 'lib/sidekiq/manager.rb', line 94
# Forgets a dead processor and either replaces it or finishes shutting down.
#
# While running, a fresh processor is created through Processor.new_link,
# added to @ready, and dispatch is called. Once stopped, shutdown is called
# when no busy processors remain.
#
# @param processor [Object] the processor that died; removed from
#   @in_progress, @threads and @busy
# @param reason [Object] not used by this method
def processor_died(processor, reason)
  watchdog("Manager#processor_died died") do
    @in_progress.delete(processor.object_id)
    @threads.delete(processor.object_id)
    @busy.delete(processor)

    if stopped?
      shutdown if @busy.empty?
    else
      replacement = Processor.new_link(current_actor)
      replacement.proxy_id = replacement.object_id
      @ready << replacement
      dispatch
    end
  end
end
|
#processor_done(processor) ⇒ Object
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
|
# File 'lib/sidekiq/manager.rb', line 78
# Bookkeeping for a processor that has finished its work.
#
# Invokes the registered done-callback (if any), removes the processor from
# @in_progress, @threads and @busy, then either terminates it (when stopped)
# or returns it to @ready (when still alive). dispatch is called last in
# both cases.
#
# @param processor [Object] must respond to #alive? and, when stopped,
#   #terminate
def processor_done(processor)
  watchdog('Manager#processor_done died') do
    callback = @done_callback
    callback.call(processor) if callback

    key = processor.object_id
    [@in_progress, @threads].each { |table| table.delete(key) }
    @busy.delete(processor)

    if stopped?
      processor.terminate if processor.alive?
      shutdown if @busy.empty?
    elsif processor.alive?
      @ready << processor
    end

    dispatch
  end
end
|
#real_thread(proxy_id, thr) ⇒ Object
A hack worthy of Rube Goldberg. We need to be able to hard stop a working thread. But there’s no way for us to get a handle to the underlying thread performing work for a processor, so we have it call us and tell us.
132
133
134
|
# File 'lib/sidekiq/manager.rb', line 132
# Records +thr+ in @threads under +proxy_id+.
#
# @param proxy_id [Object] key to file the thread under
# @param thr [Object] the value stored; also the return value
def real_thread(proxy_id, thr)
  @threads[proxy_id] = thr
end
|
#start ⇒ Object
70
71
72
|
# File 'lib/sidekiq/manager.rb', line 70
# Calls dispatch once for every entry currently in @ready.
#
# @return [Object] whatever @ready.each returns
def start
  @ready.each do |_idle|
    dispatch
  end
end
|
#stop(options = {}) ⇒ Object
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
|
# File 'lib/sidekiq/manager.rb', line 43
# Marks the manager as done, terminates every idle processor, and then
# attempts a graceful shutdown.
#
# @param options [Hash] :shutdown — when truthy and the graceful attempt did
#   not complete immediately, hard_shutdown_in is called with :timeout
def stop(options = {})
  watchdog('Manager#stop died') do
    hard_stop_requested = options[:shutdown]
    deadline = options[:timeout]

    @done = true

    logger.info { "Shutting down #{@ready.size} quiet workers" }
    @ready.each do |idle|
      idle.terminate if idle.alive?
    end
    @ready.clear

    # Leaves the whole method (not just the block) once shutdown has happened.
    return if clean_up_for_graceful_shutdown

    hard_shutdown_in(deadline) if hard_stop_requested
  end
end
|
#when_done(&blk) ⇒ Object
74
75
76
|
# File 'lib/sidekiq/manager.rb', line 74
# Registers the block to be stored in @done_callback, replacing any
# previous one. Calling without a block stores nil.
#
# @return [Proc, nil] the stored callback
def when_done(&blk)
  @done_callback = blk
end
|