Class: HotPotato::SupervisorServer
- Inherits: Object
- Includes: Core
- Defined in: lib/hot_potato/supervisor_server.rb
Overview
The supervisor is a process that runs on each machine that participates in the cluster. When it starts it does the following:
- Read the routes file
- Connect to the Redis server and get the AppTask process ID table
- Acquire the global lock
- If a process is needed, fork a new process for AppTask
- Release the global lock
- Rinse and Repeat
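The steps above can be sketched as one pass of the loop. This is only an illustration under stated assumptions, not Hot Potato's code: FakeLock, supervise_once, and the routes and process_table hashes are hypothetical stand-ins for the Redis-backed lock and process ID table, and the fork is replaced by a block so the example runs anywhere.

```ruby
# Hypothetical stand-in for the Redis-backed global lock.
class FakeLock
  def initialize
    @held = false
  end

  # Returns true if the lock was free and is now held by the caller.
  def acquire
    return false if @held
    @held = true
  end

  def release
    @held = false
  end
end

# One pass of the supervisor loop. `routes` maps an AppTask name to the number
# of instances wanted; `process_table` maps a name to the instances running.
# The block is called for each AppTask that is short of instances; as in the
# listing for #run, at most one instance per AppTask is started per pass.
def supervise_once(routes, process_table, lock)
  return [] unless lock.acquire
  started = []
  begin
    routes.each do |name, wanted|
      running = process_table.fetch(name, 0)
      next unless running < wanted
      yield name                          # the real supervisor forks here
      process_table[name] = running + 1
      started << name
    end
  ensure
    lock.release                          # let other supervisors take a turn
  end
  started
end

routes = { "Counter" => 2, "Logger" => 1 }   # hypothetical AppTask names
table  = { "Counter" => 1 }
lock   = FakeLock.new
started = supervise_once(routes, table, lock) { |name| puts "would fork #{name}" }
p started
```

Releasing the lock in an ensure clause is a choice made for this sketch, so that a failure while starting one AppTask cannot leave the lock held.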
The supervisor also starts the Heartbeat service and logging service as background threads.
The supervisor can be managed from the command line:
$ bin/supervisor [run|start|stop|restart]
If started without a mode argument, it defaults to run.
Constant Summary
- MAX_APP_TASKS = 32
- HEARTBEAT_INTERVAL = 20
- PID_FILE = "#{APP_PATH}/tmp/supervisor.pid"
- LOG_FILE = "#{APP_PATH}/logs/supervisor.log"
Constants included from Core
Instance Method Summary
- #has_capacity ⇒ Object
  Determines if this host has reached the limit of the number of AppTasks it can run.
- #initialize ⇒ SupervisorServer (constructor)
  A new instance of SupervisorServer.
- #load_options ⇒ Object
  Loads the options, mainly from the config.yml file, into an OpenStruct object.
- #parse_options ⇒ Object
- #restart ⇒ Object
  Restarts the Supervisor.
- #run ⇒ Object
- #shutdown ⇒ Object
  Kills any running AppTasks on this machine and removes entries from the process table cache.
- #start ⇒ Object
- #start_heartbeat_service ⇒ Object
  Starts a background thread to update the process list in Redis.
- #start_log_service ⇒ Object
  OK, this is not really a log service, but it is responsible for subscribing to the log messages from the AppTasks on this server.
- #stop ⇒ Object
  Stops the Supervisor.
Methods included from Core
#acquire_lock, #config, #log, #queue_inject, #queue_subscribe, #release_lock, #set_logger, #stat
Constructor Details
#initialize ⇒ SupervisorServer
Returns a new instance of SupervisorServer.
    # File 'lib/hot_potato/supervisor_server.rb', line 33
    def initialize
      @options = load_options
      @options.mode = parse_options
      trap("INT") { shutdown }
      self.send(@options.mode)
    end
Instance Method Details
#has_capacity ⇒ Object
Determines whether this host has reached the limit on the number of AppTasks it can run. Returns true while the host is still below the limit.
    # File 'lib/hot_potato/supervisor_server.rb', line 169
    def has_capacity
      return stat.keys("hotpotato.apptask.#{@options.hostname}.*").count < @options.max_app_tasks
    end
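The capacity check amounts to counting the process table keys for this host and comparing the count with the limit. A minimal sketch of that idea, with a plain array standing in for the Redis key space: the capacity? helper, the host names, and the numeric key suffixes are assumptions made for illustration (the source only shows the pattern hotpotato.apptask.<hostname>.*).

```ruby
# Hypothetical helper: counts keys under the host's AppTask prefix and
# compares the count with the configured limit.
def capacity?(keys, hostname, max_app_tasks)
  prefix = "hotpotato.apptask.#{hostname}."
  keys.count { |key| key.start_with?(prefix) } < max_app_tasks
end

# Assumed key names; only the "hotpotato.apptask.<hostname>." prefix comes
# from the listing above.
keys = [
  "hotpotato.apptask.host1.101",
  "hotpotato.apptask.host1.102",
  "hotpotato.apptask.host2.201",
  "hotpotato.supervisor.host1.55"
]

puts capacity?(keys, "host1", 32)  # two AppTasks on host1, limit of 32
puts capacity?(keys, "host1", 2)   # two AppTasks on host1, limit of 2
```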
#load_options ⇒ Object
Loads the options, mainly from the config.yml file, into an OpenStruct object.
    # File 'lib/hot_potato/supervisor_server.rb', line 204
    def load_options
      options = OpenStruct.new
      options.max_app_tasks = MAX_APP_TASKS
      options.group = ""
      options.mode = :run
      options.hostname = Socket.gethostname
      options.running = true
      if config["servers"]
        config["servers"].each do |server|
          if server["hostname"] == options.hostname
            options.max_app_tasks = server["max_app_tasks"] || MAX_APP_TASKS
            options.group = server["group"] || ""
            break
          end
        end
      end
      return options
    end
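To make the per-host override concrete, here is a sketch of the lookup against a small YAML document. The key names servers, hostname, max_app_tasks, and group come from the listing; the host names, the group name, the options_for helper, and the use of a plain Struct in place of OpenStruct are assumptions for illustration, and a real config.yml may contain other settings not shown on this page.

```ruby
require "yaml"

# Assumed sample data: only the key names are taken from the listing.
CONFIG_YAML = <<~YAML
  servers:
    - hostname: worker1
      max_app_tasks: 8
      group: incoming
    - hostname: worker2
YAML

DEFAULT_MAX_APP_TASKS = 32  # mirrors MAX_APP_TASKS in the constant summary

# A plain Struct stands in for the OpenStruct used by the real method.
Options = Struct.new(:hostname, :max_app_tasks, :group)

# Hypothetical helper: start from the defaults, then apply the entry whose
# hostname matches, if there is one.
def options_for(hostname, config)
  options = Options.new(hostname, DEFAULT_MAX_APP_TASKS, "")
  server = (config["servers"] || []).find { |s| s["hostname"] == hostname }
  if server
    options.max_app_tasks = server["max_app_tasks"] || DEFAULT_MAX_APP_TASKS
    options.group = server["group"] || ""
  end
  options
end

config = YAML.safe_load(CONFIG_YAML)
p options_for("worker1", config).to_a  # overrides taken from the file
p options_for("worker2", config).to_a  # listed, but falls back to defaults
p options_for("other", config).to_a    # not listed at all
```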
#parse_options ⇒ Object
    # File 'lib/hot_potato/supervisor_server.rb', line 127
    def parse_options
      mode = :run
      op = OptionParser.new do |opts|
        opts.banner = "Usage: #{$0} [run|start|stop|restart]"
        opts.on_tail("-h", "--help", "Show this message") do
          puts op
          exit
        end
      end
      begin
        op.parse!
        mode = (ARGV.shift || "run").to_sym
        if ![:start, :stop, :restart, :run].include?(mode)
          puts op
          exit 1
        end
      rescue
        puts op
        exit 1
      end
      return mode
    end
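The mode selection itself can be isolated from the option parser. The sketch below shows only that rule; parse_mode and VALID_MODES are hypothetical names, and unlike the real method it returns nil for an unknown mode instead of printing the usage text and exiting.

```ruby
VALID_MODES = [:run, :start, :stop, :restart].freeze

# Hypothetical helper: the first argument selects the mode, and a missing
# argument means :run. Returns nil when the mode is not recognised.
def parse_mode(argv)
  mode = (argv.first || "run").to_sym
  VALID_MODES.include?(mode) ? mode : nil
end

p parse_mode([])           # no argument given
p parse_mode(["restart"])  # a recognised mode
p parse_mode(["status"])   # not a recognised mode
```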
#restart ⇒ Object
Restarts the Supervisor.
    # File 'lib/hot_potato/supervisor_server.rb', line 121
    def restart
      stop
      sleep 2
      start
    end
#run ⇒ Object
    # File 'lib/hot_potato/supervisor_server.rb', line 40
    def run
      $0 = "Hot Potato Supervisor"
      log.info "Starting Hot Potato Supervisor #{HotPotato::VERSION}"
      begin
        start_heartbeat_service
        start_log_service
        routes = HotPotato::Route.routes
        while @options.running do
          if acquire_lock :supervisor
            log.debug "Lock acquired"
            routes.app_tasks.each do |app_task|
              if app_task.running_instances < app_task.instances && app_task.allow_group(@options.group)
                if has_capacity
                  log.info "Starting AppTask [#{classify(app_task.classname)}]"
                  pid = fork do
                    Process.setsid
                    exec "#{APP_PATH}/bin/app_task #{app_task.classname.to_s}"
                  end
                  Process.detach pid
                  sleep 2
                else
                  log.warn "Cannot start AppTask [#{app_task.classname}] - Server at Capacity (Increase max_app_tasks)"
                end
              end
            end
            release_lock :supervisor
          end
          sleep (5 + rand(5))
        end
      rescue Exception
        log.error $!
        log.error $@
        exit 1
      end
    end
#shutdown ⇒ Object
Kills any running AppTasks on this machine and removes their entries from the process table cache, then removes the supervisor's own entry from the process table cache.
    # File 'lib/hot_potato/supervisor_server.rb', line 153
    def shutdown
      @options.running = false
      stat.keys("hotpotato.apptask.#{@options.hostname}.*").each do |app_task_key|
        app_task = JSON.parse(stat.get(app_task_key))
        log.info "Killing PID #{app_task.pid} [#{app_task.classname}]"
        Process.kill("INT", app_task.pid) if Process.alive?(app_task.pid)
        stat.del app_task_key
      end
      stat.keys("hotpotato.supervisor.#{@options.hostname}.*").each do |supervisor_key|
        stat.del supervisor_key
      end
      log.info "Stopping Supervisor..."
    end
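Process.alive? is not a method of Ruby's core Process module, so it is presumably defined elsewhere in Hot Potato; that definition does not appear on this page. One common way to write such a check is to send signal 0, which tests for the process without delivering anything to it. The helper name process_alive? below is hypothetical and the sketch is not the library's implementation.

```ruby
# Hypothetical helper: true if a process with this PID currently exists.
def process_alive?(pid)
  Process.kill(0, pid)  # signal 0 only checks that the PID can be signalled
  true
rescue Errno::ESRCH
  false                 # no such process
rescue Errno::EPERM
  true                  # the process exists but belongs to another user
end

puts process_alive?(Process.pid)  # the current process is certainly running
```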
#start ⇒ Object
    # File 'lib/hot_potato/supervisor_server.rb', line 76
    def start
      # Check if we are running
      if File.exists?(PID_FILE)
        pid = 0;
        File.open(PID_FILE, 'r') do |f|
          pid = f.read.to_i
        end
        # Check if we are REALLY running
        if Process.alive?(pid)
          log.fatal "Supervisor is already running on this machine. Only one instance can run per machine."
          exit 1
        else
          log.info "Supervisor is not running despite the presence of the pid file. I will overwrite the pid file and start the supervisor."
        end
      end
      Process.daemon
      File.open(PID_FILE, 'w') do |f|
        f.write "#{Process.pid}\n"
      end
      STDIN.reopen '/dev/null'
      STDOUT.reopen LOG_FILE, 'a'
      STDERR.reopen STDOUT
      STDOUT.sync = true
      STDERR.sync = true
      run
    end
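The PID file check distinguishes three situations: no file, a file whose process is still running, and a stale file left behind by a process that has gone away. The sketch below shows that decision in isolation; pid_file_status is a hypothetical helper, and it uses File.exist? because File.exists? was removed in Ruby 3.2.

```ruby
require "tmpdir"

# Hypothetical helper: returns :absent, :running, or :stale for a PID file.
def pid_file_status(path)
  return :absent unless File.exist?(path)
  pid = File.read(path).to_i
  # An empty or garbled file reads as 0; never pass that to kill, because
  # PID 0 addresses the caller's whole process group.
  return :stale if pid <= 0
  Process.kill(0, pid)  # signal 0 only checks that the process exists
  :running
rescue Errno::ESRCH
  :stale                # the recorded process no longer exists
rescue Errno::EPERM
  :running              # the process exists but belongs to another user
end

Dir.mktmpdir do |dir|
  path = File.join(dir, "supervisor.pid")
  puts pid_file_status(path)             # nothing written yet
  File.write(path, "#{Process.pid}\n")
  puts pid_file_status(path)             # the file holds our own PID
end
```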
#start_heartbeat_service ⇒ Object
Starts a background thread to update the process list in Redis.
    # File 'lib/hot_potato/supervisor_server.rb', line 188
    def start_heartbeat_service
      si = SupervisorInfo.new
      stat.set si.key, si.to_json
      stat.expire si.key, 120
      Thread.new do
        log.info "Thread created for Supervisor [Heartbeat]"
        loop do
          si.touch
          stat.set si.key, si.to_json, 120
          sleep HEARTBEAT_INTERVAL
        end
      end
    end
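The relationship between the 20 second heartbeat interval and the 120 second expiry can be shown with a toy store that takes the current time as an argument. ExpiringStore and the key name are hypothetical stand-ins for the Redis-backed store; the point of the sketch is only that an entry outlives its last heartbeat by the expiry time, so a dead supervisor drops out of the table about two minutes after it stops.

```ruby
# Hypothetical in-memory stand-in for a key-value store with per-key expiry.
class ExpiringStore
  def initialize
    @entries = {}
  end

  # Stores value under key; the entry counts as gone ttl seconds after `now`.
  def set(key, value, ttl, now)
    @entries[key] = [value, now + ttl]
  end

  def get(key, now)
    value, expires_at = @entries[key]
    return nil if expires_at.nil? || now >= expires_at
    value
  end
end

HEARTBEAT_INTERVAL = 20  # seconds, as in the constant summary
TTL = 120                # seconds, as passed to expire in the listing

store = ExpiringStore.new
key = "hotpotato.supervisor.host1.4242"  # assumed key name

# Three heartbeats at t = 0, 20 and 40 seconds, then the supervisor dies.
3.times { |i| store.set(key, "alive", TTL, i * HEARTBEAT_INTERVAL) }

p store.get(key, 100)  # 60 seconds after the last heartbeat
p store.get(key, 160)  # 120 seconds after the last heartbeat
```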
#start_log_service ⇒ Object
OK, this is not really a log service, but it is responsible for subscribing to the log messages from the AppTasks on this server.
    # File 'lib/hot_potato/supervisor_server.rb', line 175
    def start_log_service
      Thread.new do
        log.info "Thread created for Supervisor [Log]"
        queue_subscribe("hotpotato.log.#{@options.hostname}") do |m|
          log_entry = JSON.parse(m)
          if log.respond_to?(log_entry["severity"].to_sym)
            log.send(log_entry["severity"].to_sym, "#{log_entry['classname']} [#{log_entry['pid']}] - #{log_entry['message']}")
          end
        end
      end
    end
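The forwarding step takes a JSON message and calls the logger method named by its severity field. A sketch of that step using Ruby's standard Logger follows; forward_log_entry and ALLOWED_SEVERITIES are hypothetical, the sample messages are made up, and the field names severity, classname, pid, and message are taken from the listing. The sketch checks the severity against a fixed list rather than using respond_to?, which is a design choice of this example and not what the library does.

```ruby
require "json"
require "logger"
require "stringio"

ALLOWED_SEVERITIES = %w[debug info warn error fatal].freeze

# Hypothetical helper: forwards one JSON-encoded entry to the logger and
# returns false for entries whose severity is not a standard Logger level.
def forward_log_entry(logger, message)
  entry = JSON.parse(message)
  severity = entry["severity"].to_s
  return false unless ALLOWED_SEVERITIES.include?(severity)
  logger.public_send(severity, "#{entry['classname']} [#{entry['pid']}] - #{entry['message']}")
  true
end

out = StringIO.new
logger = Logger.new(out)

good = JSON.generate({ "severity" => "info", "classname" => "Counter", "pid" => 101, "message" => "started" })
bad  = JSON.generate({ "severity" => "close", "classname" => "Counter", "pid" => 101, "message" => "ignored" })

forward_log_entry(logger, good)
forward_log_entry(logger, bad)
puts out.string
```

A fixed list keeps a message from invoking other logger methods such as close, which a respond_to? check alone would let through.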
#stop ⇒ Object
Stops the Supervisor. Requires the existence of a PID file. Calls the shutdown hook to stop AppTasks.
    # File 'lib/hot_potato/supervisor_server.rb', line 105
    def stop
      pid = 0
      shutdown
      if File.exists?(PID_FILE)
        File.open(PID_FILE, 'r') do |f|
          pid = f.read.to_i
        end
        Process.kill("INT", pid) if Process.alive?(pid)
        File.delete(PID_FILE)
      else
        log.fatal "Supervisor PID file does not exist."
        exit 1
      end
    end