Class: Concurrent::Supervisor
- Inherits:
-
Object
- Object
- Concurrent::Supervisor
show all
- Defined in:
- lib/concurrent/supervisor.rb
Defined Under Namespace
Classes: WorkerContext, WorkerCounts
Constant Summary
collapse
- DEFAULT_MONITOR_INTERVAL =
1
- RESTART_STRATEGIES =
[:one_for_one, :one_for_all, :rest_for_one]
- DEFAULT_MAX_RESTART =
5
- DEFAULT_MAX_TIME =
60
- WORKER_API =
{run: 0, stop: 0, running?: 0}
- CHILD_TYPES =
[:worker, :supervisor]
- CHILD_RESTART_OPTIONS =
[:permanent, :transient, :temporary]
Instance Attribute Summary collapse
Instance Method Summary
collapse
Constructor Details
#initialize(opts = {}) ⇒ Supervisor
Returns a new instance of Supervisor.
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
|
# File 'lib/concurrent/supervisor.rb', line 63
def initialize(opts = {})
@restart_strategy = opts[:restart_strategy] || opts[:strategy] || :one_for_one
@monitor_interval = (opts[:monitor_interval] || DEFAULT_MONITOR_INTERVAL).to_f
@max_restart = (opts[:max_restart] || opts[:max_r] || DEFAULT_MAX_RESTART).to_i
@max_time = (opts[:max_time] || opts[:max_t] || DEFAULT_MAX_TIME).to_i
raise ArgumentError.new(":#{@restart_strategy} is not a valid restart strategy") unless RESTART_STRATEGIES.include?(@restart_strategy)
raise ArgumentError.new(':monitor_interval must be greater than zero') unless @monitor_interval > 0.0
raise ArgumentError.new(':max_restart must be greater than zero') unless @max_restart > 0
raise ArgumentError.new(':max_time must be greater than zero') unless @max_time > 0
@running = false
@mutex = Mutex.new
@workers = []
@monitor = nil
@count = WorkerCounts.new(0, 0, 0)
@restart_times = []
add_worker(opts[:worker]) unless opts[:worker].nil?
add_workers(opts[:workers]) unless opts[:workers].nil?
end
|
Instance Attribute Details
#max_restart ⇒ Object
Also known as:
max_r
Returns the value of attribute max_restart.
56
57
58
|
# File 'lib/concurrent/supervisor.rb', line 56
def max_restart
@max_restart
end
|
#max_time ⇒ Object
Also known as:
max_t
Returns the value of attribute max_time.
57
58
59
|
# File 'lib/concurrent/supervisor.rb', line 57
def max_time
@max_time
end
|
#monitor_interval ⇒ Object
Returns the value of attribute monitor_interval.
54
55
56
|
# File 'lib/concurrent/supervisor.rb', line 54
def monitor_interval
@monitor_interval
end
|
#restart_strategy ⇒ Object
Also known as:
strategy
Returns the value of attribute restart_strategy.
55
56
57
|
# File 'lib/concurrent/supervisor.rb', line 55
def restart_strategy
@restart_strategy
end
|
Instance Method Details
#add_worker(worker, opts = {}) ⇒ Object
Also known as:
add_child
151
152
153
154
155
156
157
158
159
160
161
162
163
164
|
# File 'lib/concurrent/supervisor.rb', line 151
def add_worker(worker, opts = {})
return nil if worker.nil? || ! behaves_as_worker?(worker)
@mutex.synchronize {
restart = opts[:restart] || :permanent
type = opts[:type] || (worker.is_a?(Supervisor) ? :supervisor : nil) || :worker
raise ArgumentError.new(":#{restart} is not a valid restart option") unless CHILD_RESTART_OPTIONS.include?(restart)
raise ArgumentError.new(":#{type} is not a valid child type") unless CHILD_TYPES.include?(type)
context = WorkerContext.new(worker, type, restart)
@workers << context
@count.add(context)
worker.run if @running
context.object_id
}
end
|
#add_workers(workers, opts = {}) ⇒ Object
Also known as:
add_children
167
168
169
170
171
|
# File 'lib/concurrent/supervisor.rb', line 167
def add_workers(workers, opts = {})
workers.collect do |worker|
add_worker(worker, opts)
end
end
|
#count ⇒ Object
144
145
146
147
148
149
|
# File 'lib/concurrent/supervisor.rb', line 144
def count
@mutex.synchronize do
@count.status = @workers.collect{|w| w.thread ? w.thread.status : false }
@count.dup.freeze
end
end
|
#current_restart_count ⇒ Object
140
141
142
|
# File 'lib/concurrent/supervisor.rb', line 140
def current_restart_count
@restart_times.length
end
|
#length ⇒ Object
Also known as:
size
135
136
137
|
# File 'lib/concurrent/supervisor.rb', line 135
def length
@mutex.synchronize { @workers.length }
end
|
#remove_worker(worker_id) ⇒ Object
Also known as:
remove_child
174
175
176
177
178
179
180
181
182
|
# File 'lib/concurrent/supervisor.rb', line 174
def remove_worker(worker_id)
@mutex.synchronize do
index, context = find_worker(worker_id)
break(nil) if context.nil?
break(false) if context.alive?
@workers.delete_at(index)
context.worker
end
end
|
#restart_worker(worker_id) ⇒ Object
Also known as:
restart_child
212
213
214
215
216
217
218
219
220
221
222
223
224
|
# File 'lib/concurrent/supervisor.rb', line 212
def restart_worker(worker_id)
@mutex.synchronize do
return false unless @running
index, context = find_worker(worker_id)
break(nil) if context.nil?
break(false) if context.restart == :temporary
context.terminated = false
terminate_worker(context)
run_worker(context)
true
end
end
|
#run ⇒ Object
98
99
100
101
102
103
104
105
|
# File 'lib/concurrent/supervisor.rb', line 98
def run
@mutex.synchronize do
raise StandardError.new('already running') if @running
@running = true
end
monitor
true
end
|
#run! ⇒ Object
86
87
88
89
90
91
92
93
94
95
96
|
# File 'lib/concurrent/supervisor.rb', line 86
def run!
@mutex.synchronize do
raise StandardError.new('already running') if @running
@running = true
@monitor = Thread.new do
Thread.current.abort_on_exception = false
monitor
end
end
Thread.pass
end
|
#running? ⇒ Boolean
131
132
133
|
# File 'lib/concurrent/supervisor.rb', line 131
def running?
@mutex.synchronize { @running }
end
|
#start_worker(worker_id) ⇒ Object
Also known as:
start_child
199
200
201
202
203
204
205
206
207
208
209
|
# File 'lib/concurrent/supervisor.rb', line 199
def start_worker(worker_id)
@mutex.synchronize do
return false unless @running
index, context = find_worker(worker_id)
break(nil) if context.nil?
context.terminated = false
run_worker(context) unless context.alive?
true
end
end
|
#stop ⇒ Object
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
|
# File 'lib/concurrent/supervisor.rb', line 107
def stop
@mutex.synchronize do
return true unless @running
@running = false
unless @monitor.nil?
@monitor.run if @monitor.status == 'sleep'
if @monitor.join(0.1).nil?
@monitor.kill
end
@monitor = nil
end
@restart_times.clear
@workers.length.times do |i|
context = @workers[-1-i]
terminate_worker(context)
end
prune_workers
end
true
end
|
#stop_worker(worker_id) ⇒ Object
Also known as:
stop_child
185
186
187
188
189
190
191
192
193
194
195
196
|
# File 'lib/concurrent/supervisor.rb', line 185
def stop_worker(worker_id)
@mutex.synchronize do
return true unless @running
index, context = find_worker(worker_id)
break(nil) if index.nil?
context.terminated = true
terminate_worker(context)
@workers.delete_at(index) if @workers[index].restart == :temporary
true
end
end
|