Class: Fluent::Compat::ObjectBufferedOutput
- Inherits:
-
Plugin::Output
- Object
- Plugin::Base
- Plugin::Output
- Fluent::Compat::ObjectBufferedOutput
- Includes:
- PropagateDefault
- Defined in:
- lib/fluent/compat/output.rb
Constant Summary collapse
- BUFFER_PARAMS =
Fluent::PluginHelper::CompatParameters::BUFFER_PARAMS
Constants inherited from Plugin::Output
Plugin::Output::BUFFER_STATS_KEYS, Plugin::Output::CHUNKING_FIELD_WARN_NUM, Plugin::Output::CHUNK_ID_PLACEHOLDER_PATTERN, Plugin::Output::CHUNK_KEY_PATTERN, Plugin::Output::CHUNK_KEY_PLACEHOLDER_PATTERN, Plugin::Output::CHUNK_TAG_PLACEHOLDER_PATTERN, Plugin::Output::FORMAT_COMPRESSED_MSGPACK_STREAM, Plugin::Output::FORMAT_COMPRESSED_MSGPACK_STREAM_TIME_INT, Plugin::Output::FORMAT_MSGPACK_STREAM, Plugin::Output::FORMAT_MSGPACK_STREAM_TIME_INT, Plugin::Output::TIMESTAMP_CHECK_BASE_TIME, Plugin::Output::TIME_KEY_PLACEHOLDER_THRESHOLDS, Plugin::Output::UNRECOVERABLE_ERRORS
Constants included from Fluent::Configurable
Fluent::Configurable::CONFIG_TYPE_REGISTRY
Instance Attribute Summary
Attributes inherited from Plugin::Output
#as_secondary, #buffer, #chunk_key_accessors, #chunk_key_tag, #chunk_key_time, #chunk_keys, #delayed_commit, #delayed_commit_timeout, #dequeued_chunks, #dequeued_chunks_mutex, #emit_count, #emit_records, #num_errors, #output_enqueue_thread_waiting, #retry, #retry_for_error_chunk, #rollback_count, #secondary, #timekey_zone, #write_count
Attributes included from PluginLoggerMixin
Attributes inherited from Plugin::Base
Class Method Summary collapse
Instance Method Summary collapse
- #configure(conf) ⇒ Object
- #detach_multi_process(&block) ⇒ Object
- #detach_process(&block) ⇒ Object
- #extract_placeholders(str, metadata) ⇒ Object
-
#format_stream(tag, es) ⇒ Object
for BufferedOutputTestDriver.
-
#initialize ⇒ ObjectBufferedOutput
constructor
A new instance of ObjectBufferedOutput.
- #start ⇒ Object
-
#support_in_v12_style?(feature) ⇒ Boolean
This plugin cannot inherit BufferedOutput because #configure sets chunk_key ‘tag’ to flush chunks per tag, but BufferedOutput#configure doesn’t allow setting chunk_key in v1 style configuration.
- #write(chunk) ⇒ Object
Methods included from PropagateDefault
Methods inherited from Plugin::Output
#acts_as_secondary, #after_shutdown, #after_start, #backup_chunk, #before_shutdown, #calculate_timekey, #check_slow_flush, #chunk_for_test, #close, #commit_write, #emit_buffered, #emit_events, #emit_sync, #enqueue_thread_run, #enqueue_thread_wait, #execute_chunking, #flush_thread_run, #flush_thread_wakeup, #force_flush, #format, #formatted_to_msgpack_binary, #formatted_to_msgpack_binary?, #generate_format_proc, #get_placeholders_keys, #get_placeholders_tag, #get_placeholders_time, #handle_stream_simple, #handle_stream_with_custom_format, #handle_stream_with_standard_format, #implement?, #interrupt_flushes, #metadata, #multi_workers_ready?, #next_flush_time, #placeholder_validate!, #placeholder_validators, #prefer_buffered_processing, #prefer_delayed_commit, #process, #retry_state, #rollback_write, #shutdown, #statistics, #stop, #submit_flush_all, #submit_flush_once, #terminate, #try_flush, #try_rollback_all, #try_rollback_write, #try_write, #update_retry_state, #write_guard
Methods included from UniqueId::Mixin
#dump_unique_id_hex, #generate_unique_id
Methods included from PluginHelper::Mixin
Methods included from PluginLoggerMixin
Methods included from PluginId
#plugin_id, #plugin_id_configured?, #plugin_id_for_test?, #plugin_root_dir, #stop
Methods inherited from Plugin::Base
#after_shutdown, #after_shutdown?, #after_start, #after_started?, #before_shutdown, #before_shutdown?, #called_in_test?, #close, #closed?, #configured?, #context_router, #context_router=, #fluentd_worker_id, #has_router?, #inspect, #multi_workers_ready?, #plugin_root_dir, #reloadable_plugin?, #shutdown, #shutdown?, #started?, #stop, #stopped?, #string_safe_encoding, #terminate, #terminated?
Methods included from SystemConfig::Mixin
#system_config, #system_config_override
Methods included from Fluent::Configurable
#config, #configure_proxy_generate, #configured_section_create, included, lookup_type, register_type
Constructor Details
#initialize ⇒ ObjectBufferedOutput
Returns a new instance of ObjectBufferedOutput.
521 522 523 524 525 526 |
# File 'lib/fluent/compat/output.rb', line 521
# Runs the parent constructor, then prepends Fluent::Compat::CallSuperMixin
# to the concrete plugin class unless it is already among its ancestors.
#
# @return [ObjectBufferedOutput] a new instance of ObjectBufferedOutput
def initialize
  super
  plugin_class = self.class
  # Nothing to do when the mixin is already in the ancestor chain.
  return if plugin_class.ancestors.include?(Fluent::Compat::CallSuperMixin)

  plugin_class.prepend Fluent::Compat::CallSuperMixin
end
Class Method Details
.propagate_default_params ⇒ Object
464 465 466 |
# File 'lib/fluent/compat/output.rb', line 464
class << self
  # Exposes the buffer parameter table (BUFFER_PARAMS) as the set of
  # parameters whose defaults are propagated.
  #
  # @return [Object] the BUFFER_PARAMS constant, unchanged
  def propagate_default_params
    BUFFER_PARAMS
  end
end
Instance Method Details
#configure(conf) ⇒ Object
469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 485 486 487 488 489 490 491 492 493 494 495 496 497 498 499 500 501 502 503 |
# File 'lib/fluent/compat/output.rb', line 469
# Configures the plugin from either a v0-style (flat buffer parameters) or a
# v1-style (explicit buffer section) configuration.
#
# The style is :v1 when CompatOutputUtils.buffer_section(conf) returns a
# truthy value, and :v0 otherwise. For :v0, a 'buffer' element with the
# argument 'tag' is synthesized from the old parameter names listed in
# BUFFER_PARAMS and appended to conf.elements before calling super.
#
# @param conf [Object] plugin configuration; presumably a
#   Fluent::Config::Element (it must respond to has_key?, [] and elements)
# @return [Object] the result of extending self with BufferedChunkMixin
# @raise [Fluent::ConfigError] when a v1-style buffer section uses chunk keys
#   other than exactly ['tag']
def configure(conf)
  bufconf = CompatOutputUtils.buffer_section(conf)
  config_style = (bufconf ? :v1 : :v0)

  if config_style == :v0
    buf_params = {
      "flush_mode" => "interval",
      "retry_type" => "exponential_backoff",
    }
    BUFFER_PARAMS.each do |older, newer|
      # Old parameters without a new-style counterpart are skipped.
      next unless newer
      next unless conf.has_key?(older)

      if older == 'buffer_queue_full_action' && conf[older] == 'exception'
        # The old value 'exception' is spelled 'throw_exception' in the new style.
        buf_params[newer] = 'throw_exception'
      else
        buf_params[newer] = conf[older]
      end
    end

    conf.elements << Fluent::Config::Element.new('buffer', 'tag', buf_params, [])
  end

  ParserUtils.convert_parser_conf(conf)
  FormatterUtils.convert_formatter_conf(conf)

  super

  # This plugin flushes chunks per tag, so a v1-style buffer section is valid
  # only when its chunk keys are exactly ['tag']. The check previously used
  # `==`, which rejected the one valid configuration and accepted all others.
  if config_style == :v1 && @buffer_config.chunk_keys != ['tag']
    raise Fluent::ConfigError, "this plugin '#{self.class}' allows <buffer tag> only"
  end

  self.extend BufferedChunkMixin
end
#detach_multi_process(&block) ⇒ Object
546 547 548 549 |
# File 'lib/fluent/compat/output.rb', line 546
# Compatibility shim: multi-process detaching is not supported, so this only
# logs a warning and runs the given block in place.
#
# @param block [Proc] the work that would have been detached
# @return [Object] whatever the block returns
def detach_multi_process(&block)
  # The warning names this method (it previously named detach_process).
  log.warn "detach_multi_process is not supported in this version. ignored."
  block.call
end
#detach_process(&block) ⇒ Object
541 542 543 544 |
# File 'lib/fluent/compat/output.rb', line 541
# Compatibility shim: process detaching is not supported, so this logs a
# warning and then invokes the given block directly.
#
# @param block [Proc] the work that would have been detached
# @return [Object] whatever the block returns
def detach_process(&block)
  warning = "detach_process is not supported in this version. ignored."
  log.warn(warning)
  block.call
end
#extract_placeholders(str, metadata) ⇒ Object
517 518 519 |
# File 'lib/fluent/compat/output.rb', line 517
# Always fails: compat plugins do not support placeholder extraction.
#
# @param str [Object] ignored
# @param metadata [Object] ignored
# @raise [RuntimeError] unconditionally
def extract_placeholders(str, metadata)
  raise "BUG: compat plugin does not support extract_placeholders: use newer plugin API"
end
#format_stream(tag, es) ⇒ Object
for BufferedOutputTestDriver
505 506 507 508 509 510 511 |
# File 'lib/fluent/compat/output.rb', line 505
# Serializes an event stream to msgpack, compressed when @compress is :gzip.
# Provided for BufferedOutputTestDriver.
#
# @param tag [Object] unused by this method
# @param es [Object] event stream responding to to_msgpack_stream and
#   to_compressed_msgpack_stream
# @return [Object] whatever the chosen serializer returns
def format_stream(tag, es)
  # for BufferedOutputTestDriver
  return es.to_compressed_msgpack_stream(time_int: @time_as_integer) if @compress == :gzip

  es.to_msgpack_stream(time_int: @time_as_integer)
end
#start ⇒ Object
528 529 530 531 532 533 534 535 536 537 538 539 |
# File 'lib/fluent/compat/output.rb', line 528
# Starts the plugin and, when a formatter and inject configuration are both
# present, attaches this plugin as the formatter's owner and prepends
# FormatterUtils::InjectMixin to the formatter's singleton class.
#
# The formatter is left untouched when it already includes
# Fluent::Compat::HandleTagAndTimeMixin, does not respond to #owner, or
# already has an owner.
#
# @return [Object, nil] nil when nothing was attached
def start
  super

  return unless instance_variable_defined?(:@formatter) && @inject_config
  return if @formatter.class.ancestors.include?(Fluent::Compat::HandleTagAndTimeMixin)
  return unless @formatter.respond_to?(:owner) && !@formatter.owner

  @formatter.owner = self
  @formatter.singleton_class.prepend FormatterUtils::InjectMixin
end
#support_in_v12_style?(feature) ⇒ Boolean
This plugin cannot inherit BufferedOutput because #configure sets chunk_key ‘tag’ to flush chunks per tag, but BufferedOutput#configure doesn’t allow setting chunk_key in v1 style configuration.
424 425 426 427 428 429 430 431 |
# File 'lib/fluent/compat/output.rb', line 424
# Reports which output features this compat class provides in v0.12 style.
# Only :buffered is supported.
#
# @param feature [Symbol] one of :synchronous, :buffered, :delayed_commit,
#   :custom_format
# @return [Boolean] true for :buffered, false for everything else
def support_in_v12_style?(feature)
  case feature
  when :synchronous    then false
  when :buffered       then true
  when :delayed_commit then false
  when :custom_format  then false
  else false # unknown features used to fall through to nil; keep the result a strict boolean
  end
end
#write(chunk) ⇒ Object
513 514 515 |
# File 'lib/fluent/compat/output.rb', line 513
# Hands a buffer chunk to #write_objects together with the tag recorded in
# the chunk's metadata.
#
# @param chunk [Object] buffer chunk; must respond to metadata, whose result
#   responds to tag
# @return [Object] whatever write_objects returns
def write(chunk)
  write_objects(chunk.metadata.tag, chunk)
end