Class: Autorespawn::Manager

Inherits:
Object
  • Object
show all
Includes:
Hooks, Hooks::InstanceHooks
Defined in:
lib/autorespawn/manager.rb

Overview

Manager of a bunch of autorespawn slaves

Instance Attribute Summary collapse

Hooks collapse

Instance Method Summary collapse

Methods included from Hooks

included

Constructor Details

#initialize(name: nil, parallel_level: 1) ⇒ Manager

Returns a new instance of Manager.



75
76
77
78
79
80
81
82
83
84
85
86
# File 'lib/autorespawn/manager.rb', line 75

def initialize(name: nil, parallel_level: 1)
    @parallel_level = parallel_level
    @workers   = Array.new
    @name = name
    @seed = ProgramID.for_self
    @tracked_files = Hash.new

    @self_slave = Self.new(name: name)
    @workers << self_slave
    @queued_slaves = Array.new
    @active_slaves = Hash[self_slave.pid => self_slave]
end

Instance Attribute Details

#active_slavesHash<Slave> (readonly)

Returns list of active slaves.

Returns:

  • (Hash<Slave>)

    list of active slaves



25
26
27
# File 'lib/autorespawn/manager.rb', line 25

def active_slaves
  @active_slaves
end

#nameObject (readonly)

Returns an object that is used to identify the manager itself.

Returns:

  • (Object)

    an object that is used to identify the manager itself



14
15
16
# File 'lib/autorespawn/manager.rb', line 14

def name
  @name
end

#parallel_levelInteger

Returns the number of processes allowed to work in parallel.

Returns:

  • (Integer)

    the number of processes allowed to work in parallel



20
21
22
# File 'lib/autorespawn/manager.rb', line 20

def parallel_level
  @parallel_level
end

#queued_slavesArray<Slave> (readonly)

Returns list of slaves explicitely queued with #queue.

Returns:

  • (Array<Slave>)

    list of slaves explicitely queued with #queue



27
28
29
# File 'lib/autorespawn/manager.rb', line 27

def queued_slaves
  @queued_slaves
end

#seedProgramID (readonly)

Returns a seed object that is passed to new slaves to represent the currently known state of file, to avoid unnecessary respawning.

Returns:

  • (ProgramID)

    a seed object that is passed to new slaves to represent the currently known state of file, to avoid unnecessary respawning

See Also:

  • add_seed_file


12
13
14
# File 'lib/autorespawn/manager.rb', line 12

def seed
  @seed
end

#self_slaveSelf (readonly)

Returns an object that has the same API than [Slave] to represent the manager’s process itself. It is always included in #workers and #active_slaves.

Returns:

  • (Self)

    an object that has the same API than [Slave] to represent the manager’s process itself. It is always included in #workers and #active_slaves



18
19
20
# File 'lib/autorespawn/manager.rb', line 18

def self_slave
  @self_slave
end

#tracked_filesHash<Pathname,TrackedFile> (readonly)

Returns the whole set of files that are tracked by this manager’s slaves.

Returns:

  • (Hash<Pathname,TrackedFile>)

    the whole set of files that are tracked by this manager’s slaves



30
31
32
# File 'lib/autorespawn/manager.rb', line 30

def tracked_files
  @tracked_files
end

#workersArray<Slave> (readonly)

Returns declared worker processes, as a hash from the PID to a Slave object.

Returns:

  • (Array<Slave>)

    declared worker processes, as a hash from the PID to a Slave object



23
24
25
# File 'lib/autorespawn/manager.rb', line 23

def workers
  @workers
end

Instance Method Details

#active?(slave) ⇒ Boolean

Tests whether this slave is currently active on self

Returns:

  • (Boolean)


118
119
120
# File 'lib/autorespawn/manager.rb', line 118

def active?(slave)
    active_slaves[slave.pid] == slave
end

#add_slave(*cmdline, name: nil, **spawn_options) ⇒ Object

Spawns a worker, i.e. a program that will perform the intended work and report the program state

Parameters:

  • name (Object) (defaults to: nil)

    an arbitrary object that can be used for reporting / tracking



127
128
129
130
131
132
# File 'lib/autorespawn/manager.rb', line 127

def add_slave(*cmdline, name: nil, **spawn_options)
    slave = Slave.new(*cmdline, name: name, seed: seed, **spawn_options)
    slave.needed!
    register_slave(slave)
    slave
end

#clearObject

Kill and remove all workers from this manager

See Also:



210
211
212
213
214
215
216
217
# File 'lib/autorespawn/manager.rb', line 210

def clear
    kill
    workers.dup.each do |w|
        if w != self_slave
            remove_slave(w)
        end
    end
end

#collect_finished_slavesArray<Slave>

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Collect information about the finished slaves

Returns:

  • (Array<Slave>)

    the slaves that finished



164
165
166
167
168
169
170
171
172
# File 'lib/autorespawn/manager.rb', line 164

def collect_finished_slaves
    finished_slaves = Array.new
    while finished_child = Process.waitpid2(-1, Process::WNOHANG)
        finished_slaves << process_finished_slave(*finished_child)
    end
    finished_slaves
rescue Errno::ECHILD
    finished_slaves
end

#has_active_slaves?Boolean

Tests whether this manager has some slaves that are active

Returns:

  • (Boolean)


113
114
115
# File 'lib/autorespawn/manager.rb', line 113

def has_active_slaves?
    active_slaves.size != 1
end

#has_slaves?Boolean

Check whether this manager has slaves

Returns:

  • (Boolean)


97
98
99
100
# File 'lib/autorespawn/manager.rb', line 97

def has_slaves?
    # There's always a worker for self
    workers.size != 1
end

#include?(slave) ⇒ Boolean

Tests whether this slave is registered as a worker on self

Returns:

  • (Boolean)


108
109
110
# File 'lib/autorespawn/manager.rb', line 108

def include?(slave)
    workers.include?(slave)
end

#killObject

Kill all active slaves

See Also:



198
199
200
201
202
203
204
205
# File 'lib/autorespawn/manager.rb', line 198

def kill
    active_slaves.each_value { |s| s.kill(join: false) }
    while has_active_slaves?
        finished_child = Process.waitpid2(-1)
        process_finished_slave(*finished_child)
    end
rescue Errno::ECHILD
end

#on_slave_finished {|the| ... } ⇒ Object

Hook called when a slave finishes

Yield Parameters:



64
# File 'lib/autorespawn/manager.rb', line 64

define_hooks :on_slave_finished

#on_slave_new(&block) {|the| ... } ⇒ Object

Register a callback for when a new slave is added by #add_slave

Parameters:

  • block (#call)

    the callback

Yield Parameters:

  • the (Slave)

    new slave



38
39
40
41
42
43
# File 'lib/autorespawn/manager.rb', line 38

def on_slave_new(&block)
    __on_slave_new(&block)
    workers.each do |w|
        block.call(w)
    end
end

#on_slave_removed {|the| ... } ⇒ Object

Hook called when a slave has been removed from this manager

Yield Parameters:



71
# File 'lib/autorespawn/manager.rb', line 71

define_hooks :on_slave_removed

#on_slave_start(&block) {|the| ... } ⇒ Object

Register a callback that should be called when a new slave has been spawned by #poll

Parameters:

  • block (#call)

    the callback

Yield Parameters:

  • the (Slave)

    newly started slave



51
52
53
54
55
56
# File 'lib/autorespawn/manager.rb', line 51

def on_slave_start(&block)
    __on_slave_start(&block)
    active_slaves.each_value do |w|
        block.call(w)
    end
end

#poll(autospawn: true, update_files: true) ⇒ Object

Wait for children to terminate and spawns them when needed



245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
# File 'lib/autorespawn/manager.rb', line 245

def poll(autospawn: true, update_files: true)
    finished_slaves = collect_finished_slaves
    new_slaves = Array.new

    trigger_slaves_as_necessary
    active_slaves.each_value(&:poll)

    while active_slaves.size < parallel_level + 1
        if slave = queued_slaves.find { |s| !s.running? }
            queued_slaves.delete(slave)
        elsif autospawn
            needed_slaves, _remaining = workers.partition { |s| !s.running? && s.needed? }
            failed, normal = needed_slaves.partition { |s| s.finished? && !s.success? }
            slave = failed.first || normal.first
        end

        if slave
            slave.spawn(send_initial_dump: false)
            # We manually track the slave's needed flag, just forcefully
            # set it to false at that point
            slave.not_needed!
            run_hook :__on_slave_start, slave
            new_slaves << slave
            active_slaves[slave.pid] = slave
        else
            break
        end
    end
    return new_slaves, finished_slaves
end

#process_finished_slave(pid, status) ⇒ Object



174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
# File 'lib/autorespawn/manager.rb', line 174

def process_finished_slave(pid, status)
    return if !(slave = active_slaves.delete(pid))

    if slave.finished(status).empty?
        # Do not register the slave if it is already marked as needed?
        slave.each_tracked_file(with_status: true) do |path, mtime, size|
            tracker = (tracked_files[path] ||= TrackedFile.new(path, mtime: mtime, size: size))
            tracker.slaves << slave
        end
        slave.not_needed!
    end

    slave.subcommands.each do |name, cmdline, spawn_options|
        add_slave(*cmdline, name: name, **spawn_options)
    end
    seed.merge!(slave.program_id)

    run_hook :on_slave_finished, slave
    slave
end

#queue(slave) ⇒ Object

Queue a slave for execution



155
156
157
# File 'lib/autorespawn/manager.rb', line 155

def queue(slave)
    queued_slaves << slave
end

#register_seed_files(files, search_patch = seed.ruby_load_path, ignore_not_found: true) ⇒ Object

Add files to #seed

(see ProgramID#register_files)



91
92
93
94
# File 'lib/autorespawn/manager.rb', line 91

def register_seed_files(files, search_patch = seed.ruby_load_path, ignore_not_found: true)
    files = seed.resolve_file_list(files, search_path, ignore_not_found: ignore_not_found)
    seed.register_files(files)
end

#register_slave(slave) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Registers a slave



148
149
150
151
152
# File 'lib/autorespawn/manager.rb', line 148

def register_slave(slave)
    workers << slave
    run_hook :__on_slave_new, slave
    slave
end

#remove_slave(slave) ⇒ Object

Remove a worker from this manager

Raises:

  • (ArgumentError)

    if the slave is still running



137
138
139
140
141
142
143
# File 'lib/autorespawn/manager.rb', line 137

def remove_slave(slave)
    if active?(slave)
        raise ArgumentError, "#{slave} is still running"
    end
    workers.delete(slave)
    run_hook :on_slave_removed, slave
end

#runObject



219
220
221
222
223
224
225
226
227
228
229
230
# File 'lib/autorespawn/manager.rb', line 219

def run
    while true
        poll
        sleep 1
    end

rescue Interrupt
ensure
    active_slaves.values.each do |slave|
        slave.kill
    end
end

#slave_countObject

The number of slaves registered



103
104
105
# File 'lib/autorespawn/manager.rb', line 103

def slave_count
    workers.size - 1
end

#trigger_slaves_as_necessaryObject



232
233
234
235
236
237
238
239
240
241
242
# File 'lib/autorespawn/manager.rb', line 232

def trigger_slaves_as_necessary
    tracked_files.delete_if do |path, tracker|
        tracker.slaves.delete_if(&:needed?)
        if tracker.slaves.empty?
            true
        elsif tracker.update
            tracker.slaves.each(&:needed!)
            true
        end
    end
end