Class: Sidekiq::Manager
- Inherits:
-
Object
show all
- Includes:
- Celluloid, Util
- Defined in:
- lib/sidekiq/manager.rb
Overview
The main router in the system. This manages the processor state and fetches messages from Redis to be dispatched to an idle processor.
Instance Method Summary
collapse
Methods included from Util
#constantize, #err, #log, #verbose, #watchdog
Constructor Details
#initialize(location, options = {}) ⇒ Manager
Returns a new instance of Manager.
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
|
# File 'lib/sidekiq/manager.rb', line 21
# Boots the manager: logs a startup banner, records its configuration,
# connects to Redis and builds the initial pool of ready processors.
#
# location - Redis URL; interpolated into the boot log line and handed to
#            Redis.connect as :url.
# options  - Hash of settings (default: {}):
#            :processor_count - how many processors to create; 25 is used
#                               when the key is absent or falsy.
#            :queues          - collection of queues. #size is called on it
#                               unconditionally, so a missing :queues entry
#                               raises NoMethodError here.
def initialize(location, options={})
  log "Booting sidekiq #{Sidekiq::VERSION} with Redis at #{location}"
  verbose options.inspect

  # Settings read from the options hash.
  @count = options[:processor_count] || 25
  @queues = options[:queues]
  @queue_idx = 0
  @queues_size = @queues.size

  @redis = Redis.connect(:url => location)

  # Shutdown bookkeeping: no callback registered, not stopped yet.
  @done_callback, @done = nil, false

  # Processor pools. Every processor starts out in @ready and is created
  # through Processor.new_link with this actor as its argument.
  @busy, @ready = [], []
  @count.times { @ready << Processor.new_link(current_actor) }
end
|
Instance Method Details
#processor_died(processor, reason) ⇒ Object
68
69
70
71
72
73
74
75
76
77
78
79
80
|
# File 'lib/sidekiq/manager.rb', line 68
# Bookkeeping for a processor that has died.
#
# Removes the processor from @busy, logs the cause when one is given and,
# unless the manager has been stopped, adds a fresh processor to @ready
# and calls dispatch.
#
# processor - the processor that exited.
# reason    - the exception responsible for the death, or a falsy value
#             when there is nothing to report.
#
# Returns nil when the manager is stopped, otherwise whatever dispatch
# returns.
def processor_died(processor, reason)
  @busy.delete(processor)

  if reason
    log "Processor death: #{reason}"
    # An exception that was built but never raised has a nil backtrace.
    # Skip the trace instead of raising NoMethodError in this handler,
    # which would prevent the replacement below from ever being created.
    backtrace = reason.backtrace
    log backtrace.join("\n") if backtrace
  end

  return if stopped?

  # Replace the lost processor and resume dispatching.
  @ready << Processor.new_link(current_actor)
  dispatch
end
|
#processor_done(processor) ⇒ Object
57
58
59
60
61
62
63
64
65
66
|
# File 'lib/sidekiq/manager.rb', line 57
# Bookkeeping for a processor that has finished its work.
#
# Invokes the registered @done_callback (if any) with the processor, takes
# the processor out of @busy, then either terminates it (manager stopped)
# or returns it to @ready. dispatch is called in both cases.
#
# processor - the processor reporting completion.
#
# Returns whatever dispatch returns.
def processor_done(processor)
  notify = @done_callback
  notify.call(processor) if notify

  @busy.delete(processor)
  stopped? ? processor.terminate : @ready.push(processor)

  dispatch
end
|
#start ⇒ Object
49
50
51
|
# File 'lib/sidekiq/manager.rb', line 49
# Starts the manager by calling dispatch with a true argument.
#
# Returns whatever dispatch returns.
# NOTE(review): dispatch is defined outside this excerpt; presumably the
# flag marks the very first dispatch -- confirm against its definition.
def start
dispatch(true)
end
|
#stop ⇒ Object
39
40
41
42
43
44
45
46
47
|
# File 'lib/sidekiq/manager.rb', line 39
# Begins shutting the manager down.
#
# Marks the manager as done, terminates and forgets every processor held
# in @ready, then schedules signal(:shutdown) to fire after +timeout+
# seconds. Processors in @busy are not touched here.
#
# timeout - seconds to wait before signalling :shutdown (default: 5, the
#           value that used to be hard-coded).
#
# Returns whatever after returns.
def stop(timeout = 5)
  @done = true
  @ready.each(&:terminate)
  @ready.clear
  after(timeout) { signal(:shutdown) }
end
|
#when_done ⇒ Object
53
54
55
|
# File 'lib/sidekiq/manager.rb', line 53
# Registers the block to be stored in @done_callback; processor_done calls
# it with the processor that finished.
#
# block - the callback to store (required).
#
# Returns the stored Proc.
# Raises ArgumentError when called without a block.
def when_done(&block)
  # A bare Proc.new that implicitly captures the caller's block warns on
  # Ruby 2.7 and raises on Ruby 3.0+, so the block is taken explicitly.
  # Calling without a block still raises ArgumentError, as it did before.
  raise ArgumentError, "when_done requires a block" unless block

  @done_callback = block
end
|