Class: Workforce::Manager
- Inherits:
-
Object
- Object
- Workforce::Manager
- Includes:
- Services
- Defined in:
- lib/workforce/manager.rb
Overview
A manager is responsible for managing worker instances of a given class. It allows you to dynamically schedule a new worker or dispose of an existing one by trapping signals.
A worker is any class that defines the method run.
For example:
    class ContinuousRun
      def initialize
        Signal.trap(:QUIT) { @shutdown = true }
      end

      def run
        loop do
          puts "Running worker #{Process.pid}"
          sleep 1
          Process.exit if @shutdown
        end
      end
    end

    manager = Workforce::Manager.new(ContinuousRun)
    manager.run
Now the current process will be managing instances of the ContinuousRun task:
- If you send a SIGUSR1 to it, it will schedule a new instance of ContinuousRun to run.
- If you send a SIGUSR2 to it, it will dispose of one of the running instances.
- If you send a SIGQUIT or SIGINT to it, it will stop all the instances and then exit.
When an instance is disposed of, a SIGQUIT is sent to the worker process, so you may want to trap that signal in the worker and perform a graceful shutdown.
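As a rough illustration of the graceful shutdown mentioned above, a worker can set a flag in its SIGQUIT handler and finish the current iteration before cleaning up. This is only a sketch: `GracefulWorker`, `iterations`, `cleanup` and `cleaned_up?` are hypothetical names, and the only thing the manager requires is the `run` method.

```ruby
# Hypothetical worker; only #run is required by the manager.
class GracefulWorker
  attr_reader :iterations

  def initialize
    @shutdown = false
    @cleaned_up = false
    @iterations = 0
    # Keep the handler tiny: set a flag and let #run do the real work.
    Signal.trap(:QUIT) { @shutdown = true }
  end

  def run
    until @shutdown
      @iterations += 1 # stand-in for one unit of work
      sleep 0.05
    end
    cleanup
  end

  # Assumed cleanup hook: release connections, flush buffers, and so on.
  def cleanup
    @cleaned_up = true
  end

  def cleaned_up?
    @cleaned_up
  end
end
```

Unlike ContinuousRun, this sketch returns from run instead of calling Process.exit. The listing of #schedule further down exits the child process once run returns.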
Instance Method Summary
- #dispose ⇒ Object
  Sends a SIGQUIT to one of the workers.
- #initialize(worker) ⇒ Manager (constructor)
  Creates a new Manager instance for the given worker class.
- #launch ⇒ Object
  Launches a manager as a daemon.
- #run ⇒ Object
  Runs the manager main loop, which controls the running instances.
- #schedule ⇒ Object
  Schedules a new worker instance.
- #shutdown ⇒ Object
  Sends a SIGQUIT to all worker processes and signals the main loop that it is safe to exit.
Methods included from Services
Constructor Details
#initialize(worker) ⇒ Manager
Creates a new Manager instance for the given worker class.

    # File 'lib/workforce/manager.rb', line 37
    def initialize(worker)
      @worker = worker
      @pids = []
      @old_signals = {}
      @pipe_in, @pipe_out = IO.pipe
    end
Instance Method Details
#dispose ⇒ Object
Sends a SIGQUIT to one of the workers.

    # File 'lib/workforce/manager.rb', line 121
    def dispose
      log_errors do
        ensure_running!
        logger.info "Disposing one of the workers"
        Process.kill(:QUIT, @pids.shift) unless @pids.empty?
      end
    end
#launch ⇒ Object
Launches a manager as a daemon. Forks a new process that runs the manager and returns the manager's pid to the parent process (the one that called this method).

    # File 'lib/workforce/manager.rb', line 75
    def launch
      logger.info "Launching a new manager for #{@worker.name}"
      child_id = fork and return child_id
      Process.setsid
      Dir.chdir '/'
      File.umask 0000
      STDIN.reopen '/dev/null'
      STDOUT.reopen '/dev/null', 'a'
      STDERR.reopen STDOUT
      run
    end
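The daemonizing steps in #launch can be tried outside the library with a small stand-alone helper. This is a sketch under assumptions, not the library's code: `daemonize` and the marker file are hypothetical, and the block passed in stands in for the call to run.

```ruby
require "tmpdir"

# Hypothetical helper mirroring the steps of #launch.
def daemonize
  child_pid = fork
  return child_pid if child_pid # parent: hand the child's pid back

  # Child only from here on.
  Process.setsid                 # new session, no controlling terminal
  Dir.chdir "/"                  # do not keep any directory busy
  File.umask 0000                # do not inherit the parent's file mode mask
  STDIN.reopen "/dev/null"
  STDOUT.reopen "/dev/null", "a"
  STDERR.reopen STDOUT
  yield
  Process.exit!(0)               # skip at_exit handlers inherited from the parent
end

# Absolute path, because the child changes its working directory to "/".
marker = File.join(Dir.mktmpdir, "daemon.pid")
daemon_pid = daemonize { File.write(marker, Process.pid.to_s) }

# Waiting here is only so the example can read the marker afterwards.
Process.wait(daemon_pid)
recorded_pid = File.read(marker).to_i
```

The pid returned to the parent is the one to signal later, for example with Process.kill(:USR1, daemon_pid) when the child is running a manager.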
#run ⇒ Object
Run the manager main loop, which controls the running instances.

    # File 'lib/workforce/manager.rb', line 45
    def run
      log_errors do
        logger.info "Starting manager for #{@worker}"
        $0 = "Workforce: #{@worker.name} master"

        trap_signal(:INT, :shutdown)
        trap_signal(:QUIT, :shutdown)
        trap_signal(:USR1, :schedule)
        trap_signal(:USR2, :dispose)
        trap_signal(:CLD) do
          pid = Process.wait
          logger.info "Removing process (#{pid}) from worker pool"
          @pids.delete(pid)
        end

        @running = true

        # Block process until something is written to @pipe_out
        IO.select([@pipe_in], nil, nil, nil)

        logger.info "Shutting down manager"
      end
    ensure
      @pipe_in.close
      @pipe_out.close
    end
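The way the main loop blocks on the pipe can be seen in isolation. The following is a minimal sketch, assuming a thread as a stand-in for the shutdown handler. `block_until_written` and `waker` are hypothetical names. Only IO.pipe and IO.select come from the listing above.

```ruby
# Block the caller until a byte arrives on the read end of the pipe.
def block_until_written(pipe_in)
  IO.select([pipe_in], nil, nil, nil) # nil timeout: wait indefinitely
  pipe_in.read_nonblock(1)            # consume the wake-up byte
end

pipe_in, pipe_out = IO.pipe

# Stand-in for the shutdown handler: wake the blocked caller after a delay.
waker = Thread.new do
  sleep 0.1
  pipe_out.write("1")
end

woken_by = block_until_written(pipe_in)
waker.join

# Same cleanup as the ensure clause in #run.
pipe_in.close
pipe_out.close
```

Blocking in IO.select keeps the manager process idle between signals instead of polling.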
#schedule ⇒ Object
Schedules a new worker instance. This forks the master process and runs the worker in the child process.

    # File 'lib/workforce/manager.rb', line 102
    def schedule
      log_errors do
        ensure_running!
        logger.info "Scheduling new worker"
        if pid = fork
          logger.info "New worker scheduled: #{pid}"
          @pids << pid
        else
          $0 = "Workforce: #{@worker.name} worker"
          restore_signals
          @worker.new.run
          logger.info "Worker finished processing: #{Process.pid}"
          Process.exit
        end
      end
    end
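The fork-and-track pattern used by #schedule can be sketched without the library. `spawn_worker` and `OneShot` are hypothetical names, and the sketch uses Process.exit! rather than Process.exit so that the child skips any at_exit handlers inherited from the parent.

```ruby
# Fork a child that runs the given worker class; return its pid to the parent.
def spawn_worker(worker_class)
  pid = fork
  return pid if pid # parent: fork returned the child's pid

  # Child: fork returned nil.
  worker_class.new.run
  Process.exit!(0)
end

# Hypothetical worker that finishes on its own.
class OneShot
  def run
    sleep 0.1
  end
end

pids = []
2.times { pids << spawn_worker(OneShot) }

# Reap each child; Process.wait returns the pid of the child it collected.
reaped = pids.map { |pid| Process.wait(pid) }
```

The manager itself does not reap children in #schedule. The listing of #run does it from the :CLD handler and removes the pid from the pool there.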
#shutdown ⇒ Object
Sends a SIGQUIT to all worker processes and signals the main loop that it is safe to exit.

    # File 'lib/workforce/manager.rb', line 88
    def shutdown
      log_errors do
        ensure_running!
        logger.info "Shutting down child processes"
        @pids.each { |pid| Process.kill(:QUIT, pid) }
        Process.waitall
        @pipe_out.write(1)
      end
    end
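A stand-alone sketch of the same shutdown sequence: signal every child, then wait for all of them. `stop_all` is a hypothetical helper, and the rescue of Errno::ESRCH is an addition that is not in the listing above. It only guards against a child that has already exited.

```ruby
# Send SIGQUIT to every pid, then block until all children have exited.
def stop_all(pids)
  pids.each do |pid|
    begin
      Process.kill(:QUIT, pid)
    rescue Errno::ESRCH
      # Assumption: a child that is already gone can be ignored.
    end
  end
  Process.waitall # array of [pid, Process::Status] pairs
end

# Three children that idle until they receive SIGQUIT.
pids = 3.times.map do
  fork do
    Signal.trap(:QUIT) { exit!(0) }
    sleep
  end
end

sleep 0.2 # give the children time to install their handlers
statuses = stop_all(pids)
```

Process.waitall returns only after every child has terminated, which is why the listing above writes to the pipe afterwards to release the main loop.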