Class: Loops::ProcessManager
- Inherits:
-
Object
- Object
- Loops::ProcessManager
- Defined in:
- lib/loops/process_manager.rb
Instance Attribute Summary collapse
-
#logger ⇒ Object
readonly
Returns the value of attribute logger.
Instance Method Summary collapse
-
#initialize(config, logger) ⇒ ProcessManager
constructor
A new instance of ProcessManager.
- #monitor_workers ⇒ Object
- #setup_signals ⇒ Object
- #shutdown? ⇒ Boolean
- #start_shutdown! ⇒ Object
- #start_workers(name, number, &blk) ⇒ Object
- #stop_workers(force = false) ⇒ Object
- #update_wait_period(period) ⇒ Object
- #wait_for_workers(seconds) ⇒ Object
Constructor Details
#initialize(config, logger) ⇒ ProcessManager
Returns a new instance of ProcessManager.
5 6 7 8 9 10 11 12 13 14 15 |
# File 'lib/loops/process_manager.rb', line 5 def initialize(config, logger) @config = { 'poll_period' => 1, 'wait_period' => 10, 'workers_engine' => 'fork', }.merge(config) @logger = logger @worker_pools = {} @shutdown = false end |
Instance Attribute Details
#logger ⇒ Object (readonly)
Returns the value of attribute logger.
3 4 5 |
# File 'lib/loops/process_manager.rb', line 3 def logger @logger end |
Instance Method Details
#monitor_workers ⇒ Object
30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 |
# File 'lib/loops/process_manager.rb', line 30 def monitor_workers setup_signals logger.info('Starting workers monitoring code...') loop do logger.debug("Checking workers' health...") @worker_pools.each do |name, pool| break if shutdown? pool.check_workers end break if shutdown? logger.debug("Sleeping for #{@config['poll_period']} seconds...") sleep(@config['poll_period']) end ensure logger.info("Workers monitoring loop is finished, starting shutdown...") # Send out stop signals stop_workers(false) # Wait for all the workers to die unless wait_for_workers(@config['wait_period']) logger.info("Some workers are still alive after 10 seconds of waiting. Killing them...") stop_workers(true) wait_for_workers(5) end end |
#setup_signals ⇒ Object
58 59 60 61 62 |
# File 'lib/loops/process_manager.rb', line 58 def setup_signals # Zombie reapers trap('CHLD') {} trap('EXIT') {} end |
#shutdown? ⇒ Boolean
95 96 97 |
# File 'lib/loops/process_manager.rb', line 95 def shutdown? @shutdown end |
#start_shutdown! ⇒ Object
99 100 101 102 |
# File 'lib/loops/process_manager.rb', line 99 def start_shutdown! logger.info("Starting shutdown (shutdown flag set)...") @shutdown = true end |
#start_workers(name, number, &blk) ⇒ Object
22 23 24 25 26 27 28 |
# File 'lib/loops/process_manager.rb', line 22 def start_workers(name, number, &blk) raise ArgumentError, "Need a worker block!" unless block_given? logger.debug("Creating a workers pool of #{number} workers for #{name} loop...") @worker_pools[name] = Loops::WorkerPool.new(name, self, @config['workers_engine'], &blk) @worker_pools[name].start_workers(number) end |
#stop_workers(force = false) ⇒ Object
85 86 87 88 89 90 91 92 93 |
# File 'lib/loops/process_manager.rb', line 85 def stop_workers(force = false) # Set shutdown flag logger.info("Stopping workers#{force ? ' (forced)' : ''}...") # Termination loop @worker_pools.each do |name, pool| pool.stop_workers(force) end end |
#update_wait_period(period) ⇒ Object
17 18 19 20 |
# File 'lib/loops/process_manager.rb', line 17 def update_wait_period(period) return unless period @config['wait_period'] = [@config['wait_period'], period].max end |
#wait_for_workers(seconds) ⇒ Object
64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 |
# File 'lib/loops/process_manager.rb', line 64 def wait_for_workers(seconds) seconds.times do logger.info("Shutting down... waiting for workers to die (we have #{seconds} seconds)...") running_total = 0 @worker_pools.each do |name, pool| running_total += pool.wait_workers end if running_total.zero? logger.info("All workers are dead. Exiting...") return true end logger.info("#{running_total} workers are still running! Sleeping for a second...") sleep(1) end return false end |