Class: Fluent::Supervisor

Inherits:
Object
  • Object
show all
Defined in:
lib/fluent/supervisor.rb

Defined Under Namespace

Classes: LoggerInitializer

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(opt) ⇒ Supervisor

Returns a new instance of Supervisor.



589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
# File 'lib/fluent/supervisor.rb', line 589

# Stores the supplied options on the instance and constructs the
# LoggerInitializer kept in @log.
#
# @param opt [Hash] options keyed by symbol (e.g. :daemonize, :config_path,
#   :log_path, :log_level, :chuser, :chgroup); the whole hash is also kept
#   as @cl_opt.
def initialize(opt)
  @daemonize          = opt[:daemonize]
  @standalone_worker  = opt[:standalone_worker]
  @config_path        = opt[:config_path]
  @inline_config      = opt[:inline_config]
  @use_v1_config      = opt[:use_v1_config]
  @conf_encoding      = opt[:conf_encoding]
  @log_path           = opt[:log_path]
  @show_plugin_config = opt[:show_plugin_config]
  @libs               = opt[:libs]
  @plugin_dirs        = opt[:plugin_dirs]
  @chgroup            = opt[:chgroup]
  @chuser             = opt[:chuser]

  @log_rotate_age  = opt[:log_rotate_age]
  @log_rotate_size = opt[:log_rotate_size]
  @signame         = opt[:signame]

  # Keep the raw options; the parsed configuration is filled in by #configure.
  @cl_opt = opt
  @conf   = nil

  logger_options = {
    suppress_repeated_stacktrace: opt[:suppress_repeated_stacktrace],
    ignore_repeated_log_interval: opt[:ignore_repeated_log_interval],
    ignore_same_log_interval: opt[:ignore_same_log_interval]
  }
  @log = LoggerInitializer.new(@log_path, opt[:log_level], @chuser, @chgroup, logger_options,
                               log_rotate_age: @log_rotate_age, log_rotate_size: @log_rotate_size)
  @finished = false
end

Class Method Details

.cleanup_resources ⇒ Object



581
582
583
584
585
586
587
# File 'lib/fluent/supervisor.rb', line 581

# Removes the file named by the SERVERENGINE_SOCKETMANAGER_PATH environment
# variable, if that variable is set. Does nothing on Windows.
def self.cleanup_resources
  return if Fluent.windows?
  return unless ENV.key?('SERVERENGINE_SOCKETMANAGER_PATH')

  FileUtils.rm_f(ENV['SERVERENGINE_SOCKETMANAGER_PATH'])
end

.default_options ⇒ Object



555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
# File 'lib/fluent/supervisor.rb', line 555

# Builds the default option set.
#
# @return [Hash] a new hash (with new array values) on every call, keyed by
#   symbol.
def self.default_options
  {
    # configuration and plugin locations
    config_path:                  Fluent::DEFAULT_CONFIG_PATH,
    plugin_dirs:                  [Fluent::DEFAULT_PLUGIN_DIR],

    # logging and daemonization
    log_level:                    Fluent::Log::LEVEL_INFO,
    log_path:                     nil,
    daemonize:                    nil,
    libs:                         [],
    setup_path:                   nil,

    # process ownership and working directory
    chuser:                       nil,
    chgroup:                      nil,
    root_dir:                     nil,

    # log suppression
    suppress_interval:            0,
    suppress_repeated_stacktrace: true,
    ignore_repeated_log_interval: nil,

    # configuration parsing
    without_source:               nil,
    use_v1_config:                true,
    strict_config_value:          nil,

    # process model
    supervise:                    true,
    standalone_worker:            false,
    signame:                      nil,
    conf_encoding:                'utf-8',
    disable_shared_socket:        nil
  }
end

.load_config(path, params = {}) ⇒ Object



380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
# File 'lib/fluent/supervisor.rb', line 380

# Builds the `se_config` hash for the config file at `path` from `params`.
#
# A previously built hash (params['pre_conf']) is returned unchanged when the
# last load happened less than 5 seconds ago and the file's mtime equals the
# remembered one.
#
# Side effects visible in this method:
# * a Supervisor::LoggerInitializer is created and initialized, and `$log` is
#   read afterwards to obtain the logger;
# * `params` is mutated: 'pre_loadtime', 'pre_config_mtime' and 'pre_conf' are
#   (re)assigned so the next call can reuse the result.
#
# @param path [String] path of the configuration file (its mtime is read)
# @param params [Hash] settings keyed by String (e.g. 'log_level', 'workers',
#   'daemonize', 'root_dir'); also carries the 'pre_*' cache entries
# @return [Hash] the se_config hash, keyed by Symbol
def self.load_config(path, params = {})
  pre_loadtime = 0
  pre_loadtime = params['pre_loadtime'].to_i if params['pre_loadtime']
  pre_config_mtime = nil
  pre_config_mtime = params['pre_config_mtime'] if params['pre_config_mtime']
  config_mtime = File.mtime(path)

  # reuse previous config if last load time is within 5 seconds and mtime of the config file is not changed
  if (Time.now - Time.at(pre_loadtime) < 5) && (config_mtime == pre_config_mtime)
    return params['pre_conf']
  end

  # Logger-related settings taken from params.
  log_level = params['log_level']
  suppress_repeated_stacktrace = params['suppress_repeated_stacktrace']
  ignore_repeated_log_interval = params['ignore_repeated_log_interval']
  ignore_same_log_interval = params['ignore_same_log_interval']

  log_path = params['log_path']
  chuser = params['chuser']
  chgroup = params['chgroup']
  log_rotate_age = params['log_rotate_age']
  log_rotate_size = params['log_rotate_size']

  log_opts = {suppress_repeated_stacktrace: suppress_repeated_stacktrace, ignore_repeated_log_interval: ignore_repeated_log_interval,
              ignore_same_log_interval: ignore_same_log_interval}
  logger_initializer = Supervisor::LoggerInitializer.new(
    log_path, log_level, chuser, chgroup, log_opts,
    log_rotate_age: log_rotate_age,
    log_rotate_size: log_rotate_size
  )
  # this #init sets initialized logger to $log
  logger_initializer.init(:supervisor, 0)
  logger_initializer.apply_options(format: params['log_format'], time_format: params['log_time_format'])
  logger = $log

  command_sender = Fluent.windows? ? "pipe" : "signal"

  # ServerEngine's "daemonize" option is boolean, and path of pid file is brought by "pid_path"
  # So the same 'daemonize' param is used twice: as the pid path and, coerced with !!, as the flag.
  pid_path = params['daemonize']
  daemonize = !!params['daemonize']

  se_config = {
    worker_type: 'spawn',
    workers: params['workers'],
    log_stdin: false,
    log_stdout: false,
    log_stderr: false,
    enable_heartbeat: true,
    auto_heartbeat: false,
    unrecoverable_exit_codes: [2],
    stop_immediately_at_unrecoverable_exit: true,
    root_dir: params['root_dir'],
    logger: logger,
    log: logger.out,
    log_path: log_path,
    log_level: log_level,
    logger_initializer: logger_initializer,
    chuser: chuser,
    chgroup: chgroup,
    chumask: 0,
    suppress_repeated_stacktrace: suppress_repeated_stacktrace,
    ignore_repeated_log_interval: ignore_repeated_log_interval,
    ignore_same_log_interval: ignore_same_log_interval,
    daemonize: daemonize,
    rpc_endpoint: params['rpc_endpoint'],
    counter_server: params['counter_server'],
    enable_get_dump: params['enable_get_dump'],
    # Element 5 (the JSON dump of params) is overwritten near the end of this method.
    windows_daemon_cmdline: [ServerEngine.ruby_bin_path,
                             File.join(File.dirname(__FILE__), 'daemon.rb'),
                             ServerModule.name,
                             WorkerModule.name,
                             path,
                             JSON.dump(params)],
    command_sender: command_sender,
    fluentd_conf: params['fluentd_conf'],
    conf_encoding: params['conf_encoding'],
    inline_config: params['inline_config'],
    config_path: path,
    main_cmd: params['main_cmd'],
    signame: params['signame'],
    disable_shared_socket: params['disable_shared_socket']
  }
  # :pid_path is only present when daemonizing.
  if daemonize
    se_config[:pid_path] = pid_path
  end
  # Shallow copy taken BEFORE the cache entries below are written, so it still
  # holds whatever 'pre_loadtime' / 'pre_config_mtime' the caller passed in.
  pre_params = params.dup
  params['pre_loadtime'] = Time.now.to_i
  params['pre_config_mtime'] = config_mtime
  params['pre_conf'] = se_config
  # prevent pre_conf from being too big by reloading many times.
  pre_params['pre_conf'] = nil
  # params['pre_conf'] is se_config itself, so this replaces the JSON argument
  # in se_config's windows_daemon_cmdline with a dump that has no 'pre_conf'.
  params['pre_conf'][:windows_daemon_cmdline][5] = JSON.dump(pre_params)

  se_config
end

Instance Method Details

#configure(supervisor: false) ⇒ Object



706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
# File 'lib/fluent/supervisor.rb', line 706

# Initializes the logger for this process, builds the configuration and the
# system config, applies the system config's log settings, then requires the
# extra libraries and registers the existing plugin directories.
#
# @param supervisor [Boolean] when true the logger is initialized as
#   :supervisor and the matching installed gems are logged at the end;
#   otherwise the process type is derived from @standalone_worker and the
#   SERVERENGINE_WORKER_ID environment variable.
def configure(supervisor: false)
  if supervisor
    @log.init(:supervisor, 0)
  else
    worker_id = ENV['SERVERENGINE_WORKER_ID'].to_i
    process_type =
      if @standalone_worker
        :standalone
      elsif worker_id == 0
        :worker0
      else
        :workers
      end
    @log.init(process_type, worker_id)
  end

  show_plugin_config if @show_plugin_config

  # "-" means the inline configuration is read from standard input.
  if @inline_config == '-'
    $log.warn('the value "-" for `inline_config` is deprecated. See https://github.com/fluent/fluentd/issues/2711')
    @inline_config = STDIN.read
  end

  @conf = Fluent::Config.build(
    config_path: @config_path,
    encoding: @conf_encoding,
    additional_config: @inline_config,
    use_v1_config: @use_v1_config
  )
  @system_config = build_system_config(@conf)

  # Re-apply logger settings now that the system config is known.
  @log.level = @system_config.log_level
  @log.apply_options(
    format: @system_config.log.format,
    time_format: @system_config.log.time_format,
    log_dir_perm: @system_config.dir_permission,
    ignore_repeated_log_interval: @system_config.ignore_repeated_log_interval,
    ignore_same_log_interval: @system_config.ignore_same_log_interval
  )

  $log.info :supervisor, 'parsing config file is succeeded', path: @config_path

  @libs.each { |lib| require lib }

  # Directories that do not exist are skipped.
  @plugin_dirs.each do |dir|
    Fluent::Plugin.add_plugin_dir(File.expand_path(dir)) if Dir.exist?(dir)
  end

  return unless supervisor

  # plugins / configuration dumps
  fluent_gems = Gem::Specification.find_all.select { |x| x.name =~ /^fluent(d|-(plugin|mixin)-.*)$/ }
  fluent_gems.each do |spec|
    $log.info("gem '#{spec.name}' version '#{spec.version}'")
  end
end

#options ⇒ Object



663
664
665
666
667
668
669
670
671
# File 'lib/fluent/supervisor.rb', line 663

# Summarizes selected settings of this supervisor.
#
# @return [Hash] String-keyed hash with 'config_path', 'pid_file' (the value
#   of @daemonize), 'plugin_dirs', 'log_path' and 'root_dir' (read from
#   @system_config).
def options
  root_dir = @system_config.root_dir

  {
    'config_path' => @config_path,
    'pid_file'    => @daemonize,
    'plugin_dirs' => @plugin_dirs,
    'log_path'    => @log_path,
    'root_dir'    => root_dir
  }
end

#run_supervisor(dry_run: false) ⇒ Object



620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
# File 'lib/fluent/supervisor.rb', line 620

# Validates the worker count and root directory, changes privileges,
# initializes and configures the engine in supervisor mode, then either
# finishes the dry run or starts supervising.
#
# @param dry_run [Boolean] when true, only validate the configuration and
#   exit with status 0 instead of calling #supervise
# @raise [Fluent::ConfigError] if the configured worker count is below 1
# @raise [Fluent::InvalidRootDirectory] if root_dir exists but is not a
#   directory, or cannot be created
def run_supervisor(dry_run: false)
  $log.info "starting fluentd-#{Fluent::VERSION} as dry run mode", ruby: RUBY_VERSION if dry_run

  if @system_config.workers < 1
    raise Fluent::ConfigError, "invalid number of workers (must be > 0):#{@system_config.workers}"
  end

  root_dir = @system_config.root_dir
  if root_dir && File.exist?(root_dir)
    # Something is already there: it has to be a directory.
    raise Fluent::InvalidRootDirectory, "non directory entry exists:#{root_dir}" unless Dir.exist?(root_dir)
  elsif root_dir
    begin
      FileUtils.mkdir_p(root_dir, mode: @system_config.dir_permission || 0755)
    rescue => e
      raise Fluent::InvalidRootDirectory, "failed to create root directory:#{root_dir}, #{e.inspect}"
    end
  end

  begin
    ServerEngine::Privilege.change(@chuser, @chgroup)
    MessagePackFactory.init(enable_time_support: @system_config.enable_msgpack_time_support)
    Fluent::Engine.init(@system_config, supervisor_mode: true)
    Fluent::Engine.run_configure(@conf, dry_run: dry_run)
  rescue Fluent::ConfigError => e
    # A configuration error is logged and ends the process immediately with status 1.
    $log.error 'config error', file: @config_path, error: e
    $log.debug_backtrace
    exit!(1)
  end

  return supervise unless dry_run

  $log.info 'finished dry run mode'
  exit 0
end

#run_worker ⇒ Object



673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
# File 'lib/fluent/supervisor.rb', line 673

# Runs this process as a worker: optionally sets the process title, installs
# the main-process signal handlers, then initializes, configures and runs the
# engine inside #main_process. The block passed to #main_process ends with
# `exit 0`.
#
# When @standalone_worker is set, this method additionally creates the socket
# manager, changes privileges, resets the umask and cleans up resources after
# the engine has run.
#
# @raise [Fluent::ConfigError] if running as a standalone worker while the
#   system config asks for a worker count other than 1
def run_worker
  begin
    require 'sigdump/setup'
  rescue Exception
    # ignore LoadError and others (related with signals): it may raise these errors in Windows
  end

  # Guard on the value that is actually interpolated into the title. The
  # previous guard tested @process_name, an instance variable the constructor
  # never assigns, so a configured process name did not reach the title.
  process_name = @system_config.process_name
  Process.setproctitle("worker:#{process_name}") if process_name

  if @standalone_worker && @system_config.workers != 1
    raise Fluent::ConfigError, "invalid number of workers (must be 1 or unspecified) with --no-supervisor: #{@system_config.workers}"
  end

  install_main_process_signal_handlers

  # This is the only log message for @standalone_worker
  $log.info "starting fluentd-#{Fluent::VERSION} without supervision", pid: Process.pid, ruby: RUBY_VERSION if @standalone_worker

  main_process do
    create_socket_manager if @standalone_worker
    if @standalone_worker
      ServerEngine::Privilege.change(@chuser, @chgroup)
      File.umask(0)
    end
    MessagePackFactory.init(enable_time_support: @system_config.enable_msgpack_time_support)
    Fluent::Engine.init(@system_config)
    Fluent::Engine.run_configure(@conf)
    Fluent::Engine.run
    self.class.cleanup_resources if @standalone_worker
    exit 0
  end
end