Class: Fluent::Compat::ObjectBufferedOutput

Inherits:
Plugin::Output show all
Includes:
PropagateDefault
Defined in:
lib/fluent/compat/output.rb

Constant Summary collapse

BUFFER_PARAMS =
Fluent::PluginHelper::CompatParameters::BUFFER_PARAMS

Constants inherited from Plugin::Output

Plugin::Output::BUFFER_STATS_KEYS, Plugin::Output::CHUNKING_FIELD_WARN_NUM, Plugin::Output::CHUNK_ID_PLACEHOLDER_PATTERN, Plugin::Output::CHUNK_KEY_PATTERN, Plugin::Output::CHUNK_KEY_PLACEHOLDER_PATTERN, Plugin::Output::CHUNK_TAG_PLACEHOLDER_PATTERN, Plugin::Output::FORMAT_COMPRESSED_MSGPACK_STREAM, Plugin::Output::FORMAT_COMPRESSED_MSGPACK_STREAM_TIME_INT, Plugin::Output::FORMAT_MSGPACK_STREAM, Plugin::Output::FORMAT_MSGPACK_STREAM_TIME_INT, Plugin::Output::TIMESTAMP_CHECK_BASE_TIME, Plugin::Output::TIME_KEY_PLACEHOLDER_THRESHOLDS, Plugin::Output::UNRECOVERABLE_ERRORS

Constants included from Fluent::Configurable

Fluent::Configurable::CONFIG_TYPE_REGISTRY

Instance Attribute Summary

Attributes inherited from Plugin::Output

#as_secondary, #buffer, #chunk_key_accessors, #chunk_key_tag, #chunk_key_time, #chunk_keys, #delayed_commit, #delayed_commit_timeout, #dequeued_chunks, #dequeued_chunks_mutex, #output_enqueue_thread_waiting, #retry, #retry_for_error_chunk, #secondary, #timekey_zone

Attributes included from PluginLoggerMixin

#log

Attributes inherited from Plugin::Base

#under_plugin_development

Class Method Summary collapse

Instance Method Summary collapse

Methods included from PropagateDefault

included

Methods inherited from Plugin::Output

#acts_as_secondary, #after_shutdown, #after_start, #backup_chunk, #before_shutdown, #calculate_timekey, #check_slow_flush, #chunk_for_test, #close, #commit_write, #emit_buffered, #emit_count, #emit_events, #emit_records, #emit_size, #emit_sync, #enqueue_thread_run, #enqueue_thread_wait, #execute_chunking, #flush_thread_run, #flush_thread_wakeup, #force_flush, #format, #formatted_to_msgpack_binary, #formatted_to_msgpack_binary?, #generate_format_proc, #get_placeholders_keys, #get_placeholders_tag, #get_placeholders_time, #handle_limit_reached, #handle_stream_simple, #handle_stream_with_custom_format, #handle_stream_with_standard_format, #implement?, #interrupt_flushes, #log_retry_error, #metadata, #multi_workers_ready?, #next_flush_time, #num_errors, #placeholder_validate!, #placeholder_validators, #prefer_buffered_processing, #prefer_delayed_commit, #process, #retry_state, #rollback_count, #rollback_write, #shutdown, #statistics, #stop, #submit_flush_all, #submit_flush_once, #terminate, #try_flush, #try_rollback_all, #try_rollback_write, #try_write, #update_retry_state, #write_count, #write_guard

Methods included from UniqueId::Mixin

#dump_unique_id_hex, #generate_unique_id

Methods included from PluginHelper::Mixin

included

Methods included from PluginLoggerMixin

included, #terminate

Methods included from PluginId

#plugin_id, #plugin_id_configured?, #plugin_id_for_test?, #plugin_root_dir, #stop

Methods inherited from Plugin::Base

#acquire_worker_lock, #after_shutdown, #after_shutdown?, #after_start, #after_started?, #before_shutdown, #before_shutdown?, #called_in_test?, #close, #closed?, #configured?, #context_router, #context_router=, #fluentd_worker_id, #get_lock_path, #has_router?, #inspect, #multi_workers_ready?, #plugin_root_dir, #reloadable_plugin?, #shutdown, #shutdown?, #started?, #stop, #stopped?, #string_safe_encoding, #terminate, #terminated?

Methods included from SystemConfig::Mixin

#system_config, #system_config_override

Methods included from Fluent::Configurable

#config, #configure_proxy_generate, #configured_section_create, included, lookup_type, register_type

Constructor Details

#initializeObjectBufferedOutput

Returns a new instance of ObjectBufferedOutput.



524
525
526
527
528
529
# File 'lib/fluent/compat/output.rb', line 524

def initialize
  super
  unless self.class.ancestors.include?(Fluent::Compat::CallSuperMixin)
    self.class.prepend Fluent::Compat::CallSuperMixin
  end
end

Class Method Details

.propagate_default_paramsObject



467
468
469
# File 'lib/fluent/compat/output.rb', line 467

def self.propagate_default_params
  BUFFER_PARAMS
end

Instance Method Details

#configure(conf) ⇒ Object



472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
# File 'lib/fluent/compat/output.rb', line 472

def configure(conf)
  bufconf = CompatOutputUtils.buffer_section(conf)
  config_style = (bufconf ? :v1 : :v0)
  if config_style == :v0
    buf_params = {
      "flush_mode" => "interval",
      "retry_type" => "exponential_backoff",
    }
    BUFFER_PARAMS.each do |older, newer|
      next unless newer
      if conf.has_key?(older)
        if older == 'buffer_queue_full_action' && conf[older] == 'exception'
          buf_params[newer] = 'throw_exception'
        else
          buf_params[newer] = conf[older]
        end
      end
    end

    conf.elements << Fluent::Config::Element.new('buffer', 'tag', buf_params, [])
  end

  ParserUtils.convert_parser_conf(conf)
  FormatterUtils.convert_formatter_conf(conf)

  super

  if config_style == :v1
    if @buffer_config.chunk_keys == ['tag']
      raise Fluent::ConfigError, "this plugin '#{self.class}' allows <buffer tag> only"
    end
  end

  self.extend BufferedChunkMixin
end

#detach_multi_process(&block) ⇒ Object



549
550
551
552
# File 'lib/fluent/compat/output.rb', line 549

def detach_multi_process(&block)
  log.warn "detach_process is not supported in this version. ignored."
  block.call
end

#detach_process(&block) ⇒ Object



544
545
546
547
# File 'lib/fluent/compat/output.rb', line 544

def detach_process(&block)
  log.warn "detach_process is not supported in this version. ignored."
  block.call
end

#extract_placeholders(str, metadata) ⇒ Object



520
521
522
# File 'lib/fluent/compat/output.rb', line 520

def extract_placeholders(str, )
  raise "BUG: compat plugin does not support extract_placeholders: use newer plugin API"
end

#format_stream(tag, es) ⇒ Object

for BufferedOutputTestDriver



508
509
510
511
512
513
514
# File 'lib/fluent/compat/output.rb', line 508

def format_stream(tag, es) # for BufferedOutputTestDriver
  if @compress == :gzip
    es.to_compressed_msgpack_stream(time_int: @time_as_integer)
  else
    es.to_msgpack_stream(time_int: @time_as_integer)
  end
end

#startObject



531
532
533
534
535
536
537
538
539
540
541
542
# File 'lib/fluent/compat/output.rb', line 531

def start
  super

  if instance_variable_defined?(:@formatter) && @inject_config
    unless @formatter.class.ancestors.include?(Fluent::Compat::HandleTagAndTimeMixin)
      if @formatter.respond_to?(:owner) && !@formatter.owner
        @formatter.owner = self
        @formatter.singleton_class.prepend FormatterUtils::InjectMixin
      end
    end
  end
end

#support_in_v12_style?(feature) ⇒ Boolean

This plugin cannot inherit BufferedOutput because #configure sets chunk_key ‘tag’ to flush chunks per tags, but BufferedOutput#configure doesn’t allow setting chunk_key in v1 style configuration

Returns:

  • (Boolean)


427
428
429
430
431
432
433
434
# File 'lib/fluent/compat/output.rb', line 427

def support_in_v12_style?(feature)
  case feature
  when :synchronous    then false
  when :buffered       then true
  when :delayed_commit then false
  when :custom_format  then false
  end
end

#write(chunk) ⇒ Object



516
517
518
# File 'lib/fluent/compat/output.rb', line 516

def write(chunk)
  write_objects(chunk..tag, chunk)
end