Class: Fluent::Compat::TimeSlicedOutput
- Inherits:
-
Plugin::Output
- Object
- Plugin::Base
- Plugin::Output
- Fluent::Compat::TimeSlicedOutput
- Includes:
- PropagateDefault
- Defined in:
- lib/fluent/compat/output.rb
Constant Summary collapse
- BUFFER_PARAMS =
Fluent::PluginHelper::CompatParameters::BUFFER_PARAMS.merge(Fluent::PluginHelper::CompatParameters::BUFFER_TIME_SLICED_PARAMS)
Constants inherited from Plugin::Output
Plugin::Output::BUFFER_STATS_KEYS, Plugin::Output::CHUNKING_FIELD_WARN_NUM, Plugin::Output::CHUNK_ID_PLACEHOLDER_PATTERN, Plugin::Output::CHUNK_KEY_PATTERN, Plugin::Output::CHUNK_KEY_PLACEHOLDER_PATTERN, Plugin::Output::CHUNK_TAG_PLACEHOLDER_PATTERN, Plugin::Output::FORMAT_COMPRESSED_MSGPACK_STREAM, Plugin::Output::FORMAT_COMPRESSED_MSGPACK_STREAM_TIME_INT, Plugin::Output::FORMAT_MSGPACK_STREAM, Plugin::Output::FORMAT_MSGPACK_STREAM_TIME_INT, Plugin::Output::TIMESTAMP_CHECK_BASE_TIME, Plugin::Output::TIME_KEY_PLACEHOLDER_THRESHOLDS, Plugin::Output::UNRECOVERABLE_ERRORS
Constants included from Fluent::Configurable
Fluent::Configurable::CONFIG_TYPE_REGISTRY
Instance Attribute Summary collapse
-
#localtime ⇒ Object
Returns the value of attribute localtime.
Attributes inherited from Plugin::Output
#as_secondary, #buffer, #chunk_key_accessors, #chunk_key_tag, #chunk_key_time, #chunk_keys, #delayed_commit, #delayed_commit_timeout, #dequeued_chunks, #dequeued_chunks_mutex, #output_enqueue_thread_waiting, #retry, #retry_for_error_chunk, #secondary, #timekey_zone
Attributes included from PluginLoggerMixin
Attributes inherited from Plugin::Base
Class Method Summary collapse
Instance Method Summary collapse
- #configure(conf) ⇒ Object
- #detach_multi_process(&block) ⇒ Object
- #detach_process(&block) ⇒ Object
-
#extract_placeholders(str, metadata) ⇒ Object
Both #format and #write MUST be implemented in the plugin.
-
#initialize ⇒ TimeSlicedOutput
constructor
A new instance of TimeSlicedOutput.
- #start ⇒ Object
- #support_in_v12_style?(feature) ⇒ Boolean
Methods included from PropagateDefault
Methods inherited from Plugin::Output
#acts_as_secondary, #actual_flush_thread_count, #after_shutdown, #after_start, #backup_chunk, #before_shutdown, #calculate_timekey, #check_slow_flush, #chunk_for_test, #close, #commit_write, #emit_buffered, #emit_count, #emit_events, #emit_records, #emit_size, #emit_sync, #enqueue_thread_run, #enqueue_thread_wait, #execute_chunking, #flush_thread_run, #flush_thread_wakeup, #force_flush, #format, #formatted_to_msgpack_binary, #formatted_to_msgpack_binary?, #generate_format_proc, #get_placeholders_keys, #get_placeholders_tag, #get_placeholders_time, #handle_limit_reached, #handle_stream_simple, #handle_stream_with_custom_format, #handle_stream_with_standard_format, #implement?, #interrupt_flushes, #keep_buffer_config_compat, #log_retry_error, #metadata, #multi_workers_ready?, #next_flush_time, #num_errors, #placeholder_validate!, #placeholder_validators, #prefer_buffered_processing, #prefer_delayed_commit, #process, #retry_state, #rollback_count, #rollback_write, #shutdown, #statistics, #stop, #submit_flush_all, #submit_flush_once, #synchronize_in_threads, #synchronize_path, #synchronize_path_in_workers, #terminate, #try_flush, #try_rollback_all, #try_rollback_write, #try_write, #update_retry_state, #write, #write_count, #write_guard
Methods included from UniqueId::Mixin
#dump_unique_id_hex, #generate_unique_id
Methods included from PluginHelper::Mixin
Methods included from PluginLoggerMixin
Methods included from PluginId
#plugin_id, #plugin_id_configured?, #plugin_id_for_test?, #plugin_root_dir, #stop
Methods inherited from Plugin::Base
#acquire_worker_lock, #after_shutdown, #after_shutdown?, #after_start, #after_started?, #before_shutdown, #before_shutdown?, #called_in_test?, #close, #closed?, #configured?, #context_router, #context_router=, #fluentd_worker_id, #get_lock_path, #has_router?, #inspect, #multi_workers_ready?, #plugin_root_dir, #reloadable_plugin?, #shutdown, #shutdown?, #started?, #stop, #stopped?, #string_safe_encoding, #terminate, #terminated?
Methods included from SystemConfig::Mixin
#system_config, #system_config_override
Methods included from Fluent::Configurable
#config, #configure_proxy_generate, #configured_section_create, included, lookup_type, register_type
Constructor Details
#initialize ⇒ TimeSlicedOutput
Returns a new instance of TimeSlicedOutput.
611 612 613 614 615 616 617 618 |
# File 'lib/fluent/compat/output.rb', line 611
# Runs the parent initializer, turns the localtime flag on, and prepends
# Fluent::Compat::CallSuperMixin to the plugin's class unless that module
# is already among the class's ancestors.
def initialize
  super
  @localtime = true

  plugin_class = self.class
  return if plugin_class.ancestors.include?(Fluent::Compat::CallSuperMixin)

  plugin_class.prepend Fluent::Compat::CallSuperMixin
end
Instance Attribute Details
#localtime ⇒ Object
Returns the value of attribute localtime.
603 604 605 |
# File 'lib/fluent/compat/output.rb', line 603
# Reader for the @localtime instance variable.
def localtime
  @localtime
end
Class Method Details
.propagate_default_params ⇒ Object
620 621 622 |
# File 'lib/fluent/compat/output.rb', line 620
# Returns the BUFFER_PARAMS mapping itself (not a copy).
def self.propagate_default_params
  BUFFER_PARAMS
end
Instance Method Details
#configure(conf) ⇒ Object
625 626 627 628 629 630 631 632 633 634 635 636 637 638 639 640 641 642 643 644 645 646 647 648 649 650 651 652 653 654 655 656 657 658 659 660 661 662 663 664 665 666 667 668 669 670 671 672 673 674 675 676 677 678 679 680 681 682 683 684 685 686 |
# File 'lib/fluent/compat/output.rb', line 625
# Configures the plugin. When CompatOutputUtils.buffer_section finds no
# buffer section in +conf+ (treated here as v0-style configuration), the
# flat buffer and time parameters are translated into a new 'buffer'
# element with the argument 'time', which is appended to conf.elements
# before the parent +configure+ runs. Finally the instance is extended
# with TimeSliceChunkMixin.
#
# Raises Fluent::ConfigError when time_slice_format is set but contains
# none of %S, %M, %H or %d, and when a v1-style configuration has chunk
# keys equal to ['tag'].
def configure(conf)
  buffer_section = CompatOutputUtils.buffer_section(conf)
  style = buffer_section ? :v1 : :v0

  if style == :v0
    buffer_args = {
      "flush_mode" => (conf['flush_interval'] ? "interval" : "lazy"),
      "retry_type" => "exponential_backoff",
    }

    # Carry over every legacy parameter that has a current-name counterpart.
    BUFFER_PARAMS.each do |legacy_key, current_key|
      next unless current_key
      next unless conf.has_key?(legacy_key)

      # The legacy 'exception' action is spelled 'throw_exception' now.
      legacy_exception = legacy_key == 'buffer_queue_full_action' && conf[legacy_key] == 'exception'
      buffer_args[current_key] = legacy_exception ? 'throw_exception' : conf[legacy_key]
    end

    if conf['timezone']
      Fluent::Timezone.validate!(conf['timezone'])
    elsif conf['utc']
      # UTC is expressed as 'localtime=false' with no timezone key,
      # so the explicit 'utc' key is dropped.
      conf['localtime'] = "false"
      conf.delete('utc')
    elsif conf['localtime']
      conf['timezone'] = Time.now.strftime('%z')
      conf['localtime'] = "true"
    else
      # v0.12 assumes UTC when nothing is configured; again expressed as
      # 'localtime=false' with no timezone key.
      conf['localtime'] = "false"
    end

    # Slice length in seconds, taken from the finest unit in the format.
    @_timekey =
      case conf['time_slice_format']
      when /\%S/ then 1
      when /\%M/ then 60
      when /\%H/ then 3600
      when /\%d/ then 86400
      when nil then 86400 # default value of TimeSlicedOutput.time_slice_format is '%Y%m%d'
      else
        raise Fluent::ConfigError, "time_slice_format only with %Y or %m is too long"
      end
    buffer_args["timekey"] = @_timekey

    conf.elements << Fluent::Config::Element.new('buffer', 'time', buffer_args, [])
  end

  ParserUtils.convert_parser_conf(conf)
  FormatterUtils.convert_formatter_conf(conf)

  super

  # NOTE(review): this rejects exactly <buffer tag>, while the message says
  # only <buffer tag> is allowed -- confirm which check is intended.
  if style == :v1 && @buffer_config.chunk_keys == ['tag']
    raise Fluent::ConfigError, "this plugin '#{self.class}' allows <buffer tag> only"
  end

  extend TimeSliceChunkMixin
end
#detach_multi_process(&block) ⇒ Object
706 707 708 709 |
# File 'lib/fluent/compat/output.rb', line 706
# Logs a warning that process detaching is unsupported, then runs the
# given block in the current process and returns the block's result.
def detach_multi_process(&block)
  log.warn("detach_process is not supported in this version. ignored.")
  block.call
end
#detach_process(&block) ⇒ Object
701 702 703 704 |
# File 'lib/fluent/compat/output.rb', line 701
# Logs a warning that process detaching is unsupported, then runs the
# given block in the current process and returns the block's result.
def detach_process(&block)
  log.warn("detach_process is not supported in this version. ignored.")
  block.call
end
#extract_placeholders(str, metadata) ⇒ Object
Both #format and #write MUST be implemented in the plugin.
716 717 718 |
# File 'lib/fluent/compat/output.rb', line 716
# Always raises: this compat class does not offer placeholder extraction.
# Both arguments are accepted only to match the documented two-argument
# signature and are never read.
#
# Raises RuntimeError on every call.
def extract_placeholders(str, metadata)
  raise "BUG: compat plugin does not support extract_placeholders: use newer plugin API"
end
#start ⇒ Object
688 689 690 691 692 693 694 695 696 697 698 699 |
# File 'lib/fluent/compat/output.rb', line 688
# Starts the plugin via the parent implementation. Afterwards, if a
# @formatter is defined and @inject_config is set, a formatter that does
# not have HandleTagAndTimeMixin among its class ancestors, responds to
# #owner, and has no owner yet is given this plugin as owner and has
# FormatterUtils::InjectMixin prepended to its singleton class.
def start
  super

  return unless instance_variable_defined?(:@formatter) && @inject_config
  return if @formatter.class.ancestors.include?(Fluent::Compat::HandleTagAndTimeMixin)
  return unless @formatter.respond_to?(:owner) && !@formatter.owner

  @formatter.owner = self
  @formatter.singleton_class.prepend FormatterUtils::InjectMixin
end
#support_in_v12_style?(feature) ⇒ Boolean
560 561 562 563 564 565 566 567 |
# File 'lib/fluent/compat/output.rb', line 560
# Reports whether the given feature is supported in v0.12 style:
# true for :buffered and :custom_format, false for :synchronous and
# :delayed_commit. Any other value falls through and yields nil.
def support_in_v12_style?(feature)
  case feature
  when :buffered, :custom_format
    true
  when :synchronous, :delayed_commit
    false
  end
end