Class: OpenC3::Operator
Direct Known Subclasses
Constant Summary collapse
- CYCLE_TIME =
cycle time to check for new microservices
5.0
- PROCESS_SHUTDOWN_SECONDS =
5.0
- @@instance =
nil
Instance Attribute Summary collapse
-
#cycle_time ⇒ Object
readonly
Returns the value of attribute cycle_time.
-
#processes ⇒ Object
readonly
Returns the value of attribute processes.
Class Method Summary collapse
Instance Method Summary collapse
-
#initialize ⇒ Operator
constructor
A new instance of Operator.
- #remove_old ⇒ Object
- #respawn_changed ⇒ Object
- #respawn_dead ⇒ Object
- #run ⇒ Object
- #shutdown ⇒ Object
- #shutdown_processes(processes) ⇒ Object
- #start_new ⇒ Object
- #stop ⇒ Object
- #update ⇒ Object
Constructor Details
#initialize ⇒ Operator
Returns a new instance of Operator.
238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 |
# File 'lib/openc3/operators/operator.rb', line 238 def initialize Logger.level = Logger::INFO Logger.microservice_name = 'MicroserviceOperator' OperatorProcess.setup() @cycle_time = (ENV['OPERATOR_CYCLE_TIME'] and ENV['OPERATOR_CYCLE_TIME'].to_f) || CYCLE_TIME # time in seconds @ruby_process_name = ENV['OPENC3_RUBY'] if RUBY_ENGINE != 'ruby' @ruby_process_name ||= 'jruby' else @ruby_process_name ||= 'ruby' end @processes = {} @new_processes = {} @changed_processes = {} @removed_processes = {} @mutex = Mutex.new @shutdown = false @shutdown_complete = false end |
Instance Attribute Details
#cycle_time ⇒ Object (readonly)
Returns the value of attribute cycle_time.
231 232 233 |
# File 'lib/openc3/operators/operator.rb', line 231 def cycle_time @cycle_time end |
#processes ⇒ Object (readonly)
Returns the value of attribute processes.
231 232 233 |
# File 'lib/openc3/operators/operator.rb', line 231 def processes @processes end |
Class Method Details
.instance ⇒ Object
394 395 396 |
# File 'lib/openc3/operators/operator.rb', line 394 def self.instance @@instance end |
.processes ⇒ Object
390 391 392 |
# File 'lib/openc3/operators/operator.rb', line 390 def self.processes @@instance.processes end |
.run ⇒ Object
385 386 387 388 |
# File 'lib/openc3/operators/operator.rb', line 385 def self.run @@instance = self.new @@instance.run end |
Instance Method Details
#remove_old ⇒ Object
289 290 291 292 293 294 295 296 297 |
# File 'lib/openc3/operators/operator.rb', line 289 def remove_old @mutex.synchronize do if @removed_processes.length > 0 Logger.info("Shutting down #{@removed_processes.length} removed microservices...") shutdown_processes(@removed_processes) @removed_processes = {} end end end |
#respawn_changed ⇒ Object
276 277 278 279 280 281 282 283 284 285 286 287 |
# File 'lib/openc3/operators/operator.rb', line 276 def respawn_changed @mutex.synchronize do if @changed_processes.length > 0 Logger.info("Cycling #{@changed_processes.length} changed microservices...") shutdown_processes(@changed_processes) break if @shutdown @changed_processes.each { |_name, p| p.start } @changed_processes = {} end end end |
#respawn_dead ⇒ Object
299 300 301 302 303 304 305 306 307 308 309 310 311 312 |
# File 'lib/openc3/operators/operator.rb', line 299 def respawn_dead @mutex.synchronize do @processes.each do |_name, p| break if @shutdown p.output_increment unless p.alive? # Respawn process output = p.extract_output Logger.error("Unexpected process died... respawning! #{p.cmd_line}\n#{output}\n", scope: p.scope) p.start end end end end |
#run ⇒ Object
355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 |
# File 'lib/openc3/operators/operator.rb', line 355 def run # Use at_exit to shutdown cleanly at_exit { shutdown() } # Monitor processes and respawn if died Logger.info("#{self.class} Monitoring processes every #{@cycle_time} sec...") loop do update() remove_old() respawn_changed() start_new() respawn_dead() break if @shutdown sleep(@cycle_time) break if @shutdown end loop do break if @shutdown_complete sleep(0.1) end ensure Logger.info("#{self.class} shutdown complete") end |
#shutdown ⇒ Object
345 346 347 348 349 350 351 352 353 |
# File 'lib/openc3/operators/operator.rb', line 345 def shutdown @shutdown = true @mutex.synchronize do Logger.info("Shutting down processes...") shutdown_processes(@processes) Logger.info("Shutting down processes complete") @shutdown_complete = true end end |
#shutdown_processes(processes) ⇒ Object
314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 |
# File 'lib/openc3/operators/operator.rb', line 314 def shutdown_processes(processes) # Make a copy so we don't mutate original processes = processes.dup Logger.info("Commanding soft stops...") processes.each { |_name, p| p.soft_stop } start_time = Time.now # Allow sufficient time for processes to shutdown cleanly while (Time.now - start_time) < PROCESS_SHUTDOWN_SECONDS processes_to_remove = [] processes.each do |name, p| unless p.alive? processes_to_remove << name Logger.debug("Soft stop process successful: #{p.cmd_line}", scope: p.scope) end end processes_to_remove.each do |name| processes.delete(name) end if processes.length <= 0 Logger.debug("Soft stop all successful") break end sleep(0.1) end if processes.length > 0 Logger.debug("Commanding hard stops...") processes.each { |_name, p| p.hard_stop } end end |
#start_new ⇒ Object
265 266 267 268 269 270 271 272 273 274 |
# File 'lib/openc3/operators/operator.rb', line 265 def start_new @mutex.synchronize do if @new_processes.length > 0 # Start all the processes Logger.info("#{self.class} starting each new process...") @new_processes.each { |_name, p| p.start } @new_processes = {} end end end |
#stop ⇒ Object
381 382 383 |
# File 'lib/openc3/operators/operator.rb', line 381 def stop @shutdown = true end |
#update ⇒ Object
261 262 263 |
# File 'lib/openc3/operators/operator.rb', line 261 def update raise "Implement in subclass" end |