Class: Fluent::Compat::TimeSlicedOutput

Inherits:
Plugin::Output show all
Includes:
PropagateDefault
Defined in:
lib/fluent/compat/output.rb

Constant Summary collapse

BUFFER_PARAMS =
Fluent::PluginHelper::CompatParameters::BUFFER_PARAMS.merge(Fluent::PluginHelper::CompatParameters::BUFFER_TIME_SLICED_PARAMS)

Constants inherited from Plugin::Output

Plugin::Output::BUFFER_STATS_KEYS, Plugin::Output::CHUNKING_FIELD_WARN_NUM, Plugin::Output::CHUNK_ID_PLACEHOLDER_PATTERN, Plugin::Output::CHUNK_KEY_PATTERN, Plugin::Output::CHUNK_KEY_PLACEHOLDER_PATTERN, Plugin::Output::CHUNK_TAG_PLACEHOLDER_PATTERN, Plugin::Output::FORMAT_COMPRESSED_MSGPACK_STREAM, Plugin::Output::FORMAT_COMPRESSED_MSGPACK_STREAM_TIME_INT, Plugin::Output::FORMAT_MSGPACK_STREAM, Plugin::Output::FORMAT_MSGPACK_STREAM_TIME_INT, Plugin::Output::TIMESTAMP_CHECK_BASE_TIME, Plugin::Output::TIME_KEY_PLACEHOLDER_THRESHOLDS, Plugin::Output::UNRECOVERABLE_ERRORS

Constants included from Fluent::Configurable

Fluent::Configurable::CONFIG_TYPE_REGISTRY

Instance Attribute Summary collapse

Attributes inherited from Plugin::Output

#as_secondary, #buffer, #chunk_key_accessors, #chunk_key_tag, #chunk_key_time, #chunk_keys, #delayed_commit, #delayed_commit_timeout, #dequeued_chunks, #dequeued_chunks_mutex, #emit_count, #emit_records, #num_errors, #output_enqueue_thread_waiting, #retry, #retry_for_error_chunk, #rollback_count, #secondary, #timekey_zone, #write_count

Attributes included from PluginLoggerMixin

#log

Attributes inherited from Plugin::Base

#under_plugin_development

Class Method Summary collapse

Instance Method Summary collapse

Methods included from PropagateDefault

included

Methods inherited from Plugin::Output

#acts_as_secondary, #after_shutdown, #after_start, #backup_chunk, #before_shutdown, #calculate_timekey, #check_slow_flush, #chunk_for_test, #close, #commit_write, #emit_buffered, #emit_events, #emit_sync, #enqueue_thread_run, #enqueue_thread_wait, #execute_chunking, #flush_thread_run, #flush_thread_wakeup, #force_flush, #format, #formatted_to_msgpack_binary, #formatted_to_msgpack_binary?, #generate_format_proc, #get_placeholders_keys, #get_placeholders_tag, #get_placeholders_time, #handle_stream_simple, #handle_stream_with_custom_format, #handle_stream_with_standard_format, #implement?, #interrupt_flushes, #metadata, #multi_workers_ready?, #next_flush_time, #placeholder_validate!, #placeholder_validators, #prefer_buffered_processing, #prefer_delayed_commit, #process, #retry_state, #rollback_write, #shutdown, #statistics, #stop, #submit_flush_all, #submit_flush_once, #terminate, #try_flush, #try_rollback_all, #try_rollback_write, #try_write, #update_retry_state, #write, #write_guard

Methods included from UniqueId::Mixin

#dump_unique_id_hex, #generate_unique_id

Methods included from PluginHelper::Mixin

included

Methods included from PluginLoggerMixin

included, #terminate

Methods included from PluginId

#plugin_id, #plugin_id_configured?, #plugin_id_for_test?, #plugin_root_dir, #stop

Methods inherited from Plugin::Base

#after_shutdown, #after_shutdown?, #after_start, #after_started?, #before_shutdown, #before_shutdown?, #called_in_test?, #close, #closed?, #configured?, #context_router, #context_router=, #fluentd_worker_id, #has_router?, #inspect, #multi_workers_ready?, #plugin_root_dir, #reloadable_plugin?, #shutdown, #shutdown?, #started?, #stop, #stopped?, #string_safe_encoding, #terminate, #terminated?

Methods included from SystemConfig::Mixin

#system_config, #system_config_override

Methods included from Fluent::Configurable

#config, #configure_proxy_generate, #configured_section_create, included, lookup_type, register_type

Constructor Details

#initializeTimeSlicedOutput

Returns a new instance of TimeSlicedOutput.



608
609
610
611
612
613
614
615
# File 'lib/fluent/compat/output.rb', line 608

def initialize
  super
  @localtime = true

  unless self.class.ancestors.include?(Fluent::Compat::CallSuperMixin)
    self.class.prepend Fluent::Compat::CallSuperMixin
  end
end

Instance Attribute Details

#localtimeObject

Returns the value of attribute localtime.



600
601
602
# File 'lib/fluent/compat/output.rb', line 600

def localtime
  @localtime
end

Class Method Details

.propagate_default_paramsObject



617
618
619
# File 'lib/fluent/compat/output.rb', line 617

def self.propagate_default_params
  BUFFER_PARAMS
end

Instance Method Details

#configure(conf) ⇒ Object



622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
# File 'lib/fluent/compat/output.rb', line 622

def configure(conf)
  bufconf = CompatOutputUtils.buffer_section(conf)
  config_style = (bufconf ? :v1 : :v0)
  if config_style == :v0
    buf_params = {
      "flush_mode" => (conf['flush_interval'] ? "interval" : "lazy"),
      "retry_type" => "exponential_backoff",
    }
    BUFFER_PARAMS.each do |older, newer|
      next unless newer
      if conf.has_key?(older)
        if older == 'buffer_queue_full_action' && conf[older] == 'exception'
          buf_params[newer] = 'throw_exception'
        else
          buf_params[newer] = conf[older]
        end
      end
    end

    if conf['timezone']
      Fluent::Timezone.validate!(conf['timezone'])
    elsif conf['utc']
      # v0.12 assumes UTC without any configuration
      # 'localtime=false && no timezone key' means UTC
      conf['localtime'] = "false"
      conf.delete('utc')
    elsif conf['localtime']
      conf['timezone'] = Time.now.strftime('%z')
      conf['localtime'] = "true"
    else
      # v0.12 assumes UTC without any configuration
      # 'localtime=false && no timezone key' means UTC
      conf['localtime'] = "false"
    end

    @_timekey = case conf['time_slice_format']
                when /\%S/ then 1
                when /\%M/ then 60
                when /\%H/ then 3600
                when /\%d/ then 86400
                when nil   then 86400 # default value of TimeSlicedOutput.time_slice_format is '%Y%m%d'
                else
                  raise Fluent::ConfigError, "time_slice_format only with %Y or %m is too long"
                end
    buf_params["timekey"] = @_timekey

    conf.elements << Fluent::Config::Element.new('buffer', 'time', buf_params, [])
  end

  ParserUtils.convert_parser_conf(conf)
  FormatterUtils.convert_formatter_conf(conf)

  super

  if config_style == :v1
    if @buffer_config.chunk_keys == ['tag']
      raise Fluent::ConfigError, "this plugin '#{self.class}' allows <buffer tag> only"
    end
  end

  self.extend TimeSliceChunkMixin
end

#detach_multi_process(&block) ⇒ Object



703
704
705
706
# File 'lib/fluent/compat/output.rb', line 703

def detach_multi_process(&block)
  log.warn "detach_process is not supported in this version. ignored."
  block.call
end

#detach_process(&block) ⇒ Object



698
699
700
701
# File 'lib/fluent/compat/output.rb', line 698

def detach_process(&block)
  log.warn "detach_process is not supported in this version. ignored."
  block.call
end

#extract_placeholders(str, metadata) ⇒ Object

#format MUST be implemented in plugin #write is also



713
714
715
# File 'lib/fluent/compat/output.rb', line 713

def extract_placeholders(str, )
  raise "BUG: compat plugin does not support extract_placeholders: use newer plugin API"
end

#startObject



685
686
687
688
689
690
691
692
693
694
695
696
# File 'lib/fluent/compat/output.rb', line 685

def start
  super

  if instance_variable_defined?(:@formatter) && @inject_config
    unless @formatter.class.ancestors.include?(Fluent::Compat::HandleTagAndTimeMixin)
      if @formatter.respond_to?(:owner) && !@formatter.owner
        @formatter.owner = self
        @formatter.singleton_class.prepend FormatterUtils::InjectMixin
      end
    end
  end
end

#support_in_v12_style?(feature) ⇒ Boolean

Returns:

  • (Boolean)


557
558
559
560
561
562
563
564
# File 'lib/fluent/compat/output.rb', line 557

def support_in_v12_style?(feature)
  case feature
  when :synchronous    then false
  when :buffered       then true
  when :delayed_commit then false
  when :custom_format  then true
  end
end