Class: Workforce::Manager

Inherits:
Object
  • Object
show all
Includes:
Services
Defined in:
lib/workforce/manager.rb

Overview

A manager is responsible for managing instances of workers of a certain class. It allows you to dynamically schedule a new worker or dispose of an existing one by trapping signals.

A worker is any class that defines the method run.

For example:

# Example worker: prints a line with its pid once per second and exits
# after a SIGQUIT has been received.
class ContinuousRun
  def initialize
    # The handler only raises a flag; #run checks it after each pause.
    Signal.trap(:QUIT) do
      @shutdown = true
    end
  end

  def run
    loop do
      puts "Running worker #{Process.pid}"
      sleep 1
      break if @shutdown
    end

    Process.exit
  end
end

# Build a manager for ContinuousRun and run its main loop in this process.
Workforce::Manager.new(ContinuousRun).run

Now the current process will be managing instances of the ContinuousRun task:

- If you send a SIGUSR1 to it, it will schedule a new instance of ContinuousRun to run.
- If you send a SIGUSR2 to it, it will dispose of one of the running instances.
- If you send a SIGQUIT or SIGINT to it, it will stop all the instances and then exit.

When an instance is disposed of, a SIGQUIT is sent to the worker process, so you might want to trap that signal and perform a graceful shutdown in the worker.

Instance Method Summary collapse

Methods included from Services

#logger

Constructor Details

#initialize(worker) ⇒ Manager

Creates a new Manager instance for the given worker class.



37
38
39
40
41
42
# File 'lib/workforce/manager.rb', line 37

# Sets up a manager for +worker+, the class whose instances will be run.
#
# Starts with an empty pid list, an empty @old_signals hash and a freshly
# created pipe whose two ends are kept in @pipe_in (read end) and
# @pipe_out (write end).
def initialize(worker)
  @worker = worker
  @pids = []
  @old_signals = {}

  reader, writer = IO.pipe
  @pipe_in = reader
  @pipe_out = writer
end

Instance Method Details

#disposeObject

Sends a SIGQUIT to one of the workers.



121
122
123
124
125
126
127
128
# File 'lib/workforce/manager.rb', line 121

# Asks a single worker to stop by sending it a SIGQUIT.
#
# The pid at the front of @pids is removed from the list and signalled.
# When no pids are on record only the log line is written.
def dispose
  log_errors do
    ensure_running!
    logger.info "Disposing one of the workers"

    target = @pids.shift
    Process.kill(:QUIT, target) if target
  end
end

#launchObject

Launch a manager as a daemon. Creates a new fork running the manager and returns the manager pid to the parent process (the one who called this method).



75
76
77
78
79
80
81
82
83
84
85
# File 'lib/workforce/manager.rb', line 75

# Launches the manager as a daemon.
#
# Forks the current process. In the calling (parent) process this returns
# the pid of the forked manager straight away. The forked process starts a
# new session, changes directory to '/', clears the umask, points STDIN,
# STDOUT and STDERR at /dev/null and then runs the manager via #run.
#
# The forked process exits as soon as #run returns, so it never returns
# from this method into the caller's code.
#
# Returns the manager pid (Integer) in the calling process.
def launch
  logger.info "Launching a new manager for #{@worker.name}"

  child_id = fork
  return child_id if child_id

  # Everything below runs only in the forked manager process.
  Process.setsid
  Dir.chdir '/'
  File.umask 0000
  STDIN.reopen '/dev/null'
  STDOUT.reopen '/dev/null', 'a'
  STDERR.reopen STDOUT
  run

  # Without an explicit exit the daemon would return from #launch and keep
  # executing whatever the caller does next, as a second copy of the
  # program. #schedule ends its forked branch the same way.
  Process.exit
end

#runObject

Run the manager main loop, which controls the running instances.



45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
# File 'lib/workforce/manager.rb', line 45

# Runs the manager main loop.
#
# Sets the process title, registers the signal handlers through
# trap_signal (INT and QUIT map to :shutdown, USR1 to :schedule, USR2 to
# :dispose, CLD to the reaping block below), marks the manager as running
# and then blocks until @pipe_in becomes readable. Both pipe ends are
# closed on the way out, whether the loop ended normally or with an error.
def run
  log_errors do
    logger.info "Starting manager for #{@worker}"

    $0 = "Workforce: #{@worker.name} master"

    trap_signal(:INT, :shutdown)
    trap_signal(:QUIT, :shutdown)
    trap_signal(:USR1, :schedule)
    trap_signal(:USR2, :dispose)

    trap_signal(:CLD) do
      begin
        # Several children may have exited by the time a single SIGCHLD is
        # handled, so collect every finished child. WNOHANG keeps the
        # manager from blocking when none has actually exited.
        while (pid = Process.wait(-1, Process::WNOHANG))
          logger.info "Removing process (#{pid}) from worker pool"
          @pids.delete(pid)
        end
      rescue Errno::ECHILD
        # No children left to wait for (for example, they were already
        # collected by Process.waitall in #shutdown).
      end
    end

    @running = true

    # Block process until something is written to @pipe_out
    IO.select([@pipe_in], nil, nil, nil)
    logger.info "Shutting down manager"
  end
ensure
  @pipe_in.close
  @pipe_out.close
end

#scheduleObject

Schedules a new worker instance. This will fork the master process and start running the worker as a child process.



102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
# File 'lib/workforce/manager.rb', line 102

# Starts one more worker as a child of the manager process.
#
# The parent side of the fork logs the new pid and appends it to @pids.
# The child side renames itself, calls restore_signals, runs a fresh
# instance of the worker class and exits once the worker's #run returns.
def schedule
  log_errors do
    ensure_running!
    logger.info "Scheduling new worker"

    child_pid = fork
    if child_pid.nil?
      # Forked worker process.
      $0 = "Workforce: #{@worker.name} worker"
      restore_signals
      @worker.new.run
      logger.info "Worker finished processing: #{Process.pid}"
      Process.exit
    else
      # Manager process.
      logger.info "New worker scheduled: #{child_pid}"
      @pids << child_pid
    end
  end
end

#shutdownObject

Sends a SIGQUIT to all worker processes and signals the main loop that it is safe to exit.



88
89
90
91
92
93
94
95
96
97
98
# File 'lib/workforce/manager.rb', line 88

# Stops every worker and lets the main loop finish.
#
# Sends a SIGQUIT to each pid in @pids, waits for all child processes to
# terminate and then writes to @pipe_out, which is what unblocks the
# IO.select call in #run.
def shutdown
  log_errors do
    ensure_running!

    logger.info "Shutting down child processes"
    @pids.each do |pid|
      begin
        Process.kill(:QUIT, pid)
      rescue Errno::ESRCH
        # The process is already gone. Carry on so the remaining workers
        # are still signalled and the main loop is still released.
      end
    end
    Process.waitall

    @pipe_out.write(1)
  end
end