Class: Fluent::Compat::ObjectBufferedOutput

Inherits:
Plugin::Output show all
Includes:
PropagateDefault
Defined in:
lib/fluent/compat/output.rb

Constant Summary collapse

BUFFER_PARAMS =
Fluent::PluginHelper::CompatParameters::BUFFER_PARAMS

Constants inherited from Plugin::Output

Plugin::Output::BUFFER_STATS_KEYS, Plugin::Output::CHUNKING_FIELD_WARN_NUM, Plugin::Output::CHUNK_ID_PLACEHOLDER_PATTERN, Plugin::Output::CHUNK_KEY_PATTERN, Plugin::Output::CHUNK_KEY_PLACEHOLDER_PATTERN, Plugin::Output::CHUNK_TAG_PLACEHOLDER_PATTERN, Plugin::Output::FORMAT_COMPRESSED_MSGPACK_STREAM, Plugin::Output::FORMAT_COMPRESSED_MSGPACK_STREAM_TIME_INT, Plugin::Output::FORMAT_MSGPACK_STREAM, Plugin::Output::FORMAT_MSGPACK_STREAM_TIME_INT, Plugin::Output::TIMESTAMP_CHECK_BASE_TIME, Plugin::Output::TIME_KEY_PLACEHOLDER_THRESHOLDS, Plugin::Output::UNRECOVERABLE_ERRORS

Constants included from Fluent::Configurable

Fluent::Configurable::CONFIG_TYPE_REGISTRY

Instance Attribute Summary

Attributes inherited from Plugin::Output

#as_secondary, #buffer, #chunk_key_accessors, #chunk_key_tag, #chunk_key_time, #chunk_keys, #delayed_commit, #delayed_commit_timeout, #dequeued_chunks, #dequeued_chunks_mutex, #emit_count, #emit_records, #num_errors, #output_enqueue_thread_waiting, #retry, #retry_for_error_chunk, #rollback_count, #secondary, #timekey_zone, #write_count

Attributes included from PluginLoggerMixin

#log

Attributes inherited from Plugin::Base

#under_plugin_development

Class Method Summary collapse

Instance Method Summary collapse

Methods included from PropagateDefault

included

Methods inherited from Plugin::Output

#acts_as_secondary, #after_shutdown, #after_start, #backup_chunk, #before_shutdown, #calculate_timekey, #check_slow_flush, #chunk_for_test, #close, #commit_write, #emit_buffered, #emit_events, #emit_sync, #enqueue_thread_run, #enqueue_thread_wait, #execute_chunking, #flush_thread_run, #flush_thread_wakeup, #force_flush, #format, #formatted_to_msgpack_binary, #formatted_to_msgpack_binary?, #generate_format_proc, #get_placeholders_keys, #get_placeholders_tag, #get_placeholders_time, #handle_stream_simple, #handle_stream_with_custom_format, #handle_stream_with_standard_format, #implement?, #interrupt_flushes, #metadata, #multi_workers_ready?, #next_flush_time, #placeholder_validate!, #placeholder_validators, #prefer_buffered_processing, #prefer_delayed_commit, #process, #retry_state, #rollback_write, #shutdown, #statistics, #stop, #submit_flush_all, #submit_flush_once, #terminate, #try_flush, #try_rollback_all, #try_rollback_write, #try_write, #update_retry_state, #write_guard

Methods included from UniqueId::Mixin

#dump_unique_id_hex, #generate_unique_id

Methods included from PluginHelper::Mixin

included

Methods included from PluginLoggerMixin

included, #terminate

Methods included from PluginId

#plugin_id, #plugin_id_configured?, #plugin_id_for_test?, #plugin_root_dir, #stop

Methods inherited from Plugin::Base

#after_shutdown, #after_shutdown?, #after_start, #after_started?, #before_shutdown, #before_shutdown?, #called_in_test?, #close, #closed?, #configured?, #context_router, #context_router=, #fluentd_worker_id, #has_router?, #inspect, #multi_workers_ready?, #plugin_root_dir, #reloadable_plugin?, #shutdown, #shutdown?, #started?, #stop, #stopped?, #string_safe_encoding, #terminate, #terminated?

Methods included from SystemConfig::Mixin

#system_config, #system_config_override

Methods included from Fluent::Configurable

#config, #configure_proxy_generate, #configured_section_create, included, lookup_type, register_type

Constructor Details

#initialize ⇒ ObjectBufferedOutput

Returns a new instance of ObjectBufferedOutput.



521
522
523
524
525
526
# File 'lib/fluent/compat/output.rb', line 521

# Builds the plugin instance, then makes sure the concrete plugin class has
# Fluent::Compat::CallSuperMixin prepended. The ancestors check keeps the
# mixin from being prepended again on later instantiations.
def initialize
  super

  plugin_class = self.class
  compat_mixin = Fluent::Compat::CallSuperMixin
  plugin_class.prepend(compat_mixin) unless plugin_class.ancestors.include?(compat_mixin)
end

Class Method Details

.propagate_default_params ⇒ Object



464
465
466
# File 'lib/fluent/compat/output.rb', line 464

# Returns the BUFFER_PARAMS constant visible from this class.
# NOTE(review): presumably consumed by the PropagateDefault mixin to learn
# which legacy parameter names map to buffer parameters — confirm in caller.
def self.propagate_default_params
  BUFFER_PARAMS
end

Instance Method Details

#configure(conf) ⇒ Object



469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
# File 'lib/fluent/compat/output.rb', line 469

# Configures the plugin, accepting both the legacy flat (v0) style and the
# sectioned (v1) style of buffer configuration.
#
# When no <buffer> section is present (v0 style), legacy flat parameters
# listed in BUFFER_PARAMS are translated into a synthesized `<buffer tag>`
# section that is appended to +conf+ before the parent class is configured.
#
# @param conf [Object] plugin configuration (presumably a
#   Fluent::Config::Element — confirm); mutated in place in the v0 case.
# @return [self] the result of extending the instance with BufferedChunkMixin.
# @raise [Fluent::ConfigError] when a v1-style <buffer> section uses chunk
#   keys other than exactly 'tag'.
def configure(conf)
  bufconf = CompatOutputUtils.buffer_section(conf)
  config_style = (bufconf ? :v1 : :v0)
  if config_style == :v0
    buf_params = {
      "flush_mode" => "interval",
      "retry_type" => "exponential_backoff",
    }
    BUFFER_PARAMS.each do |older, newer|
      # Entries without a newer name have no v1 counterpart to copy into.
      next unless newer
      if conf.has_key?(older)
        if older == 'buffer_queue_full_action' && conf[older] == 'exception'
          # The legacy value 'exception' is spelled 'throw_exception' in v1.
          buf_params[newer] = 'throw_exception'
        else
          buf_params[newer] = conf[older]
        end
      end
    end

    conf.elements << Fluent::Config::Element.new('buffer', 'tag', buf_params, [])
  end

  ParserUtils.convert_parser_conf(conf)
  FormatterUtils.convert_formatter_conf(conf)

  super

  # Accept exactly `<buffer tag>` and reject every other chunk-key set, as the
  # error message states. The check must run after `super`, which is what
  # populates @buffer_config.
  if config_style == :v1 && @buffer_config.chunk_keys != ['tag']
    raise Fluent::ConfigError, "this plugin '#{self.class}' allows <buffer tag> only"
  end

  self.extend BufferedChunkMixin
end

#detach_multi_process(&block) ⇒ Object



546
547
548
549
# File 'lib/fluent/compat/output.rb', line 546

# Compatibility shim: process detaching is not performed. A warning is logged
# and the given block is invoked directly in the current process.
#
# @yield the work that would have run in the detached process
# @return [Object] whatever the block returns
def detach_multi_process(&block)
  notice = "detach_process is not supported in this version. ignored."
  log.warn(notice)
  block.()
end

#detach_process(&block) ⇒ Object



541
542
543
544
# File 'lib/fluent/compat/output.rb', line 541

# Compatibility shim: process detaching is not performed. A warning is logged
# and the given block is invoked directly in the current process.
#
# @yield the work that would have run in the detached process
# @return [Object] whatever the block returns
def detach_process(&block)
  notice = "detach_process is not supported in this version. ignored."
  log.warn(notice)
  block.()
end

#extract_placeholders(str, metadata) ⇒ Object



517
518
519
# File 'lib/fluent/compat/output.rb', line 517

# Placeholder extraction is not available through the compat plugin API; this
# method always raises so that misuse is caught immediately.
#
# @param str [Object] ignored
# @param metadata [Object] ignored
# @raise [RuntimeError] always
def extract_placeholders(str, metadata)
  raise "BUG: compat plugin does not support extract_placeholders: use newer plugin API"
end

#format_stream(tag, es) ⇒ Object

for BufferedOutputTestDriver



505
506
507
508
509
510
511
# File 'lib/fluent/compat/output.rb', line 505

# Serializes an event stream to msgpack (for BufferedOutputTestDriver).
# Uses the compressed encoding when @compress is :gzip, the plain one
# otherwise; @time_as_integer is forwarded as the `time_int:` option.
#
# @param tag [Object] not used by this method
# @param es [#to_msgpack_stream, #to_compressed_msgpack_stream] event stream
# @return [Object] whatever the selected serializer returns
def format_stream(tag, es) # for BufferedOutputTestDriver
  return es.to_compressed_msgpack_stream(time_int: @time_as_integer) if @compress == :gzip

  es.to_msgpack_stream(time_int: @time_as_integer)
end

#start ⇒ Object



528
529
530
531
532
533
534
535
536
537
538
539
# File 'lib/fluent/compat/output.rb', line 528

# Starts the plugin, then wires an ownerless formatter to this plugin when
# inject configuration is present.
#
# The wiring is skipped unless all of the following hold: @formatter is
# defined and @inject_config is truthy; the formatter's class does not include
# Fluent::Compat::HandleTagAndTimeMixin; the formatter responds to #owner and
# currently has none. When wired, the formatter's singleton class gets
# FormatterUtils::InjectMixin prepended.
def start
  super

  return unless instance_variable_defined?(:@formatter) && @inject_config
  return if @formatter.class.ancestors.include?(Fluent::Compat::HandleTagAndTimeMixin)
  return unless @formatter.respond_to?(:owner)
  return if @formatter.owner

  @formatter.owner = self
  @formatter.singleton_class.prepend FormatterUtils::InjectMixin
end

#support_in_v12_style?(feature) ⇒ Boolean

This plugin cannot inherit from BufferedOutput because #configure sets the chunk key ‘tag’ to flush chunks per tag, but BufferedOutput#configure doesn’t allow setting a chunk key in v1-style configuration.

Returns:

  • (Boolean)


424
425
426
427
428
429
430
431
# File 'lib/fluent/compat/output.rb', line 424

# Reports which v0.12-style output features this class supports.
#
# @param feature [Symbol] one of :synchronous, :buffered, :delayed_commit,
#   :custom_format
# @return [Boolean, nil] true only for :buffered, false for the other three
#   known features, nil for anything else
def support_in_v12_style?(feature)
  return true if :buffered == feature

  false if [:synchronous, :delayed_commit, :custom_format].include?(feature)
end

#write(chunk) ⇒ Object



513
514
515
# File 'lib/fluent/compat/output.rb', line 513

# Hands a buffered chunk to #write_objects together with the tag recorded in
# the chunk's metadata.
#
# @param chunk [#metadata] buffer chunk whose metadata responds to #tag
# @return [Object] whatever #write_objects returns
def write(chunk)
  write_objects(chunk.metadata.tag, chunk)
end