Module: Fluent::PluginHelper::ChildProcess
Defined Under Namespace
Classes: ProcessInfo
Constant Summary collapse
- CHILD_PROCESS_LOOP_CHECK_INTERVAL =
sec
0.2
- CHILD_PROCESS_DEFAULT_EXIT_TIMEOUT =
sec
10
- CHILD_PROCESS_DEFAULT_KILL_TIMEOUT =
sec
60
- MODE_PARAMS =
[:read, :write, :stderr, :read_with_stderr]
- STDERR_OPTIONS =
[:discard, :connect]
Constants included from EventLoop
EventLoop::EVENT_LOOP_RUN_DEFAULT_TIMEOUT, EventLoop::EVENT_LOOP_SHUTDOWN_TIMEOUT
Constants included from Thread
Thread::THREAD_DEFAULT_WAIT_SECONDS, Thread::THREAD_SHUTDOWN_HARD_TIMEOUT_IN_TESTS
Instance Attribute Summary collapse
-
#_child_process_processes ⇒ Object
readonly
stop : mark callback thread as stopped shutdown : close write IO to child processes (STDIN of child processes), send TERM (KILL for Windows) to all child processes close : send KILL to all child processes terminate: [-].
Attributes included from Timer
Attributes included from EventLoop
Attributes included from Thread
Instance Method Summary collapse
-
#child_process_execute(title, command, arguments: nil, subprocess_name: nil, interval: nil, immediate: false, parallel: false, mode: [:read, :write], stderr: :discard, env: {}, unsetenv: false, chdir: nil, internal_encoding: 'utf-8', external_encoding: 'ascii-8bit', scrub: true, replace_string: nil, wait_timeout: nil, on_exit_callback: nil, &block) ⇒ Object
on_exit_callback = ->(status){ … } status is an instance of Process::Status On Windows, exitstatus=0 and termsig=nil even when child process was killed.
- #child_process_execute_once(title, command, arguments, subprocess_name, mode, stderr, env, unsetenv, chdir, internal_encoding, external_encoding, scrub, replace_string, wait_timeout, on_exit_callback, &block) ⇒ Object
- #child_process_exist?(pid) ⇒ Boolean
- #child_process_id ⇒ Object
- #child_process_kill(pinfo, force: false) ⇒ Object
- #child_process_running? ⇒ Boolean
- #close ⇒ Object
- #initialize ⇒ Object
- #shutdown ⇒ Object
- #stop ⇒ Object
- #terminate ⇒ Object
Methods included from Timer
#start, #timer_execute, #timer_running?
Methods included from EventLoop
#after_shutdown, #event_loop_attach, #event_loop_detach, #event_loop_running?, #event_loop_wait_until_start, #event_loop_wait_until_stop, #start
Methods included from Thread
#after_shutdown, #thread_create, #thread_current_running?, #thread_exist?, #thread_running?, #thread_started?, #thread_wait_until_start, #thread_wait_until_stop
Instance Attribute Details
#_child_process_processes ⇒ Object (readonly)
stop : mark callback thread as stopped shutdown : close write IO to child processes (STDIN of child processes), send TERM (KILL for Windows) to all child processes close : send KILL to all child processes terminate: [-]
42 43 44 |
# File 'lib/fluent/plugin_helper/child_process.rb', line 42 def _child_process_processes @_child_process_processes end |
Instance Method Details
#child_process_execute(title, command, arguments: nil, subprocess_name: nil, interval: nil, immediate: false, parallel: false, mode: [:read, :write], stderr: :discard, env: {}, unsetenv: false, chdir: nil, internal_encoding: 'utf-8', external_encoding: 'ascii-8bit', scrub: true, replace_string: nil, wait_timeout: nil, on_exit_callback: nil, &block) ⇒ Object
on_exit_callback = ->(status){ … } status is an instance of Process::Status On Windows, exitstatus=0 and termsig=nil even when child process was killed.
65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 |
# File 'lib/fluent/plugin_helper/child_process.rb', line 65 def child_process_execute( title, command, arguments: nil, subprocess_name: nil, interval: nil, immediate: false, parallel: false, mode: [:read, :write], stderr: :discard, env: {}, unsetenv: false, chdir: nil, internal_encoding: 'utf-8', external_encoding: 'ascii-8bit', scrub: true, replace_string: nil, wait_timeout: nil, on_exit_callback: nil, &block ) raise ArgumentError, "BUG: title must be a symbol" unless title.is_a? Symbol raise ArgumentError, "BUG: arguments required if subprocess name is replaced" if subprocess_name && !arguments mode ||= [] mode = [] unless block raise ArgumentError, "BUG: invalid mode specification" unless mode.all?{|m| MODE_PARAMS.include?(m) } raise ArgumentError, "BUG: read_with_stderr is exclusive with :read and :stderr" if mode.include?(:read_with_stderr) && (mode.include?(:read) || mode.include?(:stderr)) raise ArgumentError, "BUG: invalid stderr handling specification" unless STDERR_OPTIONS.include?(stderr) raise ArgumentError, "BUG: number of block arguments are different from size of mode" if block && block.arity != mode.size running = false callback = ->(*args) { running = true begin block && block.call(*args) ensure running = false end } retval = nil execute_child_process = ->(){ child_process_execute_once( title, command, arguments, subprocess_name, mode, stderr, env, unsetenv, chdir, internal_encoding, external_encoding, scrub, replace_string, wait_timeout, on_exit_callback, &callback ) } if immediate || !interval retval = execute_child_process.call end if interval timer_execute(:child_process_execute, interval, repeat: true) do if !parallel && running log.warn "previous child process is still running. skipped.", title: title, command: command, arguments: arguments, interval: interval, parallel: parallel else execute_child_process.call end end end retval # nil if interval end |
#child_process_execute_once(title, command, arguments, subprocess_name, mode, stderr, env, unsetenv, chdir, internal_encoding, external_encoding, scrub, replace_string, wait_timeout, on_exit_callback, &block) ⇒ Object
231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 |
# File 'lib/fluent/plugin_helper/child_process.rb', line 231 def child_process_execute_once( title, command, arguments, subprocess_name, mode, stderr, env, unsetenv, chdir, internal_encoding, external_encoding, scrub, replace_string, wait_timeout, on_exit_callback, &block ) spawn_args = if arguments || subprocess_name [ env, (subprocess_name ? [command, subprocess_name] : command), *(arguments || []) ] else [ env, command ] end spawn_opts = { unsetenv_others: unsetenv, } if chdir spawn_opts[:chdir] = chdir end = {} if scrub [:invalid] = [:undef] = :replace if replace_string [:replace] = replace_string end end log.debug "Executing command", title: title, spawn: spawn_args, mode: mode, stderr: stderr readio = writeio = stderrio = wait_thread = nil readio_in_use = writeio_in_use = stderrio_in_use = false if !mode.include?(:stderr) && !mode.include?(:read_with_stderr) spawn_opts[:err] = IO::NULL if stderr == :discard if !mode.include?(:read) && !mode.include?(:read_with_stderr) spawn_opts[:out] = IO::NULL end writeio, readio, wait_thread = *Open3.popen2(*spawn_args, spawn_opts) elsif mode.include?(:read_with_stderr) writeio, readio, wait_thread = *Open3.popen2e(*spawn_args, spawn_opts) else writeio, readio, stderrio, wait_thread = *Open3.popen3(*spawn_args, spawn_opts) end if mode.include?(:write) writeio.set_encoding(external_encoding, internal_encoding, **) writeio_in_use = true else writeio.reopen(IO::NULL) if writeio end if mode.include?(:read) || mode.include?(:read_with_stderr) readio.set_encoding(external_encoding, internal_encoding, **) readio_in_use = true end if mode.include?(:stderr) stderrio.set_encoding(external_encoding, internal_encoding, **) stderrio_in_use = true else stderrio.reopen(IO::NULL) if stderrio && stderr == :discard end pid = wait_thread.pid # wait_thread => Process::Waiter io_objects = [] mode.each do |m| io_obj = case m when :read then readio when :write then writeio when :read_with_stderr then readio when :stderr then stderrio else raise "BUG: invalid mode must be checked before here: '#{m}'" end io_objects << io_obj end m = Mutex.new m.lock thread = thread_create :child_process_callback do m.lock # run after plugin thread get pid, thread instance and i/o m.unlock begin @_child_process_processes[pid].alive = true block.call(*io_objects) if block_given? writeio.close if writeio rescue EOFError => e log.debug "Process exit and I/O closed", title: title, pid: pid, command: command, arguments: arguments rescue IOError => e if e. == 'stream closed' || e. == 'closed stream' # "closed stream" is of ruby 2.1 log.debug "Process I/O stream closed", title: title, pid: pid, command: command, arguments: arguments else log.error "Unexpected I/O error for child process", title: title, pid: pid, command: command, arguments: arguments, error: e end rescue Errno::EPIPE => e log.debug "Broken pipe, child process unexpectedly exits", title: title, pid: pid, command: command, arguments: arguments rescue => e log.warn "Unexpected error while processing I/O for child process", title: title, pid: pid, command: command, error: e end if wait_timeout if wait_thread.join(wait_timeout) # Thread#join returns nil when limit expires # wait_thread successfully exits @_child_process_processes[pid].exit_status = wait_thread.value else log.warn "child process timed out", title: title, pid: pid, command: command, arguments: arguments child_process_kill(@_child_process_processes[pid], force: true) @_child_process_processes[pid].exit_status = wait_thread.value end else @_child_process_processes[pid].exit_status = wait_thread.value # with join end process_info = @_child_process_mutex.synchronize{ @_child_process_processes.delete(pid) } cb = process_info.on_exit_callback_mutex.synchronize do cback = process_info.on_exit_callback process_info.on_exit_callback = nil cback end if cb cb.call(process_info.exit_status) rescue nil end process_info.readio&.close rescue nil process_info.writeio&.close rescue nil process_info.stderrio&.close rescue nil end thread[:_fluentd_plugin_helper_child_process_running] = true thread[:_fluentd_plugin_helper_child_process_pid] = pid pinfo = ProcessInfo.new( title, thread, pid, readio, readio_in_use, writeio, writeio_in_use, stderrio, stderrio_in_use, wait_thread, false, nil, nil, on_exit_callback, Mutex.new ) @_child_process_mutex.synchronize do @_child_process_processes[pid] = pinfo end m.unlock pid end |
#child_process_exist?(pid) ⇒ Boolean
53 54 55 56 57 58 59 60 |
# File 'lib/fluent/plugin_helper/child_process.rb', line 53 def child_process_exist?(pid) pinfo = @_child_process_processes[pid] return false unless pinfo return false if pinfo.exit_status true end |
#child_process_id ⇒ Object
49 50 51 |
# File 'lib/fluent/plugin_helper/child_process.rb', line 49 def child_process_id ::Thread.current[:_fluentd_plugin_helper_child_process_pid] end |
#child_process_kill(pinfo, force: false) ⇒ Object
208 209 210 211 212 213 214 215 216 217 218 219 220 221 |
# File 'lib/fluent/plugin_helper/child_process.rb', line 208 def child_process_kill(pinfo, force: false) return if !pinfo pinfo.killed_at = Fluent::Clock.now unless force pid = pinfo.pid begin if !pinfo.exit_status && child_process_exist?(pid) signal = (Fluent.windows? || force) ? :KILL : :TERM Process.kill(signal, pinfo.pid) end rescue Errno::ECHILD, Errno::ESRCH # ignore end end |
#child_process_running? ⇒ Boolean
44 45 46 47 |
# File 'lib/fluent/plugin_helper/child_process.rb', line 44 def child_process_running? # checker for code in callback of child_process_execute ::Thread.current[:_fluentd_plugin_helper_child_process_running] || false end |
#close ⇒ Object
174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 |
# File 'lib/fluent/plugin_helper/child_process.rb', line 174 def close while true pids = @_child_process_mutex.synchronize{ @_child_process_processes.keys } break if pids.size < 1 living_process_exist = false pids.each do |pid| process_info = @_child_process_processes[pid] next if !process_info || process_info.exit_status living_process_exist = true process_info.killed_at ||= Fluent::Clock.now # for illegular case (e.g., created after shutdown) timeout_at = process_info.killed_at + @_child_process_kill_timeout now = Fluent::Clock.now next if now < timeout_at child_process_kill(process_info, force: true) end break if living_process_exist sleep CHILD_PROCESS_LOOP_CHECK_INTERVAL end super end |
#initialize ⇒ Object
122 123 124 125 126 127 128 129 |
# File 'lib/fluent/plugin_helper/child_process.rb', line 122 def initialize super # plugins MAY configure this parameter @_child_process_exit_timeout = CHILD_PROCESS_DEFAULT_EXIT_TIMEOUT @_child_process_kill_timeout = CHILD_PROCESS_DEFAULT_KILL_TIMEOUT @_child_process_mutex = Mutex.new @_child_process_processes = {} # pid => ProcessInfo end |
#shutdown ⇒ Object
142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 |
# File 'lib/fluent/plugin_helper/child_process.rb', line 142 def shutdown @_child_process_mutex.synchronize{ @_child_process_processes.keys }.each do |pid| process_info = @_child_process_processes[pid] next if !process_info process_info.writeio && process_info.writeio.close rescue nil end super @_child_process_mutex.synchronize{ @_child_process_processes.keys }.each do |pid| process_info = @_child_process_processes[pid] next if !process_info child_process_kill(process_info) end exit_wait_timeout = Fluent::Clock.now + @_child_process_exit_timeout while Fluent::Clock.now < exit_wait_timeout process_exists = false @_child_process_mutex.synchronize{ @_child_process_processes.keys }.each do |pid| unless @_child_process_processes[pid].exit_status process_exists = true break end end if process_exists sleep CHILD_PROCESS_LOOP_CHECK_INTERVAL else break end end end |
#stop ⇒ Object
131 132 133 134 135 136 137 138 139 140 |
# File 'lib/fluent/plugin_helper/child_process.rb', line 131 def stop @_child_process_mutex.synchronize{ @_child_process_processes.keys }.each do |pid| process_info = @_child_process_processes[pid] if process_info process_info.thread[:_fluentd_plugin_helper_child_process_running] = false end end super end |
#terminate ⇒ Object
202 203 204 205 206 |
# File 'lib/fluent/plugin_helper/child_process.rb', line 202 def terminate @_child_process_processes = {} super end |