Class: Concurrent::Supervisor

Inherits:
Object
  • Object
show all
Defined in:
lib/concurrent/supervisor.rb

Defined Under Namespace

Classes: WorkerContext, WorkerCounts

Constant Summary collapse

DEFAULT_MONITOR_INTERVAL =
1
RESTART_STRATEGIES =
[:one_for_one, :one_for_all, :rest_for_one]
DEFAULT_MAX_RESTART =
5
DEFAULT_MAX_TIME =
60
WORKER_API =
{run: 0, stop: 0, running?: 0}
CHILD_TYPES =
[:worker, :supervisor]
CHILD_RESTART_OPTIONS =
[:permanent, :transient, :temporary]

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(opts = {}) ⇒ Supervisor

Returns a new instance of Supervisor.

Raises:

  • (ArgumentError)


63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
# File 'lib/concurrent/supervisor.rb', line 63

def initialize(opts = {})
  @restart_strategy = opts[:restart_strategy] || opts[:strategy] || :one_for_one
  @monitor_interval = (opts[:monitor_interval] || DEFAULT_MONITOR_INTERVAL).to_f
  @max_restart = (opts[:max_restart] || opts[:max_r] || DEFAULT_MAX_RESTART).to_i
  @max_time = (opts[:max_time] || opts[:max_t] || DEFAULT_MAX_TIME).to_i

  raise ArgumentError.new(":#{@restart_strategy} is not a valid restart strategy") unless RESTART_STRATEGIES.include?(@restart_strategy)
  raise ArgumentError.new(':monitor_interval must be greater than zero') unless @monitor_interval > 0.0
  raise ArgumentError.new(':max_restart must be greater than zero') unless @max_restart > 0
  raise ArgumentError.new(':max_time must be greater than zero') unless @max_time > 0

  @running = false
  @mutex = Mutex.new
  @workers = []
  @monitor = nil

  @count = WorkerCounts.new(0, 0, 0)
  @restart_times = []

  add_worker(opts[:worker]) unless opts[:worker].nil?
  add_workers(opts[:workers]) unless opts[:workers].nil?
end

Instance Attribute Details

#max_restartObject (readonly) Also known as: max_r

Returns the value of attribute max_restart.



56
57
58
# File 'lib/concurrent/supervisor.rb', line 56

def max_restart
  @max_restart
end

#max_timeObject (readonly) Also known as: max_t

Returns the value of attribute max_time.



57
58
59
# File 'lib/concurrent/supervisor.rb', line 57

def max_time
  @max_time
end

#monitor_intervalObject (readonly)

Returns the value of attribute monitor_interval.



54
55
56
# File 'lib/concurrent/supervisor.rb', line 54

def monitor_interval
  @monitor_interval
end

#restart_strategyObject (readonly) Also known as: strategy

Returns the value of attribute restart_strategy.



55
56
57
# File 'lib/concurrent/supervisor.rb', line 55

def restart_strategy
  @restart_strategy
end

Instance Method Details

#add_worker(worker, opts = {}) ⇒ Object Also known as: add_child



151
152
153
154
155
156
157
158
159
160
161
162
163
164
# File 'lib/concurrent/supervisor.rb', line 151

def add_worker(worker, opts = {})
  return nil if worker.nil? || ! behaves_as_worker?(worker)
  @mutex.synchronize {
    restart = opts[:restart] || :permanent
    type = opts[:type] || (worker.is_a?(Supervisor) ? :supervisor : nil) || :worker
    raise ArgumentError.new(":#{restart} is not a valid restart option") unless CHILD_RESTART_OPTIONS.include?(restart)
    raise ArgumentError.new(":#{type} is not a valid child type") unless CHILD_TYPES.include?(type)
    context = WorkerContext.new(worker, type, restart)
    @workers << context
    @count.add(context)
    worker.run if @running
    context.object_id
  }
end

#add_workers(workers, opts = {}) ⇒ Object Also known as: add_children



167
168
169
170
171
# File 'lib/concurrent/supervisor.rb', line 167

def add_workers(workers, opts = {})
  workers.collect do |worker|
    add_worker(worker, opts)
  end
end

#countObject



144
145
146
147
148
149
# File 'lib/concurrent/supervisor.rb', line 144

def count
  @mutex.synchronize do
    @count.status = @workers.collect{|w| w.thread ? w.thread.status : false }
    @count.dup.freeze
  end
end

#current_restart_countObject



140
141
142
# File 'lib/concurrent/supervisor.rb', line 140

def current_restart_count
  @restart_times.length
end

#lengthObject Also known as: size



135
136
137
# File 'lib/concurrent/supervisor.rb', line 135

def length
  @mutex.synchronize { @workers.length }
end

#remove_worker(worker_id) ⇒ Object Also known as: remove_child



174
175
176
177
178
179
180
181
182
# File 'lib/concurrent/supervisor.rb', line 174

def remove_worker(worker_id)
  @mutex.synchronize do
    index, context = find_worker(worker_id)
    break(nil) if context.nil?
    break(false) if context.alive?
    @workers.delete_at(index)
    context.worker
  end
end

#restart_worker(worker_id) ⇒ Object Also known as: restart_child



212
213
214
215
216
217
218
219
220
221
222
223
224
# File 'lib/concurrent/supervisor.rb', line 212

def restart_worker(worker_id)
  @mutex.synchronize do
    return false unless @running

    index, context = find_worker(worker_id)
    break(nil) if context.nil?
    break(false) if context.restart == :temporary
    context.terminated = false
    terminate_worker(context)
    run_worker(context)
    true
  end
end

#runObject



98
99
100
101
102
103
104
105
# File 'lib/concurrent/supervisor.rb', line 98

def run
  @mutex.synchronize do
    raise StandardError.new('already running') if @running
    @running = true
  end
  monitor
  true
end

#run!Object



86
87
88
89
90
91
92
93
94
95
96
# File 'lib/concurrent/supervisor.rb', line 86

def run!
  @mutex.synchronize do
    raise StandardError.new('already running') if @running
    @running = true
    @monitor = Thread.new do
      Thread.current.abort_on_exception = false
      monitor
    end
  end
  Thread.pass
end

#running?Boolean

Returns:

  • (Boolean)


131
132
133
# File 'lib/concurrent/supervisor.rb', line 131

def running?
  @mutex.synchronize { @running }
end

#start_worker(worker_id) ⇒ Object Also known as: start_child



199
200
201
202
203
204
205
206
207
208
209
# File 'lib/concurrent/supervisor.rb', line 199

def start_worker(worker_id)
  @mutex.synchronize do
    return false unless @running

    index, context = find_worker(worker_id)
    break(nil) if context.nil?
    context.terminated = false
    run_worker(context) unless context.alive?
    true
  end
end

#stopObject



107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
# File 'lib/concurrent/supervisor.rb', line 107

def stop
  @mutex.synchronize do
    return true unless @running

    @running = false
    unless @monitor.nil?
      @monitor.run if @monitor.status == 'sleep'
      if @monitor.join(0.1).nil?
        @monitor.kill
      end
      @monitor = nil
    end
    @restart_times.clear

    @workers.length.times do |i|
      context = @workers[-1-i]
      terminate_worker(context)
    end
    prune_workers
  end

  true
end

#stop_worker(worker_id) ⇒ Object Also known as: stop_child



185
186
187
188
189
190
191
192
193
194
195
196
# File 'lib/concurrent/supervisor.rb', line 185

def stop_worker(worker_id)
  @mutex.synchronize do
    return true unless @running

    index, context = find_worker(worker_id)
    break(nil) if index.nil?
    context.terminated = true
    terminate_worker(context)
    @workers.delete_at(index) if @workers[index].restart == :temporary
    true
  end
end