Class: AWS::Flow::ForkingExecutor Private
- Inherits: Object
- Inheritance chain: Object, AWS::Flow::ForkingExecutor
- Defined in: lib/aws/decider/executor.rb
This class is part of a private API. You should avoid using this class if possible, as it may be removed or be changed in the future.
Class Attribute Summary
- .executors ⇒ Object private
Instance Attribute Summary
- #is_shutdown ⇒ Object private
- #max_workers ⇒ Object private
- #pids ⇒ Object private
Instance Method Summary
- #block_on_max_workers ⇒ Object private
- #execute(&block) ⇒ Object private
- #initialize(options = {}) ⇒ ForkingExecutor constructor private
  A new instance of ForkingExecutor.
- #shutdown(timeout_seconds) ⇒ Object private
Constructor Details
#initialize(options = {}) ⇒ ForkingExecutor
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Returns a new instance of ForkingExecutor.
# File 'lib/aws/decider/executor.rb', line 51

def initialize(options = {})
  unless @log = options[:logger]
    @log = Logger.new("#{Dir.tmpdir}/forking_log")
    @log.level = options[:log_level] || Logger::ERROR
    @log.info("LOG INITIALIZED")
  end
  @semaphore = Mutex.new
  @max_workers = options[:max_workers] || 1
  @pids = []
  @is_shutdown = false
  ForkingExecutor.executors ||= []
  ForkingExecutor.executors << self
end
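The constructor above resolves its options with simple `||` fallbacks, and `:log_level` is only consulted when no `:logger` is supplied. The following is a minimal sketch of that defaulting logic in isolation. The helper name `resolve_executor_options` is hypothetical and is not part of aws-flow; it only mirrors the pattern shown in the listing.

```ruby
require 'logger'
require 'tmpdir'

# Hypothetical helper (not part of aws-flow): mirrors the option defaults
# applied by the constructor listing above.
def resolve_executor_options(options = {})
  logger = options[:logger]
  unless logger
    # No logger supplied: fall back to a file in the system temp directory.
    logger = Logger.new(File.join(Dir.tmpdir, "forking_log"))
    # :log_level is only consulted on this fallback path.
    logger.level = options[:log_level] || Logger::ERROR
  end
  { logger: logger, max_workers: options[:max_workers] || 1 }
end

defaults = resolve_executor_options
custom = resolve_executor_options(
  logger: Logger.new($stdout),
  log_level: Logger::WARN, # ignored, because a logger was supplied
  max_workers: 4
)
```

Under these assumptions, a caller who passes their own logger is also responsible for setting its level, since the `:log_level` option has no effect in that case.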
Class Attribute Details
.executors ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
# File 'lib/aws/decider/executor.rb', line 47

def executors
  @executors
end
Instance Attribute Details
#is_shutdown ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
# File 'lib/aws/decider/executor.rb', line 49

def is_shutdown
  @is_shutdown
end
#max_workers ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
# File 'lib/aws/decider/executor.rb', line 49

def max_workers
  @max_workers
end
#pids ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
# File 'lib/aws/decider/executor.rb', line 49

def pids
  @pids
end
Instance Method Details
#block_on_max_workers ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
# File 'lib/aws/decider/executor.rb', line 120

def block_on_max_workers
  @log.debug "block_on_max_workers workers=#{@pids.size}, max_workers=#{@max_workers}"
  if @pids.size >= @max_workers
    @log.info "Reached maximum number of workers (#{@max_workers}), \
               waiting for some to finish"
    begin
      remove_completed_pids(true)
    end while @pids.size >= @max_workers
  end
end
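The listing above relies on `remove_completed_pids(true)`, whose source is not shown on this page. The sketch below assumes that the `true` argument means "block until a child exits", and illustrates the same back-pressure idea with plain `Process.wait`. The helper name `wait_for_free_slot` is hypothetical and is not part of aws-flow.

```ruby
# Hypothetical helper (not part of aws-flow): blocks while the number of
# tracked children is at or above the limit.
# Assumption: the real remove_completed_pids(true) behaves similarly.
def wait_for_free_slot(pids, max_workers)
  while pids.size >= max_workers
    finished = Process.wait # blocks until any child process exits
    pids.delete(finished)
  end
end

max_workers = 2
pids = []
peak = 0

3.times do
  wait_for_free_slot(pids, max_workers)
  pids << fork { sleep 0.1; Process.exit!(0) }
  peak = [peak, pids.size].max
end

# Reap whatever is still running and forget the stale pids.
Process.waitall
pids.clear
```

Because the wait happens before each fork, the number of live children in this sketch never exceeds `max_workers`.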
#execute(&block) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
# File 'lib/aws/decider/executor.rb', line 65

def execute(&block)
  @log.info "Here are the pids that are currently running #{@pids}"
  raise RejectedExecutionException if @is_shutdown
  block_on_max_workers
  @log.debug "PARENT BEFORE FORK #{Process.pid}"
  child_pid = fork do
    begin
      @log.debug "CHILD #{Process.pid}"
      # TODO: which signals to ignore?
      # ignore signals in the child
      %w{ TERM INT HUP SIGUSR2 }.each { |signal| Signal.trap(signal, 'SIG_IGN') }
      block.call
      @log.debug "CHILD #{Process.pid} AFTER block.call"
      Process.exit!(0)
    rescue => e
      @log.error e
      @log.error "Definitely dying off right here"
      Process.exit!(1)
    end
  end
  @log.debug "PARENT AFTER FORK #{Process.pid}, child_pid=#{child_pid}"
  @pids << child_pid
end
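The core of the method above is a fork-and-exit pattern: the child runs the block, then leaves with `Process.exit!`, reporting status 0 on success and 1 if the block raised. A minimal standalone sketch of that pattern follows. The helper name `run_in_child` is hypothetical and is not part of aws-flow, and the sketch omits the logging and worker bookkeeping of the real method.

```ruby
# Hypothetical stand-in for the fork logic in the listing above
# (not part of aws-flow).
def run_in_child(&block)
  fork do
    begin
      # Ignore common termination signals in the child, as the listing does.
      %w[TERM INT HUP].each { |signal| Signal.trap(signal, 'SIG_IGN') }
      block.call
      # exit! skips at_exit handlers inherited from the parent process.
      Process.exit!(0)
    rescue => e
      warn "child #{Process.pid} failed: #{e.message}"
      Process.exit!(1)
    end
  end
end

ok_pid = run_in_child { 1 + 1 }
bad_pid = run_in_child { raise "boom" }

# wait2 returns [pid, Process::Status] for the given child.
_, ok_status = Process.wait2(ok_pid)
_, bad_status = Process.wait2(bad_pid)
```

The parent can then tell a successful block from a failed one by inspecting `exitstatus` on each returned status object.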
#shutdown(timeout_seconds) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
# File 'lib/aws/decider/executor.rb', line 89

def shutdown(timeout_seconds)
  @is_shutdown = true
  remove_completed_pids

  unless @pids.empty?
    @log.info "Exit requested, waiting up to #{timeout_seconds} seconds for child processes to finish"

    # If the timeout_seconds value is set to Float::INFINITY, it will wait indefinitely till all workers finish
    # their work. This allows us to handle graceful shutdown of workers.
    if timeout_seconds == Float::INFINITY
      @log.info "Exit requested, waiting indefinitely till all child processes finish"
      remove_completed_pids true while !@pids.empty?
    else
      @log.info "Exit requested, waiting up to #{timeout_seconds} seconds for child processes to finish"
      # check every second for child processes to finish
      timeout_seconds.times do
        sleep 1
        remove_completed_pids
        break if @pids.empty?
      end
    end

    # forcibly kill all remaining children
    unless @pids.empty?
      @log.warn "Child processes still running, sending KILL signal: #{@pids.join(',')}"
      @pids.each { |pid| Process.kill('KILL', pid) }
    end
  end
end
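The shutdown sequence above has three phases: reap children that have already exited, poll until the timeout expires, then send KILL to whatever remains. The sketch below illustrates those phases in a standalone form. The helper name `drain_children` and its `poll_interval` parameter are hypothetical and are not part of aws-flow. It also swaps in a monotonic-clock deadline for the listing's `timeout_seconds.times` loop, so it accepts fractional timeouts as well as `Float::INFINITY`; the listing itself calls `times`, which is defined on Integer, and handles infinity as a separate branch.

```ruby
# Hypothetical helper (not part of aws-flow): waits for children up to a
# deadline, then kills and reaps the stragglers. Returns the killed pids.
def drain_children(pids, timeout_seconds, poll_interval = 0.1)
  deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + timeout_seconds

  until pids.empty?
    # Non-blocking reap: wait returns nil while a child is still running.
    pids.reject! { |pid| Process.wait(pid, Process::WNOHANG) }
    break if pids.empty?
    break if Process.clock_gettime(Process::CLOCK_MONOTONIC) >= deadline
    sleep poll_interval
  end

  # Forcibly kill all remaining children.
  killed = pids.dup
  killed.each do |pid|
    Process.kill('KILL', pid)
    Process.wait(pid) # reap so no zombie process is left behind
  end
  pids.clear
  killed
end

fast = fork { Process.exit!(0) }
slow = fork { sleep 30 }
killed = drain_children([fast, slow], 0.5)
```

In this usage, the fast child exits on its own during the polling phase, while the slow child outlives the half-second deadline and is killed.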