Class: Fluent::Compat::TimeSlicedOutput
- Inherits:
-
Plugin::Output
- Object
- Plugin::Base
- Plugin::Output
- Fluent::Compat::TimeSlicedOutput
- Includes:
- PropagateDefault
- Defined in:
- lib/fluent/compat/output.rb
Constant Summary collapse
- BUFFER_PARAMS =
Fluent::PluginHelper::CompatParameters::BUFFER_PARAMS.merge(Fluent::PluginHelper::CompatParameters::BUFFER_TIME_SLICED_PARAMS)
Constants inherited from Plugin::Output
Plugin::Output::BUFFER_STATS_KEYS, Plugin::Output::CHUNKING_FIELD_WARN_NUM, Plugin::Output::CHUNK_ID_PLACEHOLDER_PATTERN, Plugin::Output::CHUNK_KEY_PATTERN, Plugin::Output::CHUNK_KEY_PLACEHOLDER_PATTERN, Plugin::Output::CHUNK_TAG_PLACEHOLDER_PATTERN, Plugin::Output::FORMAT_COMPRESSED_MSGPACK_STREAM, Plugin::Output::FORMAT_COMPRESSED_MSGPACK_STREAM_TIME_INT, Plugin::Output::FORMAT_MSGPACK_STREAM, Plugin::Output::FORMAT_MSGPACK_STREAM_TIME_INT, Plugin::Output::TIMESTAMP_CHECK_BASE_TIME, Plugin::Output::TIME_KEY_PLACEHOLDER_THRESHOLDS, Plugin::Output::UNRECOVERABLE_ERRORS
Constants included from Fluent::Configurable
Fluent::Configurable::CONFIG_TYPE_REGISTRY
Instance Attribute Summary collapse
-
#localtime ⇒ Object
Returns the value of attribute localtime.
Attributes inherited from Plugin::Output
#as_secondary, #buffer, #chunk_key_accessors, #chunk_key_tag, #chunk_key_time, #chunk_keys, #delayed_commit, #delayed_commit_timeout, #dequeued_chunks, #dequeued_chunks_mutex, #emit_count, #emit_records, #num_errors, #output_enqueue_thread_waiting, #retry, #retry_for_error_chunk, #rollback_count, #secondary, #timekey_zone, #write_count
Attributes included from PluginLoggerMixin
Attributes inherited from Plugin::Base
Class Method Summary collapse
Instance Method Summary collapse
- #configure(conf) ⇒ Object
- #detach_multi_process(&block) ⇒ Object
- #detach_process(&block) ⇒ Object
-
#extract_placeholders(str, metadata) ⇒ Object
#format MUST be implemented in plugin #write is also.
-
#initialize ⇒ TimeSlicedOutput
constructor
A new instance of TimeSlicedOutput.
- #start ⇒ Object
- #support_in_v12_style?(feature) ⇒ Boolean
Methods included from PropagateDefault
Methods inherited from Plugin::Output
#acts_as_secondary, #after_shutdown, #after_start, #backup_chunk, #before_shutdown, #calculate_timekey, #check_slow_flush, #chunk_for_test, #close, #commit_write, #emit_buffered, #emit_events, #emit_sync, #enqueue_thread_run, #enqueue_thread_wait, #execute_chunking, #flush_thread_run, #flush_thread_wakeup, #force_flush, #format, #formatted_to_msgpack_binary, #formatted_to_msgpack_binary?, #generate_format_proc, #get_placeholders_keys, #get_placeholders_tag, #get_placeholders_time, #handle_stream_simple, #handle_stream_with_custom_format, #handle_stream_with_standard_format, #implement?, #interrupt_flushes, #metadata, #multi_workers_ready?, #next_flush_time, #placeholder_validate!, #placeholder_validators, #prefer_buffered_processing, #prefer_delayed_commit, #process, #retry_state, #rollback_write, #shutdown, #statistics, #stop, #submit_flush_all, #submit_flush_once, #terminate, #try_flush, #try_rollback_all, #try_rollback_write, #try_write, #update_retry_state, #write, #write_guard
Methods included from UniqueId::Mixin
#dump_unique_id_hex, #generate_unique_id
Methods included from PluginHelper::Mixin
Methods included from PluginLoggerMixin
Methods included from PluginId
#plugin_id, #plugin_id_configured?, #plugin_id_for_test?, #plugin_root_dir, #stop
Methods inherited from Plugin::Base
#after_shutdown, #after_shutdown?, #after_start, #after_started?, #before_shutdown, #before_shutdown?, #called_in_test?, #close, #closed?, #configured?, #context_router, #context_router=, #fluentd_worker_id, #has_router?, #inspect, #multi_workers_ready?, #plugin_root_dir, #reloadable_plugin?, #shutdown, #shutdown?, #started?, #stop, #stopped?, #string_safe_encoding, #terminate, #terminated?
Methods included from SystemConfig::Mixin
#system_config, #system_config_override
Methods included from Fluent::Configurable
#config, #configure_proxy_generate, #configured_section_create, included, lookup_type, register_type
Constructor Details
#initialize ⇒ TimeSlicedOutput
Returns a new instance of TimeSlicedOutput.
608 609 610 611 612 613 614 615 |
# File 'lib/fluent/compat/output.rb', line 608 def initialize super @localtime = true unless self.class.ancestors.include?(Fluent::Compat::CallSuperMixin) self.class.prepend Fluent::Compat::CallSuperMixin end end |
Instance Attribute Details
#localtime ⇒ Object
Returns the value of attribute localtime.
600 601 602 |
# File 'lib/fluent/compat/output.rb', line 600 def localtime @localtime end |
Class Method Details
.propagate_default_params ⇒ Object
617 618 619 |
# File 'lib/fluent/compat/output.rb', line 617 def self.propagate_default_params BUFFER_PARAMS end |
Instance Method Details
#configure(conf) ⇒ Object
622 623 624 625 626 627 628 629 630 631 632 633 634 635 636 637 638 639 640 641 642 643 644 645 646 647 648 649 650 651 652 653 654 655 656 657 658 659 660 661 662 663 664 665 666 667 668 669 670 671 672 673 674 675 676 677 678 679 680 681 682 683 |
# File 'lib/fluent/compat/output.rb', line 622 def configure(conf) bufconf = CompatOutputUtils.buffer_section(conf) config_style = (bufconf ? :v1 : :v0) if config_style == :v0 buf_params = { "flush_mode" => (conf['flush_interval'] ? "interval" : "lazy"), "retry_type" => "exponential_backoff", } BUFFER_PARAMS.each do |older, newer| next unless newer if conf.has_key?(older) if older == 'buffer_queue_full_action' && conf[older] == 'exception' buf_params[newer] = 'throw_exception' else buf_params[newer] = conf[older] end end end if conf['timezone'] Fluent::Timezone.validate!(conf['timezone']) elsif conf['utc'] # v0.12 assumes UTC without any configuration # 'localtime=false && no timezone key' means UTC conf['localtime'] = "false" conf.delete('utc') elsif conf['localtime'] conf['timezone'] = Time.now.strftime('%z') conf['localtime'] = "true" else # v0.12 assumes UTC without any configuration # 'localtime=false && no timezone key' means UTC conf['localtime'] = "false" end @_timekey = case conf['time_slice_format'] when /\%S/ then 1 when /\%M/ then 60 when /\%H/ then 3600 when /\%d/ then 86400 when nil then 86400 # default value of TimeSlicedOutput.time_slice_format is '%Y%m%d' else raise Fluent::ConfigError, "time_slice_format only with %Y or %m is too long" end buf_params["timekey"] = @_timekey conf.elements << Fluent::Config::Element.new('buffer', 'time', buf_params, []) end ParserUtils.convert_parser_conf(conf) FormatterUtils.convert_formatter_conf(conf) super if config_style == :v1 if @buffer_config.chunk_keys == ['tag'] raise Fluent::ConfigError, "this plugin '#{self.class}' allows <buffer tag> only" end end self.extend TimeSliceChunkMixin end |
#detach_multi_process(&block) ⇒ Object
703 704 705 706 |
# File 'lib/fluent/compat/output.rb', line 703 def detach_multi_process(&block) log.warn "detach_process is not supported in this version. ignored." block.call end |
#detach_process(&block) ⇒ Object
698 699 700 701 |
# File 'lib/fluent/compat/output.rb', line 698 def detach_process(&block) log.warn "detach_process is not supported in this version. ignored." block.call end |
#extract_placeholders(str, metadata) ⇒ Object
#format MUST be implemented in plugin #write is also
713 714 715 |
# File 'lib/fluent/compat/output.rb', line 713 def extract_placeholders(str, ) raise "BUG: compat plugin does not support extract_placeholders: use newer plugin API" end |
#start ⇒ Object
685 686 687 688 689 690 691 692 693 694 695 696 |
# File 'lib/fluent/compat/output.rb', line 685 def start super if instance_variable_defined?(:@formatter) && @inject_config unless @formatter.class.ancestors.include?(Fluent::Compat::HandleTagAndTimeMixin) if @formatter.respond_to?(:owner) && !@formatter.owner @formatter.owner = self @formatter.singleton_class.prepend FormatterUtils::InjectMixin end end end end |
#support_in_v12_style?(feature) ⇒ Boolean
557 558 559 560 561 562 563 564 |
# File 'lib/fluent/compat/output.rb', line 557 def support_in_v12_style?(feature) case feature when :synchronous then false when :buffered then true when :delayed_commit then false when :custom_format then true end end |