Class: Fluent::ExecFilterOutput::ChildProcess

Inherits:
Object
  • Object
show all
Defined in:
lib/fluent/plugin/out_exec_filter.rb

Instance Method Summary collapse

Constructor Details

#initialize(parser, respawns = 0) ⇒ ChildProcess

Returns a new instance of ChildProcess.



230
231
232
233
234
235
236
# File 'lib/fluent/plugin/out_exec_filter.rb', line 230

def initialize(parser,respawns=0)
  @pid = nil
  @thread = nil
  @parser = parser
  @respawns = respawns
  @mutex = Mutex.new
end

Instance Method Details

#kill_child(join_wait) ⇒ Object



249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
# File 'lib/fluent/plugin/out_exec_filter.rb', line 249

def kill_child(join_wait)
  begin
    Process.kill(:TERM, @pid)
  rescue Errno::ESRCH
    # Errno::ESRCH 'No such process', ignore
    # child process killed by signal chained from fluentd process
  end
  if @thread.join(join_wait)
    # @thread successfully shutdown
    return
  end
  begin
    Process.kill(:KILL, @pid)
  rescue Errno::ESRCH
    # ignore if successfully killed by :TERM
  end
  @thread.join
end

#runObject



307
308
309
310
311
312
313
314
315
316
317
318
319
320
# File 'lib/fluent/plugin/out_exec_filter.rb', line 307

def run
  @parser.call(@io)
rescue
  $log.error "exec_filter thread unexpectedly failed with an error.", :command=>@command, :error=>$!.to_s
  $log.warn_backtrace $!.backtrace
ensure
  pid, stat = Process.waitpid2(@pid)
  unless @finished
    $log.error "exec_filter process unexpectedly exited.", :command=>@command, :ecode=>stat.to_i
    unless @respawns == 0
      $log.warn "exec_filter child process will respawn for next input data (respawns #{@respawns})."
    end
  end
end

#shutdownObject



268
269
270
271
272
273
# File 'lib/fluent/plugin/out_exec_filter.rb', line 268

def shutdown
  @finished = true
  @mutex.synchronize do
    kill_child(60) # TODO wait time
  end
end

#start(command) ⇒ Object



238
239
240
241
242
243
244
245
246
247
# File 'lib/fluent/plugin/out_exec_filter.rb', line 238

def start(command)
  @command = command
  @mutex.synchronize do
    @io = IO.popen(command, "r+")
    @pid = @io.pid
    @io.sync = true
    @thread = Thread.new(&method(:run))
  end
  @finished = false
end

#try_respawnObject



289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
# File 'lib/fluent/plugin/out_exec_filter.rb', line 289

def try_respawn
  return false if @respawns == 0
  @mutex.synchronize do
    return false if @respawns == 0

    kill_child(5) # TODO wait time

    @io = IO.popen(@command, "r+")
    @pid = @io.pid
    @io.sync = true
    @thread = Thread.new(&method(:run))

    @respawns -= 1 if @respawns > 0
  end
  $log.warn "exec_filter child process successfully respawned.", :command => @command, :respawns => @respawns
  true
end

#write(chunk) ⇒ Object



275
276
277
278
279
280
281
282
283
284
285
286
287
# File 'lib/fluent/plugin/out_exec_filter.rb', line 275

def write(chunk)
  begin
    chunk.write_to(@io)
  rescue Errno::EPIPE => e
    # Broken pipe (child process unexpectedly exited)
    $log.warn "exec_filter Broken pipe, child process maybe exited.", :command => @command
    if try_respawn
      retry # retry chunk#write_to with child respawned
    else
      raise e # to retry #write with other ChildProcess instance (when num_children > 1)
    end
  end
end