Module: Fluent::PluginHelper::ChildProcess

Includes:
Thread, Timer
Defined in:
lib/fluent/plugin_helper/child_process.rb

Defined Under Namespace

Classes: ProcessInfo

Constant Summary collapse

CHILD_PROCESS_LOOP_CHECK_INTERVAL =
0.2 # sec
CHILD_PROCESS_DEFAULT_EXIT_TIMEOUT =
10 # sec
CHILD_PROCESS_DEFAULT_KILL_TIMEOUT =
60 # sec
MODE_PARAMS =
[:read, :write, :stderr, :read_with_stderr]
STDERR_OPTIONS =
[:discard, :connect]

Constants included from EventLoop

EventLoop::EVENT_LOOP_RUN_DEFAULT_TIMEOUT, EventLoop::EVENT_LOOP_SHUTDOWN_TIMEOUT

Constants included from Thread

Thread::THREAD_DEFAULT_WAIT_SECONDS, Thread::THREAD_SHUTDOWN_HARD_TIMEOUT_IN_TESTS

Instance Attribute Summary collapse

Attributes included from Timer

#_timers

Attributes included from EventLoop

#_event_loop

Attributes included from Thread

#_threads

Instance Method Summary collapse

Methods included from Timer

#start, #timer_execute, #timer_running?

Methods included from EventLoop

#after_shutdown, #event_loop_attach, #event_loop_detach, #event_loop_running?, #event_loop_wait_until_start, #event_loop_wait_until_stop, #start

Methods included from Thread

#after_shutdown, #thread_create, #thread_current_running?, #thread_exist?, #thread_running?, #thread_started?, #thread_wait_until_start, #thread_wait_until_stop

Instance Attribute Details

#_child_process_processes ⇒ Object (readonly)

Lifecycle of this helper:

  • stop: mark callback thread as stopped
  • shutdown: close write IO to child processes (STDIN of child processes), send TERM (KILL for Windows) to all child processes
  • close: send KILL to all child processes
  • terminate: [-]



42
43
44
# File 'lib/fluent/plugin_helper/child_process.rb', line 42

# Reader for the table of child processes tracked by this helper.
# The table is created in #initialize as a Hash keyed by pid
# (pid => ProcessInfo).
def _child_process_processes
  @_child_process_processes
end

Instance Method Details

#child_process_execute(title, command, arguments: nil, subprocess_name: nil, interval: nil, immediate: false, parallel: false, mode: [:read, :write], stderr: :discard, env: {}, unsetenv: false, chdir: nil, internal_encoding: 'utf-8', external_encoding: 'ascii-8bit', scrub: true, replace_string: nil, wait_timeout: nil, on_exit_callback: nil, &block) ⇒ Object

on_exit_callback = ->(status){ … }, where status is an instance of Process::Status. On Windows, exitstatus=0 and termsig=nil even when the child process was killed.

Raises:

  • (ArgumentError)


65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
# File 'lib/fluent/plugin_helper/child_process.rb', line 65

# Runs an external command once, or repeatedly on a timer.
#
# title must be a Symbol. mode lists which I/O objects are handed to the
# block, in order (see MODE_PARAMS); it is ignored (treated as empty) when no
# block is given. stderr must be one of STDERR_OPTIONS.
#
# With no interval the command is started right away. With an interval it is
# started from a repeating timer, and additionally right away when immediate
# is true. Unless parallel is true, a timer tick is skipped (with a warning)
# while the block of a previous run is still executing.
#
# Returns whatever child_process_execute_once returned for the run started
# directly from this call, or nil when no run was started directly.
#
# Raises ArgumentError for any invalid combination of arguments.
def child_process_execute(
    title, command,
    arguments: nil, subprocess_name: nil, interval: nil, immediate: false, parallel: false,
    mode: [:read, :write], stderr: :discard, env: {}, unsetenv: false, chdir: nil,
    internal_encoding: 'utf-8', external_encoding: 'ascii-8bit', scrub: true, replace_string: nil,
    wait_timeout: nil, on_exit_callback: nil,
    &block
)
  unless title.is_a?(Symbol)
    raise ArgumentError, "BUG: title must be a symbol"
  end
  if subprocess_name && !arguments
    raise ArgumentError, "BUG: arguments required if subprocess name is replaced"
  end

  # Without a block there is nobody to receive I/O objects, so no mode applies.
  mode = block ? (mode || []) : []

  if mode.any? { |requested| !MODE_PARAMS.include?(requested) }
    raise ArgumentError, "BUG: invalid mode specification"
  end
  merged_output = mode.include?(:read_with_stderr)
  if merged_output && (mode.include?(:read) || mode.include?(:stderr))
    raise ArgumentError, "BUG: read_with_stderr is exclusive with :read and :stderr"
  end
  unless STDERR_OPTIONS.include?(stderr)
    raise ArgumentError, "BUG: invalid stderr handling specification"
  end
  if block && block.arity != mode.size
    raise ArgumentError, "BUG: number of block arguments are different from size of mode"
  end

  # Tracks whether the user block is currently executing; consulted by the
  # timer below to avoid overlapping runs.
  in_callback = false
  tracked_callback = lambda do |*io_args|
    in_callback = true
    begin
      block.call(*io_args) if block
    ensure
      in_callback = false
    end
  end

  launch = lambda do
    child_process_execute_once(
      title, command, arguments,
      subprocess_name, mode, stderr, env, unsetenv, chdir,
      internal_encoding, external_encoding, scrub, replace_string,
      wait_timeout, on_exit_callback,
      &tracked_callback
    )
  end

  first_result = (immediate || !interval) ? launch.call : nil

  if interval
    timer_execute(:child_process_execute, interval, repeat: true) do
      if parallel || !in_callback
        launch.call
      else
        log.warn "previous child process is still running. skipped.", title: title, command: command, arguments: arguments, interval: interval, parallel: parallel
      end
    end
  end

  first_result
end

#child_process_execute_once(title, command, arguments, subprocess_name, mode, stderr, env, unsetenv, chdir, internal_encoding, external_encoding, scrub, replace_string, wait_timeout, on_exit_callback, &block) ⇒ Object



231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
# File 'lib/fluent/plugin_helper/child_process.rb', line 231

# Spawns the command once via Open3 and runs +block+ with the requested I/O
# objects on a thread created with thread_create.
#
# The child is registered in @_child_process_processes (pid => ProcessInfo)
# before the callback thread is allowed to proceed. After the block finishes,
# the thread waits for the child to exit (force-killing it when wait_timeout
# expires), records the exit status, removes the entry, invokes
# on_exit_callback at most once and closes the I/O objects.
#
# Returns the pid of the spawned child process.
def child_process_execute_once(
    title, command, arguments, subprocess_name, mode, stderr, env, unsetenv, chdir,
    internal_encoding, external_encoding, scrub, replace_string, wait_timeout, on_exit_callback, &block
)
  # Argument list for Open3: env first, then the command (paired with
  # subprocess_name when one is given), then the individual arguments.
  spawn_args = if arguments || subprocess_name
                 [ env, (subprocess_name ? [command, subprocess_name] : command), *(arguments || []) ]
               else
                 [ env, command ]
               end
  spawn_opts = {
    unsetenv_others: unsetenv,
  }
  if chdir
    spawn_opts[:chdir] = chdir
  end

  # Options passed to IO#set_encoding below; only populated when scrubbing is requested.
  encoding_options = {}
  if scrub
    encoding_options[:invalid] = encoding_options[:undef] = :replace
    if replace_string
      encoding_options[:replace] = replace_string
    end
  end

  log.debug "Executing command", title: title, spawn: spawn_args, mode: mode, stderr: stderr

  readio = writeio = stderrio = wait_thread = nil
  readio_in_use = writeio_in_use = stderrio_in_use = false

  # Pick the Open3 variant from the requested mode:
  #   no stderr capture      -> popen2  (stdin, stdout)
  #   :read_with_stderr      -> popen2e (stdin, stdout+stderr merged)
  #   otherwise (:stderr)    -> popen3  (stdin, stdout, stderr)
  if !mode.include?(:stderr) && !mode.include?(:read_with_stderr)
    spawn_opts[:err] = IO::NULL if stderr == :discard
    if !mode.include?(:read) && !mode.include?(:read_with_stderr)
      spawn_opts[:out] = IO::NULL
    end
    writeio, readio, wait_thread = *Open3.popen2(*spawn_args, spawn_opts)
  elsif mode.include?(:read_with_stderr)
    writeio, readio, wait_thread = *Open3.popen2e(*spawn_args, spawn_opts)
  else
    writeio, readio, stderrio, wait_thread = *Open3.popen3(*spawn_args, spawn_opts)
  end

  # Configure encodings on the streams the block will use; streams that are
  # not requested are redirected to IO::NULL where the code below says so.
  if mode.include?(:write)
    writeio.set_encoding(external_encoding, internal_encoding, **encoding_options)
    writeio_in_use = true
  else
    writeio.reopen(IO::NULL) if writeio
  end
  if mode.include?(:read) || mode.include?(:read_with_stderr)
    readio.set_encoding(external_encoding, internal_encoding, **encoding_options)
    readio_in_use = true
  end
  if mode.include?(:stderr)
    stderrio.set_encoding(external_encoding, internal_encoding, **encoding_options)
    stderrio_in_use = true
  else
    stderrio.reopen(IO::NULL) if stderrio && stderr == :discard
  end

  pid = wait_thread.pid # wait_thread => Process::Waiter

  # Build the block arguments in the same order as the entries of mode.
  io_objects = []
  mode.each do |m|
    io_obj = case m
             when :read then readio
             when :write then writeio
             when :read_with_stderr then readio
             when :stderr then stderrio
             else
               raise "BUG: invalid mode must be checked before here: '#{m}'"
             end
    io_objects << io_obj
  end

  # The mutex is held by this (calling) thread until the ProcessInfo has been
  # registered, so the callback thread blocks on m.lock until m.unlock below.
  m = Mutex.new
  m.lock
  thread = thread_create :child_process_callback do
    m.lock # run after plugin thread get pid, thread instance and i/o
    m.unlock
    begin
      @_child_process_processes[pid].alive = true
      block.call(*io_objects) if block_given?
      writeio.close if writeio
    rescue EOFError => e
      log.debug "Process exit and I/O closed", title: title, pid: pid, command: command, arguments: arguments
    rescue IOError => e
      if e.message == 'stream closed' || e.message == 'closed stream' # "closed stream" is of ruby 2.1
        log.debug "Process I/O stream closed", title: title, pid: pid, command: command, arguments: arguments
      else
        log.error "Unexpected I/O error for child process", title: title, pid: pid, command: command, arguments: arguments, error: e
      end
    rescue Errno::EPIPE => e
      log.debug "Broken pipe, child process unexpectedly exits", title: title, pid: pid, command: command, arguments: arguments
    rescue => e
      log.warn "Unexpected error while processing I/O for child process", title: title, pid: pid, command: command, error: e
    end

    # Wait for the child to exit and record its status. With wait_timeout, a
    # child that is still running after the timeout is killed with force: true.
    if wait_timeout
      if wait_thread.join(wait_timeout) # Thread#join returns nil when limit expires
        # wait_thread successfully exits
        @_child_process_processes[pid].exit_status = wait_thread.value
      else
        log.warn "child process timed out", title: title, pid: pid, command: command, arguments: arguments
        child_process_kill(@_child_process_processes[pid], force: true)
        @_child_process_processes[pid].exit_status = wait_thread.value
      end
    else
      @_child_process_processes[pid].exit_status = wait_thread.value # with join
    end
    # The process is finished: drop it from the tracking table.
    process_info = @_child_process_mutex.synchronize{ @_child_process_processes.delete(pid) }

    # Take the callback out under its mutex and clear it, so it is invoked at most once.
    cb = process_info.on_exit_callback_mutex.synchronize do
      cback = process_info.on_exit_callback
      process_info.on_exit_callback = nil
      cback
    end
    if cb
      # Errors raised by the callback are swallowed by the rescue modifier.
      cb.call(process_info.exit_status) rescue nil
    end
    # Best-effort cleanup: close errors are ignored.
    process_info.readio&.close rescue nil
    process_info.writeio&.close rescue nil
    process_info.stderrio&.close rescue nil
  end
  # Thread-local markers read by child_process_running? and child_process_id.
  thread[:_fluentd_plugin_helper_child_process_running] = true
  thread[:_fluentd_plugin_helper_child_process_pid] = pid
  pinfo = ProcessInfo.new(
    title, thread, pid,
    readio, readio_in_use, writeio, writeio_in_use, stderrio, stderrio_in_use,
    wait_thread, false, nil, nil, on_exit_callback, Mutex.new
  )

  @_child_process_mutex.synchronize do
    @_child_process_processes[pid] = pinfo
  end
  # Registration done: release the callback thread.
  m.unlock
  pid
end

#child_process_exist?(pid) ⇒ Boolean

Returns:

  • (Boolean)


53
54
55
56
57
58
59
60
# File 'lib/fluent/plugin_helper/child_process.rb', line 53

# Tells whether +pid+ refers to a tracked child process that has not yet
# recorded an exit status.
#
# Returns true only when an entry for +pid+ exists in the tracking table and
# its exit_status is still unset; false otherwise.
def child_process_exist?(pid)
  entry = @_child_process_processes[pid]
  !!entry && !entry.exit_status
end

#child_process_id ⇒ Object



49
50
51
# File 'lib/fluent/plugin_helper/child_process.rb', line 49

# Returns the child process pid stored in the current thread's
# :_fluentd_plugin_helper_child_process_pid thread-local, or nil when the
# current thread has no such value.
def child_process_id
  current_thread = ::Thread.current
  current_thread[:_fluentd_plugin_helper_child_process_pid]
end

#child_process_kill(pinfo, force: false) ⇒ Object



208
209
210
211
212
213
214
215
216
217
218
219
220
221
# File 'lib/fluent/plugin_helper/child_process.rb', line 208

# Sends a termination signal to the child process described by +pinfo+.
#
# Does nothing when +pinfo+ is nil. Unless +force+ is set, the time of the
# request is stored in pinfo.killed_at. The signal is only sent when the
# process has no exit status yet and is still tracked; it is :KILL on Windows
# or when +force+ is true, and :TERM otherwise. Errno::ECHILD and
# Errno::ESRCH (process already gone) are ignored.
def child_process_kill(pinfo, force: false)
  return unless pinfo

  pinfo.killed_at = Fluent::Clock.now unless force

  target_pid = pinfo.pid
  begin
    still_running = !pinfo.exit_status && child_process_exist?(target_pid)
    Process.kill((Fluent.windows? || force) ? :KILL : :TERM, pinfo.pid) if still_running
  rescue Errno::ECHILD, Errno::ESRCH
    # the process is already gone; nothing to do
  end
end

#child_process_running? ⇒ Boolean

Returns:

  • (Boolean)


44
45
46
47
# File 'lib/fluent/plugin_helper/child_process.rb', line 44

# Checker for code running inside the callback of child_process_execute.
#
# Returns the current thread's :_fluentd_plugin_helper_child_process_running
# thread-local when it is truthy, and false otherwise.
def child_process_running?
  flag = ::Thread.current[:_fluentd_plugin_helper_child_process_running]
  flag ? flag : false
end

#close ⇒ Object



174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
# File 'lib/fluent/plugin_helper/child_process.rb', line 174

# Force-kills tracked child processes whose kill timeout has elapsed, then
# continues with the inherited close.
#
# Each pass looks at every tracked process that has no exit status yet. Such a
# process gets killed_at set if it was missing (irregular case, e.g. a process
# created after shutdown) and is killed with force: true once
# killed_at + @_child_process_kill_timeout has passed.
#
# NOTE(review): the loop is left as soon as a pass has seen any living
# process, and only sleeps/retries while entries remain but none is alive.
# As written, a living process whose timeout has not yet elapsed is therefore
# not waited for. This looks like it may be an inverted condition — confirm
# the intended behavior before changing it.
def close
  loop do
    tracked_pids = @_child_process_mutex.synchronize { @_child_process_processes.keys }
    break if tracked_pids.empty?

    any_alive = false
    tracked_pids.each do |pid|
      info = @_child_process_processes[pid]
      next unless info && !info.exit_status

      any_alive = true

      info.killed_at ||= Fluent::Clock.now
      deadline = info.killed_at + @_child_process_kill_timeout
      child_process_kill(info, force: true) if Fluent::Clock.now >= deadline
    end

    break if any_alive

    sleep CHILD_PROCESS_LOOP_CHECK_INTERVAL
  end

  super
end

#initialize ⇒ Object



122
123
124
125
126
127
128
129
# File 'lib/fluent/plugin_helper/child_process.rb', line 122

# Sets up the state used by the child process helper: the mutex guarding the
# process table, the exit/kill timeouts (initialised from the module
# defaults) and the empty process table itself.
def initialize
  super
  @_child_process_mutex = Mutex.new
  # plugins MAY configure these timeouts
  @_child_process_kill_timeout = CHILD_PROCESS_DEFAULT_KILL_TIMEOUT
  @_child_process_exit_timeout = CHILD_PROCESS_DEFAULT_EXIT_TIMEOUT
  @_child_process_processes = {} # pid => ProcessInfo
end

#shutdown ⇒ Object



142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
# File 'lib/fluent/plugin_helper/child_process.rb', line 142

# Asks all tracked child processes to exit and waits for them for a bounded time.
#
# Steps, in order:
#   1. close the write IO (the child's STDIN) of every tracked process,
#      ignoring errors from close;
#   2. run the inherited shutdown;
#   3. call child_process_kill (non-forced) for every tracked process;
#   4. poll every CHILD_PROCESS_LOOP_CHECK_INTERVAL seconds, for at most
#      @_child_process_exit_timeout seconds, until no tracked process is
#      left without an exit status.
#
# The pid list is only a snapshot: the callback thread removes an entry from
# the table once its process has exited, so every lookup must tolerate a
# missing entry. A missing entry means the process has already finished.
def shutdown
  @_child_process_mutex.synchronize{ @_child_process_processes.keys }.each do |pid|
    process_info = @_child_process_processes[pid]
    next if !process_info
    process_info.writeio && process_info.writeio.close rescue nil
  end

  super

  @_child_process_mutex.synchronize{ @_child_process_processes.keys }.each do |pid|
    process_info = @_child_process_processes[pid]
    next if !process_info
    child_process_kill(process_info)
  end

  exit_wait_timeout = Fluent::Clock.now + @_child_process_exit_timeout
  while Fluent::Clock.now < exit_wait_timeout
    pids = @_child_process_mutex.synchronize{ @_child_process_processes.keys }
    process_exists = pids.any? do |pid|
      process_info = @_child_process_processes[pid]
      # An entry that vanished after the snapshot belongs to a process that
      # has already exited; it must not raise and must not count as running.
      process_info && !process_info.exit_status
    end
    break unless process_exists

    sleep CHILD_PROCESS_LOOP_CHECK_INTERVAL
  end
end

#stop ⇒ Object



131
132
133
134
135
136
137
138
139
140
# File 'lib/fluent/plugin_helper/child_process.rb', line 131

# Marks the callback thread of every tracked child process as stopped by
# setting its :_fluentd_plugin_helper_child_process_running thread-local to
# false, then continues with the inherited stop.
def stop
  tracked_pids = @_child_process_mutex.synchronize { @_child_process_processes.keys }
  tracked_pids.each do |pid|
    info = @_child_process_processes[pid]
    next unless info

    info.thread[:_fluentd_plugin_helper_child_process_running] = false
  end

  super
end

#terminate ⇒ Object



202
203
204
205
206
# File 'lib/fluent/plugin_helper/child_process.rb', line 202

# Discards all tracked child process entries by replacing the table with an
# empty Hash, then continues with the inherited terminate.
def terminate
  @_child_process_processes = {}

  super
end