Class: Fluent::Compat::BufferedOutput
- Inherits:
-
Plugin::Output
- Object
- Plugin::Base
- Plugin::Output
- Fluent::Compat::BufferedOutput
- Includes:
- PropagateDefault
- Defined in:
- lib/fluent/compat/output.rb
Constant Summary collapse
- BUFFER_PARAMS =
Fluent::PluginHelper::CompatParameters::BUFFER_PARAMS
Constants inherited from Plugin::Output
Plugin::Output::BUFFER_STATS_KEYS, Plugin::Output::CHUNKING_FIELD_WARN_NUM, Plugin::Output::CHUNK_ID_PLACEHOLDER_PATTERN, Plugin::Output::CHUNK_KEY_PATTERN, Plugin::Output::CHUNK_KEY_PLACEHOLDER_PATTERN, Plugin::Output::CHUNK_TAG_PLACEHOLDER_PATTERN, Plugin::Output::FORMAT_COMPRESSED_MSGPACK_STREAM, Plugin::Output::FORMAT_COMPRESSED_MSGPACK_STREAM_TIME_INT, Plugin::Output::FORMAT_MSGPACK_STREAM, Plugin::Output::FORMAT_MSGPACK_STREAM_TIME_INT, Plugin::Output::TIMESTAMP_CHECK_BASE_TIME, Plugin::Output::TIME_KEY_PLACEHOLDER_THRESHOLDS, Plugin::Output::UNRECOVERABLE_ERRORS
Constants included from Fluent::Configurable
Fluent::Configurable::CONFIG_TYPE_REGISTRY
Instance Attribute Summary
Attributes inherited from Plugin::Output
#as_secondary, #buffer, #chunk_key_accessors, #chunk_key_tag, #chunk_key_time, #chunk_keys, #delayed_commit, #delayed_commit_timeout, #dequeued_chunks, #dequeued_chunks_mutex, #output_enqueue_thread_waiting, #retry, #retry_for_error_chunk, #secondary, #timekey_zone
Attributes included from PluginLoggerMixin
Attributes inherited from Plugin::Base
Class Method Summary collapse
Instance Method Summary collapse
- #configure(conf) ⇒ Object
- #detach_multi_process(&block) ⇒ Object
- #detach_process(&block) ⇒ Object
-
#emit(tag, es, chain, key = "") ⇒ Object
original implementation of v0.12 BufferedOutput.
- #extract_placeholders(str, metadata) ⇒ Object
- #format_stream(tag, es) ⇒ Object
-
#handle_stream_simple(tag, es, enqueue: false) ⇒ Object
This method overrides Fluent::Plugin::Output#handle_stream_simple because a v0.12 BufferedOutput plugin may override #format_stream, which the original #handle_stream_simple does not take into account.
-
#initialize ⇒ BufferedOutput
constructor
A new instance of BufferedOutput.
- #start ⇒ Object
- #submit_flush ⇒ Object
- #support_in_v12_style?(feature) ⇒ Boolean
Methods included from PropagateDefault
Methods inherited from Plugin::Output
#acts_as_secondary, #after_shutdown, #after_start, #backup_chunk, #before_shutdown, #calculate_timekey, #check_slow_flush, #chunk_for_test, #close, #commit_write, #emit_buffered, #emit_count, #emit_events, #emit_records, #emit_size, #emit_sync, #enqueue_thread_run, #enqueue_thread_wait, #execute_chunking, #flush_thread_run, #flush_thread_wakeup, #force_flush, #format, #formatted_to_msgpack_binary, #formatted_to_msgpack_binary?, #generate_format_proc, #get_placeholders_keys, #get_placeholders_tag, #get_placeholders_time, #handle_limit_reached, #handle_stream_with_custom_format, #handle_stream_with_standard_format, #implement?, #interrupt_flushes, #log_retry_error, #metadata, #multi_workers_ready?, #next_flush_time, #num_errors, #placeholder_validate!, #placeholder_validators, #prefer_buffered_processing, #prefer_delayed_commit, #process, #retry_state, #rollback_count, #rollback_write, #shutdown, #statistics, #stop, #submit_flush_all, #submit_flush_once, #terminate, #try_flush, #try_rollback_all, #try_rollback_write, #try_write, #update_retry_state, #write, #write_count, #write_guard
Methods included from UniqueId::Mixin
#dump_unique_id_hex, #generate_unique_id
Methods included from PluginHelper::Mixin
Methods included from PluginLoggerMixin
Methods included from PluginId
#plugin_id, #plugin_id_configured?, #plugin_id_for_test?, #plugin_root_dir, #stop
Methods inherited from Plugin::Base
#acquire_worker_lock, #after_shutdown, #after_shutdown?, #after_start, #after_started?, #before_shutdown, #before_shutdown?, #called_in_test?, #close, #closed?, #configured?, #context_router, #context_router=, #fluentd_worker_id, #get_lock_path, #has_router?, #inspect, #multi_workers_ready?, #plugin_root_dir, #reloadable_plugin?, #shutdown, #shutdown?, #started?, #stop, #stopped?, #string_safe_encoding, #terminate, #terminated?
Methods included from SystemConfig::Mixin
#system_config, #system_config_override
Methods included from Fluent::Configurable
#config, #configure_proxy_generate, #configured_section_create, included, lookup_type, register_type
Constructor Details
#initialize ⇒ BufferedOutput
Returns a new instance of BufferedOutput.
387 388 389 390 391 392 |
# File 'lib/fluent/compat/output.rb', line 387

# Runs the inherited constructor, then prepends Fluent::Compat::CallSuperMixin
# to the concrete plugin class unless it is already among that class's ancestors.
def initialize
  super
  plugin_class = self.class
  already_prepended = plugin_class.ancestors.include?(Fluent::Compat::CallSuperMixin)
  plugin_class.prepend(Fluent::Compat::CallSuperMixin) unless already_prepended
end
Class Method Details
.propagate_default_params ⇒ Object
248 249 250 |
# File 'lib/fluent/compat/output.rb', line 248

# Returns BUFFER_PARAMS, which this class defines as
# Fluent::PluginHelper::CompatParameters::BUFFER_PARAMS.
# NOTE(review): presumably consumed by the PropagateDefault mixin - confirm.
def self.propagate_default_params
  BUFFER_PARAMS
end
Instance Method Details
#configure(conf) ⇒ Object
253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 |
# File 'lib/fluent/compat/output.rb', line 253

# Configures the plugin from +conf+.
#
# When +conf+ has no buffer section (as reported by
# CompatOutputUtils.buffer_section), the flat legacy parameters listed in
# BUFFER_PARAMS are copied under their newer names into a synthesized
# 'buffer' element that is appended to +conf+ before calling super.
#
# Raises Fluent::ConfigError when +conf+ did carry its own buffer section and,
# after super, @buffer_config.chunk_keys is not empty.
def configure(conf)
  v1_style = CompatOutputUtils.buffer_section(conf) ? true : false

  unless v1_style
    translated = {
      "flush_mode" => "interval",
      "retry_type" => "exponential_backoff",
    }
    BUFFER_PARAMS.each do |legacy_name, current_name|
      next unless current_name && conf.has_key?(legacy_name)

      legacy_value = conf[legacy_name]
      # the legacy 'exception' action is written out as 'throw_exception'
      renamed_action = legacy_name == 'buffer_queue_full_action' && legacy_value == 'exception'
      translated[current_name] = renamed_action ? 'throw_exception' : legacy_value
    end

    conf.elements << Fluent::Config::Element.new('buffer', '', translated, [])
  end

  plugin_class = self.class
  @includes_record_filter = plugin_class.ancestors.include?(Fluent::Compat::RecordFilterMixin)

  own_methods = plugin_class.instance_methods(false)
  @overrides_emit = own_methods.include?(:emit)
  # RecordFilter mixin uses its own #format_stream method implementation
  @overrides_format_stream = own_methods.include?(:format_stream) || @includes_record_filter

  ParserUtils.convert_parser_conf(conf)
  FormatterUtils.convert_formatter_conf(conf)

  super

  if v1_style && !@buffer_config.chunk_keys.empty?
    raise Fluent::ConfigError, "this plugin '#{self.class}' cannot handle arguments for <buffer ...> section"
  end

  extend BufferedChunkMixin

  if @overrides_emit
    # the plugin instance gets an accessor the buffer-side #emit below writes to
    singleton_class.module_eval { attr_accessor :last_emit_via_buffer }

    output_plugin = self
    emit_recorder = Module.new do
      define_method(:emit) do |key, data, _chain|
        # receivers of this method are buffer instances
        output_plugin.last_emit_via_buffer = [key, data]
      end
    end
    @buffer.extend(emit_recorder)
  end
end
#detach_multi_process(&block) ⇒ Object
412 413 414 415 |
# File 'lib/fluent/compat/output.rb', line 412

# Logs a warning that process detaching is not supported, then invokes the
# given block directly and returns its result.
def detach_multi_process(&block)
  log.warn("detach_process is not supported in this version. ignored.")
  block.call
end
#detach_process(&block) ⇒ Object
407 408 409 410 |
# File 'lib/fluent/compat/output.rb', line 407

# Logs a warning that process detaching is not supported, then invokes the
# given block directly and returns its result.
def detach_process(&block)
  log.warn("detach_process is not supported in this version. ignored.")
  block.call
end
#emit(tag, es, chain, key = "") ⇒ Object
original implementation of v0.12 BufferedOutput
311 312 313 314 315 316 317 318 |
# File 'lib/fluent/compat/output.rb', line 311

# Original v0.12 BufferedOutput#emit: counts the emit, formats the event
# stream with #format_stream and hands the result to the buffer under +key+.
# #submit_flush is called when the buffer's #emit returns a truthy value.
#
# Only reached when a plugin's own #emit calls super.
def emit(tag, es, chain, key="")
  @emit_count_metrics.inc
  formatted = format_stream(tag, es)
  submit_flush if @buffer.emit(key, formatted, chain)
end
#extract_placeholders(str, metadata) ⇒ Object
383 384 385 |
# File 'lib/fluent/compat/output.rb', line 383

# Always raises: placeholder extraction is not offered to compat plugins.
#
# @param str [Object] ignored
# @param metadata [Object] ignored
# @raise [RuntimeError] unconditionally
def extract_placeholders(str, metadata)
  raise "BUG: compat plugin does not support extract_placeholders: use newer plugin API"
end
#format_stream(tag, es) ⇒ Object
324 325 326 327 328 329 330 331 |
# File 'lib/fluent/compat/output.rb', line 324

# Concatenates the result of #format for every (time, record) pair of +es+
# into a single String and returns it.
#
# Only reached when a plugin's own #format_stream calls super.
#
# @param tag [String] tag passed through to #format
# @param es [#each] event stream yielding time and record
# @return [String] the concatenated formatted events ("" for an empty stream)
def format_stream(tag, es)
  # Start from an unfrozen copy: appending to a bare string literal fails when
  # frozen string literals are enabled.
  out = ''.dup
  es.each do |time, record|
    out << format(tag, time, record)
  end
  out
end
#handle_stream_simple(tag, es, enqueue: false) ⇒ Object
This method overrides Fluent::Plugin::Output#handle_stream_simple because a v0.12 BufferedOutput plugin may override #format_stream, which the original #handle_stream_simple does not take into account.
338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 |
# File 'lib/fluent/compat/output.rb', line 338

# Writes the event stream +es+ into the buffer, honouring v0.12-style
# overrides recorded by #configure:
#
# * @overrides_emit - the plugin's own #emit is called; the key and data it
#   passed to the buffer are picked up from #last_emit_via_buffer and written
#   under a metadata built from that key.
# * @overrides_format_stream - the whole stream is formatted at once through
#   #format_stream and written as a single pre-formatted payload.
# * otherwise - each event is formatted with #format and the resulting array
#   is written.
#
# @param tag [String] tag of the event stream
# @param es [#size, #map] event stream
# @param enqueue [Boolean] forwarded to @buffer.write
# @return [Array] one-element array holding the metadata that was written to
def handle_stream_simple(tag, es, enqueue: false)
  if @overrides_emit
    current_emit_count = @emit_count_metrics.get
    size = es.size
    key = data = nil
    begin
      emit(tag, es, NULL_OUTPUT_CHAIN)
      key, data = self.last_emit_via_buffer
    ensure
      # the plugin's #emit may have bumped the counter itself; restore it
      @emit_count_metrics.set(current_emit_count)
      self.last_emit_via_buffer = nil
    end
    # on-the-fly key assignment can be done, and it's not configurable if Plugin#emit does it dynamically
    meta = @buffer.metadata(variables: (key && !key.empty? ? {key: key} : nil))
    write_guard do
      @buffer.write({meta => data}, format: ->(_data){ _data }, size: ->(){ size }, enqueue: enqueue)
    end
    @emit_records_metrics.add(es.size)
    @emit_size_metrics.add(es.to_msgpack_stream.bytesize) if @enable_size_metrics
    return [meta]
  end

  if @overrides_format_stream
    meta = metadata(nil, nil, nil)
    size = es.size
    bulk = format_stream(tag, es)
    write_guard do
      @buffer.write({meta => bulk}, format: ->(_data){ _data }, size: ->(){ size }, enqueue: enqueue)
    end
    @emit_records_metrics.add(es.size)
    @emit_size_metrics.add(es.to_msgpack_stream.bytesize) if @enable_size_metrics
    return [meta]
  end

  meta = metadata(nil, nil, nil)
  size = es.size
  data = es.map{|time,record| format(tag, time, record) }
  write_guard do
    @buffer.write({meta => data}, enqueue: enqueue)
  end
  @emit_records_metrics.add(es.size)
  @emit_size_metrics.add(es.to_msgpack_stream.bytesize) if @enable_size_metrics
  [meta]
end
#start ⇒ Object
394 395 396 397 398 399 400 401 402 403 404 405 |
# File 'lib/fluent/compat/output.rb', line 394

# Starts the plugin via super, then wires up the formatter for injection:
# when @formatter is defined, @inject_config is set, the formatter class does
# not already have Fluent::Compat::HandleTagAndTimeMixin among its ancestors,
# and the formatter exposes an unset #owner, this plugin becomes its owner and
# FormatterUtils::InjectMixin is prepended to the formatter's singleton class.
def start
  super

  return unless instance_variable_defined?(:@formatter) && @inject_config
  return if @formatter.class.ancestors.include?(Fluent::Compat::HandleTagAndTimeMixin)
  return unless @formatter.respond_to?(:owner) && !@formatter.owner

  @formatter.owner = self
  @formatter.singleton_class.prepend(FormatterUtils::InjectMixin)
end
#submit_flush ⇒ Object
320 321 322 |
# File 'lib/fluent/compat/output.rb', line 320

# Deliberately empty; it exists so that the #emit of 3rd party plugins has
# something to call.
def submit_flush
  # nothing to do
end
#support_in_v12_style?(feature) ⇒ Boolean
210 211 212 213 214 215 216 217 |
# File 'lib/fluent/compat/output.rb', line 210

# Tells whether this v0.12-style output supports +feature+.
#
# @param feature [Symbol] one of :synchronous, :buffered, :delayed_commit,
#   :custom_format
# @return [Boolean] true for :buffered and :custom_format, false otherwise
#   (including features not listed above)
def support_in_v12_style?(feature)
  case feature
  when :synchronous    then false
  when :buffered       then true
  when :delayed_commit then false
  when :custom_format  then true
  else false # keep the predicate strictly Boolean instead of falling through to nil
  end
end