Class: Fluent::Supervisor
- Inherits: Object
- Inheritance hierarchy: Object → Fluent::Supervisor
- Defined in:
- lib/fluent/supervisor.rb
Defined Under Namespace
Classes: LoggerInitializer
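Class Method Summary collapse
- .cleanup_resources ⇒ Object
- .default_options ⇒ Object
- .load_config(path, params = {}) ⇒ Object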
Instance Method Summary collapse
- #configure(supervisor: false) ⇒ Object
- #initialize(opt) ⇒ Supervisor (constructor): A new instance of Supervisor.
- #options ⇒ Object
- #run_supervisor(dry_run: false) ⇒ Object
- #run_worker ⇒ Object
Constructor Details
#initialize(opt) ⇒ Supervisor
Returns a new instance of Supervisor.
589 590 591 592 593 594 595 596 597 598 599 600 601 602 603 604 605 606 607 608 609 610 611 612 613 614 615 616 617 618 |
# File 'lib/fluent/supervisor.rb', line 589

# Creates a supervisor from an option hash.
#
# Individual settings are copied out of +opt+ into instance variables, the
# whole hash is retained in @cl_opt, and a LoggerInitializer is constructed
# from the log-related settings. @conf starts as nil and is assigned later by
# #configure.
#
# @param opt [Hash] options keyed by Symbol (:config_path, :log_path, ...)
def initialize(opt)
  # bookkeeping
  @cl_opt = opt
  @conf = nil
  @finished = false

  # process control
  @daemonize = opt[:daemonize]
  @standalone_worker = opt[:standalone_worker]
  @signame = opt[:signame]
  @chuser = opt[:chuser]
  @chgroup = opt[:chgroup]

  # configuration sources
  @config_path = opt[:config_path]
  @inline_config = opt[:inline_config]
  @use_v1_config = opt[:use_v1_config]
  @conf_encoding = opt[:conf_encoding]
  @show_plugin_config = opt[:show_plugin_config]

  # extra code to load
  @libs = opt[:libs]
  @plugin_dirs = opt[:plugin_dirs]

  # logging
  @log_path = opt[:log_path]
  @log_rotate_age = opt[:log_rotate_age]
  @log_rotate_size = opt[:log_rotate_size]

  repeat_keys = %i[suppress_repeated_stacktrace ignore_repeated_log_interval ignore_same_log_interval]
  logger_options = repeat_keys.each_with_object({}) { |key, acc| acc[key] = opt[key] }

  @log = LoggerInitializer.new(
    @log_path,
    opt[:log_level],
    @chuser,
    @chgroup,
    logger_options,
    log_rotate_age: @log_rotate_age,
    log_rotate_size: @log_rotate_size
  )
end
Class Method Details
.cleanup_resources ⇒ Object
581 582 583 584 585 586 587 |
# File 'lib/fluent/supervisor.rb', line 581

# Deletes the file whose path is stored in the
# SERVERENGINE_SOCKETMANAGER_PATH environment variable.
#
# Nothing is done when Fluent.windows? is true or when the variable is unset.
def self.cleanup_resources
  return if Fluent.windows?

  env_key = 'SERVERENGINE_SOCKETMANAGER_PATH'
  return unless ENV.key?(env_key)

  FileUtils.rm_f(ENV[env_key])
end
.default_options ⇒ Object
555 556 557 558 559 560 561 562 563 564 565 566 567 568 569 570 571 572 573 574 575 576 577 578 579 |
# File 'lib/fluent/supervisor.rb', line 555

# Returns the default option hash for a supervisor.
#
# A fresh Hash is built on every call, so callers may modify the result
# without affecting later calls.
#
# @return [Hash{Symbol => Object}] default values keyed by option name
def self.default_options
  {
    config_path: Fluent::DEFAULT_CONFIG_PATH,
    plugin_dirs: [Fluent::DEFAULT_PLUGIN_DIR],
    log_level: Fluent::Log::LEVEL_INFO,
    log_path: nil,
    daemonize: nil,
    libs: [],
    setup_path: nil,
    chuser: nil,
    chgroup: nil,
    root_dir: nil,
    suppress_interval: 0,
    suppress_repeated_stacktrace: true,
    ignore_repeated_log_interval: nil,
    without_source: nil,
    use_v1_config: true,
    strict_config_value: nil,
    supervise: true,
    standalone_worker: false,
    signame: nil,
    conf_encoding: 'utf-8',
    disable_shared_socket: nil
  }
end
.load_config(path, params = {}) ⇒ Object
380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 |
# File 'lib/fluent/supervisor.rb', line 380

# Builds the ServerEngine configuration hash for the supervisor process.
#
# The previous result is cached inside +params+ (keys 'pre_loadtime',
# 'pre_config_mtime' and 'pre_conf'). When the last load happened less than
# five seconds ago and the config file's mtime is unchanged, the cached hash
# is returned without doing anything else.
#
# Side effects on a non-cached load: a LoggerInitializer is created and
# initialised (which assigns $log), and +params+ is mutated to hold the new
# cache entries.
#
# @param path [String] path of the configuration file (only its mtime is read here)
# @param params [Hash{String => Object}] supervisor parameters, keyed by String
# @return [Hash{Symbol => Object}] the ServerEngine configuration
def self.load_config(path, params = {})
  pre_loadtime = 0
  pre_loadtime = params['pre_loadtime'].to_i if params['pre_loadtime']
  pre_config_mtime = nil
  pre_config_mtime = params['pre_config_mtime'] if params['pre_config_mtime']
  config_mtime = File.mtime(path)

  # reuse previous config if last load time is within 5 seconds and mtime of the config file is not changed
  if (Time.now - Time.at(pre_loadtime) < 5) && (config_mtime == pre_config_mtime)
    return params['pre_conf']
  end

  log_level = params['log_level']
  suppress_repeated_stacktrace = params['suppress_repeated_stacktrace']
  ignore_repeated_log_interval = params['ignore_repeated_log_interval']
  ignore_same_log_interval = params['ignore_same_log_interval']

  log_path = params['log_path']
  chuser = params['chuser']
  chgroup = params['chgroup']
  log_rotate_age = params['log_rotate_age']
  log_rotate_size = params['log_rotate_size']

  log_opts = {
    suppress_repeated_stacktrace: suppress_repeated_stacktrace,
    ignore_repeated_log_interval: ignore_repeated_log_interval,
    ignore_same_log_interval: ignore_same_log_interval
  }
  logger_initializer = Supervisor::LoggerInitializer.new(
    log_path, log_level, chuser, chgroup, log_opts,
    log_rotate_age: log_rotate_age,
    log_rotate_size: log_rotate_size
  )
  # this #init sets initialized logger to $log
  logger_initializer.init(:supervisor, 0)
  # NOTE(review): the method name was missing from the extracted listing;
  # `apply_options` is restored from the upstream implementation - confirm.
  logger_initializer.apply_options(format: params['log_format'], time_format: params['log_time_format'])
  logger = $log

  command_sender = Fluent.windows? ? "pipe" : "signal"

  # ServerEngine's "daemonize" option is boolean, and path of pid file is brought by "pid_path"
  pid_path = params['daemonize']
  daemonize = !!params['daemonize']

  se_config = {
    worker_type: 'spawn',
    workers: params['workers'],
    log_stdin: false,
    log_stdout: false,
    log_stderr: false,
    enable_heartbeat: true,
    auto_heartbeat: false,
    unrecoverable_exit_codes: [2],
    stop_immediately_at_unrecoverable_exit: true,
    root_dir: params['root_dir'],
    logger: logger,
    log: logger.out,
    log_path: log_path,
    log_level: log_level,
    logger_initializer: logger_initializer,
    chuser: chuser,
    chgroup: chgroup,
    chumask: 0,
    suppress_repeated_stacktrace: suppress_repeated_stacktrace,
    ignore_repeated_log_interval: ignore_repeated_log_interval,
    ignore_same_log_interval: ignore_same_log_interval,
    daemonize: daemonize,
    rpc_endpoint: params['rpc_endpoint'],
    counter_server: params['counter_server'],
    enable_get_dump: params['enable_get_dump'],
    windows_daemon_cmdline: [ServerEngine.ruby_bin_path,
                             File.join(File.dirname(__FILE__), 'daemon.rb'),
                             ServerModule.name,
                             WorkerModule.name,
                             path,
                             JSON.dump(params)],
    command_sender: command_sender,
    fluentd_conf: params['fluentd_conf'],
    conf_encoding: params['conf_encoding'],
    inline_config: params['inline_config'],
    config_path: path,
    main_cmd: params['main_cmd'],
    signame: params['signame'],
    disable_shared_socket: params['disable_shared_socket']
  }
  if daemonize
    se_config[:pid_path] = pid_path
  end

  # Snapshot params before the cache keys are written, so the copy that is
  # serialised below does not carry the cache itself.
  pre_params = params.dup
  params['pre_loadtime'] = Time.now.to_i
  params['pre_config_mtime'] = config_mtime
  params['pre_conf'] = se_config
  # prevent pre_conf from being too big by reloading many times.
  pre_params['pre_conf'] = nil
  # index 5 is the JSON.dump(params) slot of windows_daemon_cmdline
  params['pre_conf'][:windows_daemon_cmdline][5] = JSON.dump(pre_params)

  se_config
end
Instance Method Details
#configure(supervisor: false) ⇒ Object
706 707 708 709 710 711 712 713 714 715 716 717 718 719 720 721 722 723 724 725 726 727 728 729 730 731 732 733 734 735 736 737 738 739 740 741 742 743 744 745 746 747 748 749 750 751 752 753 754 755 756 757 758 |
# File 'lib/fluent/supervisor.rb', line 706

# Initialises logging, builds the configuration and registers plugin
# directories.
#
# Steps, in order:
# 1. initialise the logger for this process type (supervisor, standalone
#    worker, worker 0, or another worker - the worker id comes from the
#    SERVERENGINE_WORKER_ID environment variable);
# 2. optionally run show_plugin_config;
# 3. read the inline config from STDIN when it is given as "-" (deprecated);
# 4. build @conf and @system_config and apply the system log settings;
# 5. require every entry of @libs and register every existing plugin dir;
# 6. in supervisor mode, log the installed fluentd / fluent-plugin / fluent-mixin gems.
#
# @param supervisor [Boolean] true when called in the supervisor process
def configure(supervisor: false)
  if supervisor
    @log.init(:supervisor, 0)
  else
    worker_id = ENV['SERVERENGINE_WORKER_ID'].to_i
    process_type =
      if @standalone_worker
        :standalone
      elsif worker_id == 0
        :worker0
      else
        :workers
      end
    @log.init(process_type, worker_id)
  end

  if @show_plugin_config
    show_plugin_config
  end

  if @inline_config == '-'
    $log.warn('the value "-" for `inline_config` is deprecated. See https://github.com/fluent/fluentd/issues/2711')
    @inline_config = STDIN.read
  end

  @conf = Fluent::Config.build(
    config_path: @config_path,
    encoding: @conf_encoding,
    additional_config: @inline_config,
    use_v1_config: @use_v1_config
  )
  @system_config = build_system_config(@conf)

  @log.level = @system_config.log_level
  # NOTE(review): `apply_options` and `dir_permission` were missing from the
  # extracted listing and are restored from the upstream implementation - confirm.
  @log.apply_options(
    format: @system_config.log.format,
    time_format: @system_config.log.time_format,
    log_dir_perm: @system_config.dir_permission,
    ignore_repeated_log_interval: @system_config.ignore_repeated_log_interval,
    ignore_same_log_interval: @system_config.ignore_same_log_interval
  )

  $log.info :supervisor, 'parsing config file is succeeded', path: @config_path

  @libs.each do |lib|
    require lib
  end

  @plugin_dirs.each do |dir|
    if Dir.exist?(dir)
      # register the absolute path of the directory
      dir = File.expand_path(dir)
      Fluent::Plugin.add_plugin_dir(dir)
    end
  end

  if supervisor
    # plugins / configuration dumps
    Gem::Specification.find_all.select { |x| x.name =~ /^fluent(d|-(plugin|mixin)-.*)$/ }.each do |spec|
      $log.info("gem '#{spec.name}' version '#{spec.version}'")
    end
  end
end
#options ⇒ Object
663 664 665 666 667 668 669 670 671 |
# File 'lib/fluent/supervisor.rb', line 663

# Returns a summary of this supervisor's settings.
#
# Reads @system_config, which is assigned by #configure, so #configure must
# have run first.
#
# @return [Hash{String => Object}] config path, pid file (the daemonize
#   value), plugin directories, log path and root directory
def options
  {
    'config_path' => @config_path,
    'pid_file' => @daemonize,
    'plugin_dirs' => @plugin_dirs,
    'log_path' => @log_path,
    'root_dir' => @system_config.root_dir,
  }
end
#run_supervisor(dry_run: false) ⇒ Object
620 621 622 623 624 625 626 627 628 629 630 631 632 633 634 635 636 637 638 639 640 641 642 643 644 645 646 647 648 649 650 651 652 653 654 655 656 657 658 659 660 661 |
# File 'lib/fluent/supervisor.rb', line 620

# Validates the system configuration, configures the engine in supervisor
# mode and then either starts supervising or finishes a dry run.
#
# The process exits with status 1 (via exit!) when configuring the engine
# raises Fluent::ConfigError, and with status 0 after a successful dry run.
#
# @param dry_run [Boolean] when true, only validate the configuration and exit
# @raise [Fluent::ConfigError] when the configured worker count is below 1
# @raise [Fluent::InvalidRootDirectory] when root_dir exists but is not a
#   directory, or cannot be created
def run_supervisor(dry_run: false)
  if dry_run
    $log.info "starting fluentd-#{Fluent::VERSION} as dry run mode", ruby: RUBY_VERSION
  end

  if @system_config.workers < 1
    raise Fluent::ConfigError, "invalid number of workers (must be > 0):#{@system_config.workers}"
  end

  root_dir = @system_config.root_dir
  if root_dir
    if File.exist?(root_dir)
      unless Dir.exist?(root_dir)
        raise Fluent::InvalidRootDirectory, "non directory entry exists:#{root_dir}"
      end
    else
      begin
        # NOTE(review): the accessor name was missing from the extracted
        # listing; `dir_permission` is restored from the upstream
        # implementation - confirm. Falls back to 0755 when it is nil.
        FileUtils.mkdir_p(root_dir, mode: @system_config.dir_permission || 0755)
      rescue => e
        raise Fluent::InvalidRootDirectory, "failed to create root directory:#{root_dir}, #{e.inspect}"
      end
    end
  end

  begin
    ServerEngine::Privilege.change(@chuser, @chgroup)
    MessagePackFactory.init(enable_time_support: @system_config.enable_msgpack_time_support)
    Fluent::Engine.init(@system_config, supervisor_mode: true)
    Fluent::Engine.run_configure(@conf, dry_run: dry_run)
  rescue Fluent::ConfigError => e
    $log.error 'config error', file: @config_path, error: e
    $log.debug_backtrace
    exit!(1)
  end

  if dry_run
    $log.info 'finished dry run mode'
    exit 0
  else
    supervise
  end
end
#run_worker ⇒ Object
673 674 675 676 677 678 679 680 681 682 683 684 685 686 687 688 689 690 691 692 693 694 695 696 697 698 699 700 701 702 703 704 |
# File 'lib/fluent/supervisor.rb', line 673

# Runs this process as a worker.
#
# Sets the process title when a process name is configured, installs the
# main-process signal handlers and then runs the engine inside main_process.
# In standalone mode (@standalone_worker) the worker additionally creates the
# socket manager, changes privileges, resets the umask and cleans up
# resources itself. The block passed to main_process ends with `exit 0`.
#
# @raise [Fluent::ConfigError] in standalone mode when workers is not 1
def run_worker
  begin
    require 'sigdump/setup'
  rescue Exception
    # ignore LoadError and others (related with signals): it may raise these errors in Windows
  end

  # Guard on the same value that is interpolated into the title. The previous
  # guard tested @process_name, which neither #initialize nor #configure
  # assigns, so the title was never set.
  Process.setproctitle("worker:#{@system_config.process_name}") if @system_config.process_name

  if @standalone_worker && @system_config.workers != 1
    raise Fluent::ConfigError, "invalid number of workers (must be 1 or unspecified) with --no-supervisor: #{@system_config.workers}"
  end

  install_main_process_signal_handlers

  # This is the only log message for @standalone_worker
  $log.info "starting fluentd-#{Fluent::VERSION} without supervision", pid: Process.pid, ruby: RUBY_VERSION if @standalone_worker

  main_process do
    create_socket_manager if @standalone_worker
    if @standalone_worker
      ServerEngine::Privilege.change(@chuser, @chgroup)
      File.umask(0)
    end
    MessagePackFactory.init(enable_time_support: @system_config.enable_msgpack_time_support)
    Fluent::Engine.init(@system_config)
    Fluent::Engine.run_configure(@conf)
    Fluent::Engine.run
    self.class.cleanup_resources if @standalone_worker
    exit 0
  end
end