Class: AWS::Flow::ForkingExecutor Private

Inherits:
Object
  • Object
show all
Defined in:
lib/aws/decider/executor.rb

This class is part of a private API. You should avoid using this class if possible, as it may be removed or be changed in the future.

Class Attribute Summary collapse

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(options = {}) ⇒ ForkingExecutor

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Returns a new instance of ForkingExecutor.



51
52
53
54
55
56
57
58
59
60
61
62
63
# File 'lib/aws/decider/executor.rb', line 51

def initialize(options = {})
  unless @log = options[:logger]
    @log = Logger.new("#{Dir.tmpdir}/forking_log")
    @log.level = options[:log_level] || Logger::ERROR
    @log.info("LOG INITIALIZED")
  end
  @semaphore = Mutex.new
  @max_workers = options[:max_workers] || 1
  @pids = []
  @is_shutdown = false
  ForkingExecutor.executors ||= []
  ForkingExecutor.executors << self
end

Class Attribute Details

.executorsObject

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.



47
48
49
# File 'lib/aws/decider/executor.rb', line 47

def executors
  @executors
end

Instance Attribute Details

#is_shutdownObject

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.



49
50
51
# File 'lib/aws/decider/executor.rb', line 49

def is_shutdown
  @is_shutdown
end

#max_workersObject

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.



49
50
51
# File 'lib/aws/decider/executor.rb', line 49

def max_workers
  @max_workers
end

#pidsObject

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.



49
50
51
# File 'lib/aws/decider/executor.rb', line 49

def pids
  @pids
end

Instance Method Details

#block_on_max_workersObject

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.



120
121
122
123
124
125
126
127
128
129
# File 'lib/aws/decider/executor.rb', line 120

def block_on_max_workers
  @log.debug "block_on_max_workers workers=#{@pids.size}, max_workers=#{@max_workers}"
  if @pids.size >= @max_workers
    @log.info "Reached maximum number of workers (#{@max_workers}), \
               waiting for some to finish"
    begin
      remove_completed_pids(true)
    end while @pids.size >= @max_workers
  end
end

#execute(&block) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.



65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
# File 'lib/aws/decider/executor.rb', line 65

def execute(&block)
  @log.info "Here are the pids that are currently running #{@pids}"
  raise RejectedExecutionException if @is_shutdown
  block_on_max_workers
  @log.debug "PARENT BEFORE FORK #{Process.pid}"
  child_pid = fork do
    begin
      @log.debug "CHILD #{Process.pid}"
      # TODO: which signals to ignore?
      # ignore signals in the child
      %w{ TERM INT HUP SIGUSR2 }.each { |signal| Signal.trap(signal, 'SIG_IGN') }
      block.call
      @log.debug "CHILD #{Process.pid} AFTER block.call"
      Process.exit!(0)
    rescue => e
      @log.error e
      @log.error "Definitely dying off right here"
      Process.exit!(1)
    end
  end
  @log.debug "PARENT AFTER FORK #{Process.pid}, child_pid=#{child_pid}"
  @pids << child_pid
end

#shutdown(timeout_seconds) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.



89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
# File 'lib/aws/decider/executor.rb', line 89

def shutdown(timeout_seconds)
  @is_shutdown = true
  remove_completed_pids

  unless @pids.empty?
    @log.info "Exit requested, waiting up to #{timeout_seconds} seconds for child processes to finish"

    # If the timeout_seconds value is set to Float::INFINITY, it will wait indefinitely till all workers finish
    # their work. This allows us to handle graceful shutdown of workers.
    if timeout_seconds == Float::INFINITY
      @log.info "Exit requested, waiting indefinitely till all child processes finish"
      remove_completed_pids true while !@pids.empty?
    else
      @log.info "Exit requested, waiting up to #{timeout_seconds} seconds for child processes to finish"
      # check every second for child processes to finish
      timeout_seconds.times do
        sleep 1
        remove_completed_pids
        break if @pids.empty?
      end
    end

    # forcibly kill all remaining children
    unless @pids.empty?
      @log.warn "Child processes still running, sending KILL signal: #{@pids.join(',')}"
      @pids.each { |pid| Process.kill('KILL', pid) }
    end
  end
end