Class: Fluent::Plugin::Output
- Defined in:
- lib/fluent/plugin/output.rb
Direct Known Subclasses
Compat::BufferedOutput, Compat::ObjectBufferedOutput, Compat::Output, Compat::TimeSlicedOutput, ExecFilterOutput, ExecOutput, FileOutput, ForwardOutput, HTTPOutput, NullOutput, RelabelOutput, SecondaryFileOutput, StdoutOutput
Defined Under Namespace
Classes: DequeuedChunkInfo, FlushThreadState, PlaceholderValidator
Constant Summary
- CHUNK_KEY_PATTERN =
/^[-_.@a-zA-Z0-9]+$/
- CHUNK_KEY_PLACEHOLDER_PATTERN =
/\$\{([-_.@$a-zA-Z0-9]+)\}/
- CHUNK_TAG_PLACEHOLDER_PATTERN =
/\$\{(tag(?:\[-?\d+\])?)\}/
- CHUNK_ID_PLACEHOLDER_PATTERN =
/\$\{chunk_id\}/
- CHUNKING_FIELD_WARN_NUM =
4
- TIME_KEY_PLACEHOLDER_THRESHOLDS =
[ [1, :second, '%S'], [60, :minute, '%M'], [3600, :hour, '%H'], [86400, :day, '%d'], ]
- TIMESTAMP_CHECK_BASE_TIME =
Time.parse("2016-01-01 00:00:00 UTC")
- FORMAT_MSGPACK_STREAM =
->(e){ e.to_msgpack_stream(packer: Fluent::MessagePackFactory.thread_local_msgpack_packer) }
- FORMAT_COMPRESSED_MSGPACK_STREAM =
->(e){ e.to_compressed_msgpack_stream(packer: Fluent::MessagePackFactory.thread_local_msgpack_packer) }
- FORMAT_MSGPACK_STREAM_TIME_INT =
->(e){ e.to_msgpack_stream(time_int: true, packer: Fluent::MessagePackFactory.thread_local_msgpack_packer) }
- FORMAT_COMPRESSED_MSGPACK_STREAM_TIME_INT =
->(e){ e.to_compressed_msgpack_stream(time_int: true, packer: Fluent::MessagePackFactory.thread_local_msgpack_packer) }
- UNRECOVERABLE_ERRORS =
[Fluent::UnrecoverableError, TypeError, ArgumentError, NoMethodError, MessagePack::UnpackError, EncodingError]
- BUFFER_STATS_KEYS =
{}
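As a standalone sketch of how the placeholder patterns above behave, the regexes can be exercised directly (the constants are copied verbatim from this class; the template strings are illustrative examples only):

```ruby
# Copies of the constants defined above, for standalone experimentation.
CHUNK_KEY_PATTERN = /^[-_.@a-zA-Z0-9]+$/
CHUNK_KEY_PLACEHOLDER_PATTERN = /\$\{([-_.@$a-zA-Z0-9]+)\}/
CHUNK_TAG_PLACEHOLDER_PATTERN = /\$\{(tag(?:\[-?\d+\])?)\}/

# A valid chunk key contains only alphanumerics and -_.@
p "service_name".match?(CHUNK_KEY_PATTERN)  # => true
p "bad key!".match?(CHUNK_KEY_PATTERN)      # => false

# Placeholder scanning pulls the key names out of a template string.
p "/logs/${tag}/${service_name}.log".scan(CHUNK_KEY_PLACEHOLDER_PATTERN).flatten
# => ["tag", "service_name"]

# The tag pattern also accepts indexed forms like ${tag[0]} and ${tag[-1]}.
p "${tag[0]}".match?(CHUNK_TAG_PLACEHOLDER_PATTERN)  # => true
```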
Constants included from Configurable
Configurable::CONFIG_TYPE_REGISTRY
Instance Attribute Summary
-
#as_secondary ⇒ Object
readonly
Returns the value of attribute as_secondary.
-
#buffer ⇒ Object
readonly
for tests.
-
#chunk_key_accessors ⇒ Object
readonly
for tests.
-
#chunk_key_tag ⇒ Object
readonly
for tests.
-
#chunk_key_time ⇒ Object
readonly
for tests.
-
#chunk_keys ⇒ Object
readonly
for tests.
-
#delayed_commit ⇒ Object
readonly
Returns the value of attribute delayed_commit.
-
#delayed_commit_timeout ⇒ Object
readonly
Returns the value of attribute delayed_commit_timeout.
-
#dequeued_chunks ⇒ Object
Returns the value of attribute dequeued_chunks.
-
#dequeued_chunks_mutex ⇒ Object
Returns the value of attribute dequeued_chunks_mutex.
-
#output_enqueue_thread_waiting ⇒ Object
Returns the value of attribute output_enqueue_thread_waiting.
-
#retry ⇒ Object
readonly
for tests.
-
#retry_for_error_chunk ⇒ Object
output_enqueue_thread_waiting: for test of output.rb itself.
-
#secondary ⇒ Object
readonly
for tests.
-
#timekey_zone ⇒ Object
readonly
Returns the value of attribute timekey_zone.
Attributes included from Fluent::PluginLoggerMixin
Attributes inherited from Base
Instance Method Summary
- #acts_as_secondary(primary) ⇒ Object
- #actual_flush_thread_count ⇒ Object
- #after_shutdown ⇒ Object
- #after_start ⇒ Object
- #backup_chunk(chunk, using_secondary, delayed_commit) ⇒ Object
- #before_shutdown ⇒ Object
- #calculate_timekey(time) ⇒ Object
- #check_slow_flush(start) ⇒ Object
- #chunk_for_test(tag, time, record) ⇒ Object
- #close ⇒ Object
- #commit_write(chunk_id, delayed: @delayed_commit, secondary: false) ⇒ Object
- #configure(conf) ⇒ Object
- #emit_buffered(tag, es) ⇒ Object
- #emit_count ⇒ Object
- #emit_events(tag, es) ⇒ Object
- #emit_records ⇒ Object
- #emit_size ⇒ Object
- #emit_sync(tag, es) ⇒ Object
- #enqueue_thread_run ⇒ Object
-
#enqueue_thread_wait ⇒ Object
only for tests of output plugin.
- #execute_chunking(tag, es, enqueue: false) ⇒ Object
-
#extract_placeholders(str, chunk) ⇒ Object
TODO: optimize this code.
- #flush_thread_run(state) ⇒ Object
-
#flush_thread_wakeup ⇒ Object
only for tests of output plugin.
- #force_flush ⇒ Object
- #format(tag, time, record) ⇒ Object
-
#formatted_to_msgpack_binary ⇒ Object
Compatibility for existing plugins.
- #formatted_to_msgpack_binary? ⇒ Boolean
- #generate_format_proc ⇒ Object
- #get_placeholders_keys(str) ⇒ Object
-
#get_placeholders_tag(str) ⇒ Object
-1 means whole tag.
-
#get_placeholders_time(str) ⇒ Object
it’s not validated to use timekey larger than 1 day.
- #handle_limit_reached(error) ⇒ Object
- #handle_stream_simple(tag, es, enqueue: false) ⇒ Object
-
#handle_stream_with_custom_format(tag, es, enqueue: false) ⇒ Object
metadata_and_data is a Hash of: (standard format) metadata => event stream; (custom format) metadata => array of formatted events. For the standard format, formatting should be done for the whole event stream, but the "whole event stream" may be a split of "es" here when it's bigger than chunk_limit_size.
- #handle_stream_with_standard_format(tag, es, enqueue: false) ⇒ Object
- #implement?(feature) ⇒ Boolean
-
#initialize ⇒ Output
constructor
A new instance of Output.
-
#interrupt_flushes ⇒ Object
only for tests of output plugin.
- #keep_buffer_config_compat ⇒ Object
- #log_retry_error(error, chunk_id_hex, using_secondary) ⇒ Object
-
#metadata(tag, time, record) ⇒ Object
TODO: optimize this code.
- #multi_workers_ready? ⇒ Boolean
- #next_flush_time ⇒ Object
- #num_errors ⇒ Object
- #placeholder_validate!(name, str) ⇒ Object
- #placeholder_validators(name, str, time_key = (@chunk_key_time && @buffer_config.timekey), tag_key = @chunk_key_tag, chunk_keys = @chunk_keys) ⇒ Object
- #prefer_buffered_processing ⇒ Object
- #prefer_delayed_commit ⇒ Object
- #process(tag, es) ⇒ Object
- #retry_state(randomize) ⇒ Object
- #rollback_count ⇒ Object
-
#rollback_write(chunk_id, update_retry: true) ⇒ Object
The update_retry parameter prevents a busy loop caused by async writes. We will remove this parameter by redesigning retry_state management between threads.
- #shutdown ⇒ Object
- #start ⇒ Object
- #statistics ⇒ Object
- #stop ⇒ Object
- #submit_flush_all ⇒ Object
- #submit_flush_once ⇒ Object
- #support_in_v12_style?(feature) ⇒ Boolean
- #synchronize_in_threads ⇒ Object
-
#synchronize_path(path) ⇒ Object
Ensures `path` (filename or filepath) is processable only by the current thread in the current process.
- #synchronize_path_in_workers(path) ⇒ Object
- #terminate ⇒ Object
- #try_flush ⇒ Object
- #try_rollback_all ⇒ Object
- #try_rollback_write ⇒ Object
- #try_write(chunk) ⇒ Object
- #update_retry_state(chunk_id, using_secondary, error = nil) ⇒ Object
- #write(chunk) ⇒ Object
- #write_count ⇒ Object
- #write_guard(&block) ⇒ Object
Methods included from UniqueId::Mixin
#dump_unique_id_hex, #generate_unique_id
Methods included from Fluent::PluginHelper::Mixin
Methods included from Fluent::PluginLoggerMixin
Methods included from Fluent::PluginId
#plugin_id, #plugin_id_configured?, #plugin_id_for_test?, #plugin_root_dir
Methods inherited from Base
#acquire_worker_lock, #after_shutdown?, #after_started?, #before_shutdown?, #called_in_test?, #closed?, #configured?, #context_router, #context_router=, #fluentd_worker_id, #get_lock_path, #has_router?, #inspect, #plugin_root_dir, #reloadable_plugin?, #shutdown?, #started?, #stopped?, #string_safe_encoding, #terminated?
Methods included from SystemConfig::Mixin
#system_config, #system_config_override
Methods included from Configurable
#config, #configure_proxy_generate, #configured_section_create, included, lookup_type, register_type
Constructor Details
#initialize ⇒ Output
Returns a new instance of Output.
# File 'lib/fluent/plugin/output.rb', line 198 def initialize super @counter_mutex = Mutex.new @flush_thread_mutex = Mutex.new @buffering = false @delayed_commit = false @as_secondary = false @primary_instance = nil # TODO: well organized counters @num_errors_metrics = nil @emit_count_metrics = nil @emit_records_metrics = nil @emit_size_metrics = nil @write_count_metrics = nil @rollback_count_metrics = nil @flush_time_count_metrics = nil @slow_flush_count_metrics = nil @enable_size_metrics = false # How to process events is decided here at once, but it will be decided in delayed way on #configure & #start if implement?(:synchronous) if implement?(:buffered) || implement?(:delayed_commit) @buffering = nil # do #configure or #start to determine this for full-featured plugins else @buffering = false end else @buffering = true end @custom_format = implement?(:custom_format) @enable_msgpack_streamer = false # decided later @buffer = nil @secondary = nil @retry = nil @dequeued_chunks = nil @dequeued_chunks_mutex = nil @output_enqueue_thread = nil @output_flush_threads = nil @output_flush_thread_current_position = 0 @simple_chunking = nil @chunk_keys = @chunk_key_accessors = @chunk_key_time = @chunk_key_tag = nil @flush_mode = nil @timekey_zone = nil @retry_for_error_chunk = false end |
Instance Attribute Details
#as_secondary ⇒ Object (readonly)
Returns the value of attribute as_secondary.
# File 'lib/fluent/plugin/output.rb', line 166 def as_secondary @as_secondary end |
#buffer ⇒ Object (readonly)
for tests
# File 'lib/fluent/plugin/output.rb', line 169 def buffer @buffer end |
#chunk_key_accessors ⇒ Object (readonly)
for tests
# File 'lib/fluent/plugin/output.rb', line 169 def chunk_key_accessors @chunk_key_accessors end |
#chunk_key_tag ⇒ Object (readonly)
for tests
# File 'lib/fluent/plugin/output.rb', line 169 def chunk_key_tag @chunk_key_tag end |
#chunk_key_time ⇒ Object (readonly)
for tests
# File 'lib/fluent/plugin/output.rb', line 169 def chunk_key_time @chunk_key_time end |
#chunk_keys ⇒ Object (readonly)
for tests
# File 'lib/fluent/plugin/output.rb', line 169 def chunk_keys @chunk_keys end |
#delayed_commit ⇒ Object (readonly)
Returns the value of attribute delayed_commit.
# File 'lib/fluent/plugin/output.rb', line 166 def delayed_commit @delayed_commit end |
#delayed_commit_timeout ⇒ Object (readonly)
Returns the value of attribute delayed_commit_timeout.
# File 'lib/fluent/plugin/output.rb', line 166 def delayed_commit_timeout @delayed_commit_timeout end |
#dequeued_chunks ⇒ Object
Returns the value of attribute dequeued_chunks.
# File 'lib/fluent/plugin/output.rb', line 170 def dequeued_chunks @dequeued_chunks end |
#dequeued_chunks_mutex ⇒ Object
Returns the value of attribute dequeued_chunks_mutex.
# File 'lib/fluent/plugin/output.rb', line 170 def dequeued_chunks_mutex @dequeued_chunks_mutex end |
#output_enqueue_thread_waiting ⇒ Object
Returns the value of attribute output_enqueue_thread_waiting.
# File 'lib/fluent/plugin/output.rb', line 170 def output_enqueue_thread_waiting @output_enqueue_thread_waiting end |
#retry ⇒ Object (readonly)
for tests
# File 'lib/fluent/plugin/output.rb', line 169 def retry @retry end |
#retry_for_error_chunk ⇒ Object
output_enqueue_thread_waiting: for test of output.rb itself
# File 'lib/fluent/plugin/output.rb', line 172 def retry_for_error_chunk @retry_for_error_chunk end |
#secondary ⇒ Object (readonly)
for tests
# File 'lib/fluent/plugin/output.rb', line 169 def secondary @secondary end |
#timekey_zone ⇒ Object (readonly)
Returns the value of attribute timekey_zone.
# File 'lib/fluent/plugin/output.rb', line 166 def timekey_zone @timekey_zone end |
Instance Method Details
#acts_as_secondary(primary) ⇒ Object
# File 'lib/fluent/plugin/output.rb', line 248 def acts_as_secondary(primary) @as_secondary = true @primary_instance = primary @chunk_keys = @primary_instance.chunk_keys || [] @chunk_key_tag = @primary_instance.chunk_key_tag || false if @primary_instance.chunk_key_time @chunk_key_time = @primary_instance.chunk_key_time @timekey_zone = @primary_instance.timekey_zone @output_time_formatter_cache = {} end self.context_router = primary.context_router singleton_class.module_eval do define_method(:commit_write){ |chunk_id| @primary_instance.commit_write(chunk_id, delayed: delayed_commit, secondary: true) } define_method(:rollback_write){ |chunk_id, update_retry: true| @primary_instance.rollback_write(chunk_id, update_retry) } end end |
#actual_flush_thread_count ⇒ Object
# File 'lib/fluent/plugin/output.rb', line 603 def actual_flush_thread_count return 0 unless @buffering return @buffer_config.flush_thread_count unless @as_secondary @primary_instance.buffer_config.flush_thread_count end |
#after_shutdown ⇒ Object
# File 'lib/fluent/plugin/output.rb', line 563 def after_shutdown try_rollback_all if @buffering && !@as_secondary # rollback regardless with @delayed_commit, because secondary may do it @secondary.after_shutdown if @secondary if @buffering && @buffer @buffer.after_shutdown @output_flush_threads_running = false if @output_flush_threads && !@output_flush_threads.empty? @output_flush_threads.each do |state| # to wakeup thread and make it to stop by itself state.mutex.synchronize { if state.thread && state.thread.status state.next_clock = 0 state.cond_var.signal end } Thread.pass state.thread.join end end end super end |
#after_start ⇒ Object
# File 'lib/fluent/plugin/output.rb', line 525 def after_start super @secondary.after_start if @secondary end |
#backup_chunk(chunk, using_secondary, delayed_commit) ⇒ Object
# File 'lib/fluent/plugin/output.rb', line 1287 def backup_chunk(chunk, using_secondary, delayed_commit) if @buffer.disable_chunk_backup log.warn "disable_chunk_backup is true. #{dump_unique_id_hex(chunk.unique_id)} chunk is thrown away" else @buffer.backup(chunk.unique_id) { |f| chunk.write_to(f) } end commit_write(chunk.unique_id, secondary: using_secondary, delayed: delayed_commit) end |
#before_shutdown ⇒ Object
# File 'lib/fluent/plugin/output.rb', line 537 def before_shutdown @secondary.before_shutdown if @secondary if @buffering && @buffer if @flush_at_shutdown force_flush end @buffer.before_shutdown # Need to ensure to stop enqueueing ... after #shutdown, we cannot write any data @output_enqueue_thread_running = false if @output_enqueue_thread && @output_enqueue_thread.alive? @output_enqueue_thread.wakeup @output_enqueue_thread.join end end super end |
#calculate_timekey(time) ⇒ Object
# File 'lib/fluent/plugin/output.rb', line 948 def calculate_timekey(time) time_int = time.to_i if @timekey_use_utc (time_int - (time_int % @timekey)).to_i else offset = @calculate_offset ? @calculate_offset.call(time) : @offset (time_int - ((time_int + offset)% @timekey)).to_i end end |
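The timekey arithmetic above truncates an event timestamp down to the start of its timekey bucket. A minimal standalone sketch of just the UTC branch (the helper name here is an illustrative assumption, not part of this class):

```ruby
# Simplified version of the UTC branch of #calculate_timekey:
# truncate a Unix timestamp down to the start of its timekey bucket.
def bucket_start(time_int, timekey)
  time_int - (time_int % timekey)
end

# With timekey 3600 (hourly buckets), 10:25:30 maps to 10:00:00.
t = Time.utc(2016, 1, 1, 10, 25, 30).to_i
p Time.at(bucket_start(t, 3600)).utc  # => 2016-01-01 10:00:00 UTC
```

The non-UTC branch additionally shifts by the timezone offset before taking the modulo, so buckets align with local-time boundaries.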
#check_slow_flush(start) ⇒ Object
# File 'lib/fluent/plugin/output.rb', line 1298 def check_slow_flush(start) elapsed_time = Fluent::Clock.now - start elapsed_millsec = (elapsed_time * 1000).to_i @flush_time_count_metrics.add(elapsed_millsec) if elapsed_time > @slow_flush_log_threshold @slow_flush_count_metrics.inc log.warn "buffer flush took longer time than slow_flush_log_threshold:", elapsed_time: elapsed_time, slow_flush_log_threshold: @slow_flush_log_threshold, plugin_id: self.plugin_id end end |
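The elapsed-time check above uses Fluent::Clock, a monotonic clock, rather than wall time, so it is immune to system clock adjustments. A standalone sketch of the same measurement using Ruby's stdlib monotonic clock (the helper name is an assumption; the 20s default mirrors fluentd's slow_flush_log_threshold):

```ruby
# Measure elapsed time with a monotonic clock, as #check_slow_flush does,
# and flag flushes that took longer than a threshold.
def slow_flush?(start, threshold: 20.0)
  Process.clock_gettime(Process::CLOCK_MONOTONIC) - start > threshold
end

start = Process.clock_gettime(Process::CLOCK_MONOTONIC)
sleep 0.01
p slow_flush?(start)                  # => false (well under 20s)
p slow_flush?(start, threshold: 0.0)  # => true
```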
#chunk_for_test(tag, time, record) ⇒ Object
# File 'lib/fluent/plugin/output.rb', line 958 def chunk_for_test(tag, time, record) require 'fluent/plugin/buffer/memory_chunk' m = metadata(tag, time, record) Fluent::Plugin::Buffer::MemoryChunk.new(m) end |
#close ⇒ Object
# File 'lib/fluent/plugin/output.rb', line 589 def close @buffer.close if @buffering && @buffer @secondary.close if @secondary super end |
#commit_write(chunk_id, delayed: @delayed_commit, secondary: false) ⇒ Object
# File 'lib/fluent/plugin/output.rb', line 1102 def commit_write(chunk_id, delayed: @delayed_commit, secondary: false) log.on_trace { log.trace "committing write operation to a chunk", chunk: dump_unique_id_hex(chunk_id), delayed: delayed } if delayed @dequeued_chunks_mutex.synchronize do @dequeued_chunks.delete_if{ |info| info.chunk_id == chunk_id } end end @buffer.purge_chunk(chunk_id) @retry_mutex.synchronize do if @retry # success to flush chunks in retries if secondary log.warn "retry succeeded by secondary.", chunk_id: dump_unique_id_hex(chunk_id) else log.warn "retry succeeded.", chunk_id: dump_unique_id_hex(chunk_id) end @retry = nil end end end |
#configure(conf) ⇒ Object
# File 'lib/fluent/plugin/output.rb', line 266 def configure(conf) unless implement?(:synchronous) || implement?(:buffered) || implement?(:delayed_commit) raise "BUG: output plugin must implement some methods. see developer documents." end has_buffer_section = (conf.elements(name: 'buffer').size > 0) has_flush_interval = conf.has_key?('flush_interval') super @num_errors_metrics = metrics_create(namespace: "fluentd", subsystem: "output", name: "num_errors", help_text: "Number of count num errors") @emit_count_metrics = metrics_create(namespace: "fluentd", subsystem: "output", name: "emit_count", help_text: "Number of count emits") @emit_records_metrics = metrics_create(namespace: "fluentd", subsystem: "output", name: "emit_records", help_text: "Number of emit records") @emit_size_metrics = metrics_create(namespace: "fluentd", subsystem: "output", name: "emit_size", help_text: "Total size of emit events") @write_count_metrics = metrics_create(namespace: "fluentd", subsystem: "output", name: "write_count", help_text: "Number of writing events") @rollback_count_metrics = metrics_create(namespace: "fluentd", subsystem: "output", name: "rollback_count", help_text: "Number of rollbacking operations") @flush_time_count_metrics = metrics_create(namespace: "fluentd", subsystem: "output", name: "flush_time_count", help_text: "Count of flush time") @slow_flush_count_metrics = metrics_create(namespace: "fluentd", subsystem: "output", name: "slow_flush_count", help_text: "Count of slow flush occurred time(s)") if has_buffer_section unless implement?(:buffered) || implement?(:delayed_commit) raise Fluent::ConfigError, "<buffer> section is configured, but plugin '#{self.class}' doesn't support buffering" end @buffering = true else # no buffer sections if implement?(:synchronous) if !implement?(:buffered) && !implement?(:delayed_commit) if @as_secondary raise Fluent::ConfigError, "secondary plugin '#{self.class}' must support buffering, but doesn't." 
end @buffering = false else if @as_secondary # secondary plugin always works as buffered plugin without buffer instance @buffering = true else # @buffering.nil? shows that enabling buffering or not will be decided in lazy way in #start @buffering = nil end end else # buffered or delayed_commit is supported by `unless` of first line in this method @buffering = true end end # Enable to update record size metrics or not @enable_size_metrics = !!system_config.enable_size_metrics if @as_secondary if !@buffering && !@buffering.nil? raise Fluent::ConfigError, "secondary plugin '#{self.class}' must support buffering, but doesn't" end end if (@buffering || @buffering.nil?) && !@as_secondary # When @buffering.nil?, @buffer_config was initialized with default value for all parameters. # If so, this configuration MUST success. @chunk_keys = @buffer_config.chunk_keys.dup @chunk_key_time = !!@chunk_keys.delete('time') @chunk_key_tag = !!@chunk_keys.delete('tag') if @chunk_keys.any? { |key| begin k = Fluent::PluginHelper::RecordAccessor::Accessor.parse_parameter(key) if k.is_a?(String) k !~ CHUNK_KEY_PATTERN else if key.start_with?('$[') raise Fluent::ConfigError, "in chunk_keys: bracket notation is not allowed" else false end end rescue => e raise Fluent::ConfigError, "in chunk_keys: #{e.message}" end } raise Fluent::ConfigError, "chunk_keys specification includes invalid char" else @chunk_key_accessors = Hash[@chunk_keys.map { |key| [key.to_sym, Fluent::PluginHelper::RecordAccessor::Accessor.new(key)] }] end if @chunk_key_time raise Fluent::ConfigError, "<buffer ...> argument includes 'time', but timekey is not configured" unless @buffer_config.timekey Fluent::Timezone.validate!(@buffer_config.timekey_zone) @timekey_zone = @buffer_config.timekey_use_utc ? '+0000' : @buffer_config.timekey_zone @timekey = @buffer_config.timekey if @timekey <= 0 raise Fluent::ConfigError, "timekey should be greater than 0. current timekey: #{@timekey}" end @timekey_use_utc = @buffer_config.timekey_use_utc @offset = Fluent::Timezone.utc_offset(@timekey_zone) @calculate_offset = @offset.respond_to?(:call) ? @offset : nil @output_time_formatter_cache = {} end if (@chunk_key_tag ? 1 : 0) + @chunk_keys.size >= CHUNKING_FIELD_WARN_NUM log.warn "many chunk keys specified, and it may cause too many chunks on your system." end # no chunk keys or only tags (chunking can be done without iterating event stream) @simple_chunking = !@chunk_key_time && @chunk_keys.empty? @flush_mode = @buffer_config.flush_mode if @flush_mode == :default if has_flush_interval log.info "'flush_interval' is configured at out side of <buffer>. 'flush_mode' is set to 'interval' to keep existing behaviour" @flush_mode = :interval else @flush_mode = (@chunk_key_time ? :lazy : :interval) end end buffer_type = @buffer_config[:@type] buffer_conf = conf.elements(name: 'buffer').first || Fluent::Config::Element.new('buffer', '', {}, []) @buffer = Plugin.new_buffer(buffer_type, parent: self) @buffer.configure(buffer_conf) keep_buffer_config_compat @buffer.enable_update_timekeys if @chunk_key_time @flush_at_shutdown = @buffer_config.flush_at_shutdown if @flush_at_shutdown.nil? @flush_at_shutdown = if @buffer.persistent? false else true # flush_at_shutdown is true in default for on-memory buffer end elsif !@flush_at_shutdown && !@buffer.persistent? buf_type = Plugin.lookup_type_from_class(@buffer.class) log.warn "'flush_at_shutdown' is false, and buffer plugin '#{buf_type}' is not persistent buffer." log.warn "your configuration will lose buffered data at shutdown. please confirm your configuration again."
end if (@flush_mode != :interval) && buffer_conf.has_key?('flush_interval') if buffer_conf.has_key?('flush_mode') raise Fluent::ConfigError, "'flush_interval' can't be specified when 'flush_mode' is not 'interval' explicitly: '#{@flush_mode}'" else log.warn "'flush_interval' is ignored because default 'flush_mode' is not 'interval': '#{@flush_mode}'" end end if @buffer.queued_chunks_limit_size.nil? @buffer.queued_chunks_limit_size = @buffer_config.flush_thread_count end end if @secondary_config raise Fluent::ConfigError, "Invalid <secondary> section for non-buffered plugin" unless @buffering raise Fluent::ConfigError, "<secondary> section cannot have <buffer> section" if @secondary_config.buffer raise Fluent::ConfigError, "<secondary> section cannot have <secondary> section" if @secondary_config.secondary if @buffer_config.retry_forever log.warn "<secondary> with 'retry_forever', only unrecoverable errors are moved to secondary" end secondary_type = @secondary_config[:@type] unless secondary_type secondary_type = conf['@type'] # primary plugin type end secondary_conf = conf.elements(name: 'secondary').first @secondary = Plugin.new_output(secondary_type) unless @secondary.respond_to?(:acts_as_secondary) raise Fluent::ConfigError, "Failed to setup secondary plugin in '#{conf['@type']}'. '#{secondary_type}' plugin in not allowed due to non buffered output" end @secondary.acts_as_secondary(self) @secondary.configure(secondary_conf) if (@secondary.class.to_s != "Fluent::Plugin::SecondaryFileOutput") && (self.class != @secondary.class) && (@custom_format || @secondary.implement?(:custom_format)) log.warn "Use different plugin for secondary. Check the plugin works with primary like secondary_file", primary: self.class.to_s, secondary: @secondary.class.to_s end else @secondary = nil end self end |
#emit_buffered(tag, es) ⇒ Object
# File 'lib/fluent/plugin/output.rb', line 894 def emit_buffered(tag, es) @emit_count_metrics.inc begin execute_chunking(tag, es, enqueue: (@flush_mode == :immediate)) if !@retry && @buffer.queued?(nil, optimistic: true) submit_flush_once end rescue # TODO: separate number of errors into emit errors and write/flush errors @num_errors_metrics.inc raise end end |
#emit_count ⇒ Object
# File 'lib/fluent/plugin/output.rb', line 178 def emit_count @emit_count_metrics.get end |
#emit_events(tag, es) ⇒ Object
# File 'lib/fluent/plugin/output.rb', line 873 def emit_events(tag, es) # actually this method will be overwritten by #configure if @buffering emit_buffered(tag, es) else emit_sync(tag, es) end end |
#emit_records ⇒ Object
# File 'lib/fluent/plugin/output.rb', line 186 def emit_records @emit_records_metrics.get end |
#emit_size ⇒ Object
# File 'lib/fluent/plugin/output.rb', line 182 def emit_size @emit_size_metrics.get end |
#emit_sync(tag, es) ⇒ Object
# File 'lib/fluent/plugin/output.rb', line 882 def emit_sync(tag, es) @emit_count_metrics.inc begin process(tag, es) @emit_records_metrics.add(es.size) @emit_size_metrics.add(es.to_msgpack_stream.bytesize) if @enable_size_metrics rescue @num_errors_metrics.inc raise end end |
#enqueue_thread_run ⇒ Object
# File 'lib/fluent/plugin/output.rb', line 1445 def enqueue_thread_run value_for_interval = nil if @flush_mode == :interval value_for_interval = @buffer_config.flush_interval end if @chunk_key_time if !value_for_interval || @buffer_config.timekey < value_for_interval value_for_interval = [@buffer_config.timekey, @buffer_config.timekey_wait].min end end unless value_for_interval raise "BUG: both of flush_interval and timekey are disabled" end interval = value_for_interval / 11.0 if interval < @buffer_config.flush_thread_interval interval = @buffer_config.flush_thread_interval end while !self.after_started? && !self.stopped? sleep 0.5 end log.debug "enqueue_thread actually running" begin while @output_enqueue_thread_running now_int = Time.now.to_i if @output_flush_interrupted sleep interval next end @output_enqueue_thread_mutex.lock begin if @flush_mode == :interval flush_interval = @buffer_config.flush_interval.to_i # This block should be done by integer values. # If both of flush_interval & flush_thread_interval are 1s, expected actual flush timing is 1.5s. # If we use integered values for this comparison, expected actual flush timing is 1.0s. @buffer.enqueue_all{ |metadata, chunk| chunk.raw_create_at + flush_interval <= now_int } end if @chunk_key_time timekey_unit = @buffer_config.timekey timekey_wait = @buffer_config.timekey_wait current_timekey = now_int - now_int % timekey_unit @buffer.enqueue_all{ |metadata, chunk| metadata.timekey < current_timekey && metadata.timekey + timekey_unit + timekey_wait <= now_int } end rescue => e raise if @under_plugin_development log.error "unexpected error while checking flushed chunks. ignored.", error: e log.error_backtrace ensure @output_enqueue_thread_waiting = false @output_enqueue_thread_mutex.unlock end sleep interval end rescue => e # normal errors are rescued by inner begin-rescue clause. log.error "error on enqueue thread", error: e log.error_backtrace raise end end |
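The polling interval chosen at the top of this thread divides the smallest relevant flush interval by 11 and clamps it to flush_thread_interval. The following is a simplified, approximate standalone sketch of just that calculation (the method name and keyword parameters are assumptions mirroring the buffer config; the real method also distinguishes flush_mode and compares timekey against flush_interval before taking the minimum):

```ruby
# Approximate sketch of the enqueue-thread polling interval computed above:
# take the smallest relevant interval, divide by 11, and never go below
# flush_thread_interval.
def enqueue_poll_interval(flush_interval: nil, timekey: nil, timekey_wait: nil,
                          flush_thread_interval: 1.0)
  candidates = [flush_interval, timekey && [timekey, timekey_wait].min].compact
  raise "BUG: both of flush_interval and timekey are disabled" if candidates.empty?
  [candidates.min / 11.0, flush_thread_interval].max
end

p enqueue_poll_interval(flush_interval: 60)                # => 5.454545454545454
p enqueue_poll_interval(timekey: 3600, timekey_wait: 600)  # => 54.54545454545455
p enqueue_poll_interval(flush_interval: 5)                 # => 1.0 (clamped)
```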
#enqueue_thread_wait ⇒ Object
only for tests of output plugin
# File 'lib/fluent/plugin/output.rb', line 1421 def enqueue_thread_wait @output_enqueue_thread_mutex.synchronize do @output_flush_interrupted = false @output_enqueue_thread_waiting = true end require 'timeout' Timeout.timeout(10) do Thread.pass while @output_enqueue_thread_waiting end end |
#execute_chunking(tag, es, enqueue: false) ⇒ Object
# File 'lib/fluent/plugin/output.rb', line 965 def execute_chunking(tag, es, enqueue: false) if @simple_chunking handle_stream_simple(tag, es, enqueue: enqueue) elsif @custom_format handle_stream_with_custom_format(tag, es, enqueue: enqueue) else handle_stream_with_standard_format(tag, es, enqueue: enqueue) end end |
#extract_placeholders(str, chunk) ⇒ Object
TODO: optimize this code
# File 'lib/fluent/plugin/output.rb', line 798 def extract_placeholders(str, chunk) metadata = if chunk.is_a?(Fluent::Plugin::Buffer::Chunk) chunk_passed = true chunk.metadata else chunk_passed = false # For existing plugins. Old plugin passes Chunk.metadata instead of Chunk chunk end if metadata.empty? str.sub(CHUNK_ID_PLACEHOLDER_PATTERN) { if chunk_passed dump_unique_id_hex(chunk.unique_id) else log.warn "${chunk_id} is not allowed in this plugin. Pass Chunk instead of metadata in extract_placeholders's 2nd argument" end } else rvalue = str.dup # strftime formatting if @chunk_key_time # this section MUST be earlier than rest to use raw 'str' @output_time_formatter_cache[str] ||= Fluent::Timezone.formatter(@timekey_zone, str) rvalue = @output_time_formatter_cache[str].call(metadata.timekey) end # ${tag}, ${tag[0]}, ${tag[1]}, ... , ${tag[-2]}, ${tag[-1]} if @chunk_key_tag if str.include?('${tag}') rvalue = rvalue.gsub('${tag}', metadata.tag) end if CHUNK_TAG_PLACEHOLDER_PATTERN.match?(str) hash = {} tag_parts = metadata.tag.split('.') tag_parts.each_with_index do |part, i| hash["${tag[#{i}]}"] = part hash["${tag[#{i-tag_parts.size}]}"] = part end rvalue = rvalue.gsub(CHUNK_TAG_PLACEHOLDER_PATTERN, hash) end if rvalue =~ CHUNK_TAG_PLACEHOLDER_PATTERN log.warn "tag placeholder '#{$1}' not replaced. tag:#{metadata.tag}, template:#{str}" end end # First we replace ${chunk_id} with chunk.unique_id (hexlified). rvalue = rvalue.sub(CHUNK_ID_PLACEHOLDER_PATTERN) { if chunk_passed dump_unique_id_hex(chunk.unique_id) else log.warn "${chunk_id} is not allowed in this plugin. Pass Chunk instead of metadata in extract_placeholders's 2nd argument" end } # Then, replace other ${chunk_key}s. if !@chunk_keys.empty? && metadata.variables hash = {'${tag}' => '${tag}'} # not to erase this wrongly @chunk_keys.each do |key| hash["${#{key}}"] = metadata.variables[key.to_sym] end rvalue = rvalue.gsub(CHUNK_KEY_PLACEHOLDER_PATTERN) do |matched| hash.fetch(matched) do log.warn "chunk key placeholder '#{matched[2..-2]}' not replaced. template:#{str}" '' end end end if rvalue =~ CHUNK_KEY_PLACEHOLDER_PATTERN log.warn "chunk key placeholder '#{$1}' not replaced. template:#{str}" end rvalue end end |
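The tag-placeholder branch above splits the tag on dots and builds a substitution table covering both positive and negative indices. A simplified standalone sketch of that part alone (the function name is an assumption; the real method also handles strftime formatting, ${chunk_id}, and custom chunk keys):

```ruby
# Pattern copied from the constants defined in this class.
CHUNK_TAG_PLACEHOLDER_PATTERN = /\$\{(tag(?:\[-?\d+\])?)\}/

# Expand ${tag}, ${tag[0]}, ..., ${tag[-1]} in a template, mimicking the
# tag-handling branch of #extract_placeholders.
def expand_tag_placeholders(template, tag)
  out = template.gsub('${tag}', tag)
  parts = tag.split('.')
  table = {}
  parts.each_with_index do |part, i|
    table["${tag[#{i}]}"] = part                # positive index
    table["${tag[#{i - parts.size}]}"] = part   # matching negative index
  end
  # Unknown indices are left as-is (the real code logs a warning instead).
  out.gsub(CHUNK_TAG_PLACEHOLDER_PATTERN) { |m| table.fetch(m, m) }
end

p expand_tag_placeholders('/logs/${tag[0]}/${tag[-1]}.log', 'app.web.access')
# => "/logs/app/access.log"
```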
#flush_thread_run(state) ⇒ Object
# File 'lib/fluent/plugin/output.rb', line 1510 def flush_thread_run(state) flush_thread_interval = @buffer_config.flush_thread_interval state.next_clock = Fluent::Clock.now + flush_thread_interval while !self.after_started? && !self.stopped? sleep 0.5 end log.debug "flush_thread actually running" state.mutex.lock begin # This thread don't use `thread_current_running?` because this thread should run in `before_shutdown` phase while @output_flush_threads_running current_clock = Fluent::Clock.now next_retry_time = nil @retry_mutex.synchronize do next_retry_time = @retry ? @retry.next_time : nil end if state.next_clock > current_clock interval = state.next_clock - current_clock elsif next_retry_time && next_retry_time > Time.now interval = next_retry_time.to_f - Time.now.to_f else state.mutex.unlock begin try_flush # next_flush_time uses flush_thread_interval or flush_thread_burst_interval (or retrying) interval = next_flush_time.to_f - Time.now.to_f # TODO: if secondary && delayed-commit, next_flush_time will be much longer than expected # because @retry still exists (#commit_write is not called yet in #try_flush) # @retry should be cleared if delayed commit is enabled? Or any other solution? state.next_clock = Fluent::Clock.now + interval ensure state.mutex.lock end end if @dequeued_chunks_mutex.synchronize{ !@dequeued_chunks.empty? && @dequeued_chunks.first.expired? } unless @output_flush_interrupted state.mutex.unlock begin try_rollback_write ensure state.mutex.lock end end end state.cond_var.wait(state.mutex, interval) if interval > 0 end rescue => e # normal errors are rescued by output plugins in #try_flush # so this rescue section is for critical & unrecoverable errors log.error "error on output thread", error: e log.error_backtrace raise ensure state.mutex.unlock end end |
#flush_thread_wakeup ⇒ Object
only for tests of output plugin
# File 'lib/fluent/plugin/output.rb', line 1433

def flush_thread_wakeup
  @output_flush_threads.each do |state|
    state.mutex.synchronize {
      if state.thread && state.thread.status
        state.next_clock = 0
        state.cond_var.signal
      end
    }
    Thread.pass
  end
end
#force_flush ⇒ Object
# File 'lib/fluent/plugin/output.rb', line 1401

def force_flush
  if @buffering
    @buffer.enqueue_all(true)
    submit_flush_all
  end
end
#format(tag, time, record) ⇒ Object
# File 'lib/fluent/plugin/output.rb', line 126

def format(tag, time, record)
  # standard msgpack_event_stream chunk will be used if this method is not implemented in plugin subclass
  raise NotImplementedError, "BUG: output plugins MUST implement this method"
end
#formatted_to_msgpack_binary ⇒ Object
Compatibility for existing plugins
# File 'lib/fluent/plugin/output.rb', line 138

def formatted_to_msgpack_binary
  formatted_to_msgpack_binary?
end
#formatted_to_msgpack_binary? ⇒ Boolean
# File 'lib/fluent/plugin/output.rb', line 131

def formatted_to_msgpack_binary?
  # To indicate custom format method (#format) returns msgpack binary or not.
  # If #format returns msgpack binary, override this method to return true.
  false
end
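When `formatted_to_msgpack_binary?` stays `false`, a plugin's `#format` typically returns one serialized line per event. The following standalone method (`format_event` is a hypothetical name, not fluentd API) only mimics that contract to show the expected shape of a non-msgpack custom format:

```ruby
require 'json'

# Sketch of a typical non-msgpack #format result: tag, time, and the
# JSON-serialized record, tab-separated, one line per event.
def format_event(tag, time, record)
  "#{tag}\t#{time}\t#{JSON.generate(record)}\n"
end
```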
#generate_format_proc ⇒ Object
# File 'lib/fluent/plugin/output.rb', line 1021

def generate_format_proc
  if @buffer && @buffer.compress == :gzip
    @time_as_integer ? FORMAT_COMPRESSED_MSGPACK_STREAM_TIME_INT : FORMAT_COMPRESSED_MSGPACK_STREAM
  else
    @time_as_integer ? FORMAT_MSGPACK_STREAM_TIME_INT : FORMAT_MSGPACK_STREAM
  end
end
#get_placeholders_keys(str) ⇒ Object
# File 'lib/fluent/plugin/output.rb', line 793

def get_placeholders_keys(str)
  str.scan(CHUNK_KEY_PLACEHOLDER_PATTERN).map(&:first).reject{|s| (s == "tag") || (s == 'chunk_id') }.sort
end
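The effect of this scan can be demonstrated standalone. This sketch reproduces `CHUNK_KEY_PLACEHOLDER_PATTERN` locally (so it runs without fluentd) and shows which placeholder names survive the filter: `tag` and `chunk_id` are excluded because they are handled by dedicated substitution paths.

```ruby
# Local copy of CHUNK_KEY_PLACEHOLDER_PATTERN for a self-contained demo.
KEY_PLACEHOLDER = /\$\{([-_.@$a-zA-Z0-9]+)\}/

def chunk_key_placeholders(str)
  # Keep only user-defined chunk keys, sorted for stable output
  str.scan(KEY_PLACEHOLDER).map(&:first).reject { |s| s == 'tag' || s == 'chunk_id' }.sort
end
```

For example, `chunk_key_placeholders('/data/${tag}/${key2}/${key1}-${chunk_id}.log')` yields `['key1', 'key2']`.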
#get_placeholders_tag(str) ⇒ Object
-1 means whole tag
# File 'lib/fluent/plugin/output.rb', line 780

def get_placeholders_tag(str)
  # [["tag"],["tag[0]"]]
  parts = []
  str.scan(CHUNK_TAG_PLACEHOLDER_PATTERN).map(&:first).each do |ph|
    if ph == "tag"
      parts << -1
    elsif ph =~ /^tag\[(-?\d+)\]$/
      parts << $1.to_i
    end
  end
  parts.sort
end
#get_placeholders_time(str) ⇒ Object
Note that using a timekey larger than 1 day is not validated.
# File 'lib/fluent/plugin/output.rb', line 770

def get_placeholders_time(str)
  base_str = TIMESTAMP_CHECK_BASE_TIME.strftime(str)
  TIME_KEY_PLACEHOLDER_THRESHOLDS.each do |triple|
    sec = triple.first
    return triple if (TIMESTAMP_CHECK_BASE_TIME + sec).strftime(str) != base_str
  end
  nil
end
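The detection trick above deserves a worked example: format a fixed base time, then re-format it shifted by 1 second, 1 minute, 1 hour, and 1 day; the first shift that changes the output reveals the finest time unit the template uses. The sketch below re-creates the documented algorithm with local copies of the constants so it runs standalone:

```ruby
require 'time'

# Local copies of TIMESTAMP_CHECK_BASE_TIME and TIME_KEY_PLACEHOLDER_THRESHOLDS.
BASE = Time.parse('2016-01-01 00:00:00 UTC')
THRESHOLDS = [[1, :second, '%S'], [60, :minute, '%M'], [3600, :hour, '%H'], [86400, :day, '%d']]

def time_placeholder_granularity(str)
  base_str = BASE.strftime(str)
  THRESHOLDS.each do |sec, title, _example|
    # If shifting by `sec` changes the formatted string, the template
    # depends on that unit (or a finer one).
    return title if (BASE + sec).strftime(str) != base_str
  end
  nil # no time placeholder finer than 1 day detected
end
```

So `'%Y%m%d'` is detected as `:day`, `'%H%M'` as `:minute`, and `'%Y'` as `nil` (no sub-day granularity).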
#handle_limit_reached(error) ⇒ Object
# File 'lib/fluent/plugin/output.rb', line 1355

def handle_limit_reached(error)
  if error
    records = @buffer.queued_records
    msg = "Hit limit for retries. dropping all chunks in the buffer queue."
    log.error msg, retry_times: @retry.steps, records: records, error: error
    log.error_backtrace error.backtrace
  end
  @buffer.clear_queue!
  log.debug "buffer queue cleared"
  @retry = nil
end
#handle_stream_simple(tag, es, enqueue: false) ⇒ Object
# File 'lib/fluent/plugin/output.rb', line 1076

def handle_stream_simple(tag, es, enqueue: false)
  format_proc = nil
  meta = metadata((@chunk_key_tag ? tag : nil), nil, nil)
  records = es.size
  if @custom_format
    records = 0
    data = []
    es.each(unpacker: Fluent::MessagePackFactory.thread_local_msgpack_unpacker) do |time, record|
      res = format(tag, time, record)
      if res
        data << res
        records += 1
      end
    end
  else
    format_proc = generate_format_proc
    data = es
  end

  write_guard do
    @buffer.write({meta => data}, format: format_proc, enqueue: enqueue)
  end

  @emit_records_metrics.add(es.size)
  @emit_size_metrics.add(es.to_msgpack_stream.bytesize) if @enable_size_metrics
  true
end
#handle_stream_with_custom_format(tag, es, enqueue: false) ⇒ Object
metadata_and_data is a Hash of:
(standard format) metadata => event stream
(custom format) metadata => array of formatted event
For standard format, formatting should be done for the whole event stream, but
the "whole event stream" here may be a split of `es` when it is bigger than chunk_limit_size;
`@buffer.write` will do this splitting.
For custom format, formatting is done here. Custom formatting always requires
iterating the event stream, and for performance it should be done only once,
even if the total event stream size is bigger than chunk_limit_size.
# File 'lib/fluent/plugin/output.rb', line 1038

def handle_stream_with_custom_format(tag, es, enqueue: false)
  meta_and_data = {}
  records = 0
  es.each(unpacker: Fluent::MessagePackFactory.thread_local_msgpack_unpacker) do |time, record|
    meta = metadata(tag, time, record)
    meta_and_data[meta] ||= []
    res = format(tag, time, record)
    if res
      meta_and_data[meta] << res
      records += 1
    end
  end
  write_guard do
    @buffer.write(meta_and_data, enqueue: enqueue)
  end
  @emit_records_metrics.add(es.size)
  @emit_size_metrics.add(es.to_msgpack_stream.bytesize) if @enable_size_metrics
  true
end
#handle_stream_with_standard_format(tag, es, enqueue: false) ⇒ Object
# File 'lib/fluent/plugin/output.rb', line 1058

def handle_stream_with_standard_format(tag, es, enqueue: false)
  format_proc = generate_format_proc
  meta_and_data = {}
  records = 0
  es.each(unpacker: Fluent::MessagePackFactory.thread_local_msgpack_unpacker) do |time, record|
    meta = metadata(tag, time, record)
    meta_and_data[meta] ||= MultiEventStream.new
    meta_and_data[meta].add(time, record)
    records += 1
  end
  write_guard do
    @buffer.write(meta_and_data, format: format_proc, enqueue: enqueue)
  end
  @emit_records_metrics.add(es.size)
  @emit_size_metrics.add(es.to_msgpack_stream.bytesize) if @enable_size_metrics
  true
end
#implement?(feature) ⇒ Boolean
# File 'lib/fluent/plugin/output.rb', line 651

def implement?(feature)
  methods_of_plugin = self.class.instance_methods(false)
  case feature
  when :synchronous    then methods_of_plugin.include?(:process) || support_in_v12_style?(:synchronous)
  when :buffered       then methods_of_plugin.include?(:write) || support_in_v12_style?(:buffered)
  when :delayed_commit then methods_of_plugin.include?(:try_write)
  when :custom_format  then methods_of_plugin.include?(:format) || support_in_v12_style?(:custom_format)
  else
    raise ArgumentError, "Unknown feature for output plugin: #{feature}"
  end
end
#interrupt_flushes ⇒ Object
only for tests of output plugin
# File 'lib/fluent/plugin/output.rb', line 1416

def interrupt_flushes
  @output_flush_interrupted = true
end
#keep_buffer_config_compat ⇒ Object
# File 'lib/fluent/plugin/output.rb', line 441

def keep_buffer_config_compat
  # Need this to call `@buffer_config.disable_chunk_backup` just as before,
  # since some plugins may use this option in this way.
  @buffer_config[:disable_chunk_backup] = @buffer.disable_chunk_backup
end
#log_retry_error(error, chunk_id_hex, using_secondary) ⇒ Object
# File 'lib/fluent/plugin/output.rb', line 1344

def log_retry_error(error, chunk_id_hex, using_secondary)
  return unless error
  if using_secondary
    msg = "failed to flush the buffer with secondary output."
  else
    msg = "failed to flush the buffer."
  end
  log.warn(msg, retry_times: @retry.steps, next_retry_time: @retry.next_time.round, chunk: chunk_id_hex, error: error)
  log.warn_backtrace(error.backtrace)
end
#metadata(tag, time, record) ⇒ Object
TODO: optimize this code
# File 'lib/fluent/plugin/output.rb', line 909

def metadata(tag, time, record)
  # these arguments are ordered in output plugin's rule
  # Metadata's argument order is different from this one (timekey, tag, variables)

  raise ArgumentError, "tag must be a String: #{tag.class}" unless tag.nil? || tag.is_a?(String)
  raise ArgumentError, "time must be a Fluent::EventTime (or Integer): #{time.class}" unless time.nil? || time.is_a?(Fluent::EventTime) || time.is_a?(Integer)
  raise ArgumentError, "record must be a Hash: #{record.class}" unless record.nil? || record.is_a?(Hash)

  if @chunk_keys.nil? && @chunk_key_time.nil? && @chunk_key_tag.nil?
    # for tests
    return Struct.new(:timekey, :tag, :variables).new
  end

  # timekey is int from epoch, and `timekey - timekey % 60` is assumed to match with 0s of each minute.
  # it's wrong if timezone is configured as one which supports leap second, but it's very rare and
  # we can ignore it (especially in production systems).
  if @chunk_keys.empty?
    if !@chunk_key_time && !@chunk_key_tag
      @buffer.metadata()
    elsif @chunk_key_time && @chunk_key_tag
      timekey = calculate_timekey(time)
      @buffer.metadata(timekey: timekey, tag: tag)
    elsif @chunk_key_time
      timekey = calculate_timekey(time)
      @buffer.metadata(timekey: timekey)
    else
      @buffer.metadata(tag: tag)
    end
  else
    timekey = if @chunk_key_time
                calculate_timekey(time)
              else
                nil
              end
    pairs = Hash[@chunk_key_accessors.map { |k, a| [k, a.call(record)] }]
    @buffer.metadata(timekey: timekey, tag: (@chunk_key_tag ? tag : nil), variables: pairs)
  end
end
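The grouping behavior this method drives can be illustrated standalone. In the toy model below (illustrative names, not fluentd's Buffer API), the timekey is the epoch time floored to the configured interval, and events sharing the same `(timekey, tag)` pair land in the same chunk:

```ruby
# Toy metadata: value-equality via Struct, like fluentd's buffer metadata.
Meta = Struct.new(:timekey, :tag, :variables)

# timekey = event time floored to the timekey interval
def calc_timekey(epoch, timekey_interval)
  epoch - (epoch % timekey_interval)
end

events = [
  [1451606400, 'app.web', { 'msg' => 'a' }],
  [1451606430, 'app.web', { 'msg' => 'b' }],  # same 60s bucket and tag as above
  [1451606460, 'app.db',  { 'msg' => 'c' }],  # next bucket, different tag
]

GROUPS = Hash.new { |h, k| h[k] = [] }
events.each do |time, tag, record|
  GROUPS[Meta.new(calc_timekey(time, 60), tag, nil)] << record
end
```

With a 60-second timekey, the first two events share one chunk and the third gets its own, so `GROUPS.size` is 2.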
#multi_workers_ready? ⇒ Boolean
# File 'lib/fluent/plugin/output.rb', line 154

def multi_workers_ready?
  false
end
#next_flush_time ⇒ Object
# File 'lib/fluent/plugin/output.rb', line 1176

def next_flush_time
  if @buffer.queued?
    @retry_mutex.synchronize do
      @retry ? @retry.next_time : Time.now + @buffer_config.flush_thread_burst_interval
    end
  else
    Time.now + @buffer_config.flush_thread_interval
  end
end
#num_errors ⇒ Object
# File 'lib/fluent/plugin/output.rb', line 174

def num_errors
  @num_errors_metrics.get
end
#placeholder_validate!(name, str) ⇒ Object
# File 'lib/fluent/plugin/output.rb', line 663

def placeholder_validate!(name, str)
  placeholder_validators(name, str).each do |v|
    v.validate!
  end
end
#placeholder_validators(name, str, time_key = (@chunk_key_time && @buffer_config.timekey), tag_key = @chunk_key_tag, chunk_keys = @chunk_keys) ⇒ Object
# File 'lib/fluent/plugin/output.rb', line 669

def placeholder_validators(name, str, time_key = (@chunk_key_time && @buffer_config.timekey), tag_key = @chunk_key_tag, chunk_keys = @chunk_keys)
  validators = []

  sec, title, example = get_placeholders_time(str)
  if sec || time_key
    validators << PlaceholderValidator.new(name, str, :time, {sec: sec, title: title, example: example, timekey: time_key})
  end

  parts = get_placeholders_tag(str)
  if tag_key || !parts.empty?
    validators << PlaceholderValidator.new(name, str, :tag, {parts: parts, tagkey: tag_key})
  end

  keys = get_placeholders_keys(str)
  if chunk_keys && !chunk_keys.empty? || !keys.empty?
    validators << PlaceholderValidator.new(name, str, :keys, {keys: keys, chunkkeys: chunk_keys})
  end

  validators
end
#prefer_buffered_processing ⇒ Object
# File 'lib/fluent/plugin/output.rb', line 142

def prefer_buffered_processing
  # override this method to return false only when all of these are true:
  # * plugin has both implementation for buffered and non-buffered methods
  # * plugin is expected to work as non-buffered plugin if no `<buffer>` sections specified
  true
end
#prefer_delayed_commit ⇒ Object
# File 'lib/fluent/plugin/output.rb', line 149

def prefer_delayed_commit
  # override this method to decide which is used of `write` or `try_write` if both are implemented
  true
end
#process(tag, es) ⇒ Object
# File 'lib/fluent/plugin/output.rb', line 114

def process(tag, es)
  raise NotImplementedError, "BUG: output plugins MUST implement this method"
end
#retry_state(randomize) ⇒ Object
# File 'lib/fluent/plugin/output.rb', line 1367

def retry_state(randomize)
  if @secondary
    retry_state_create(
      :output_retries, @buffer_config.retry_type, @buffer_config.retry_wait, @buffer_config.retry_timeout,
      forever: @buffer_config.retry_forever, max_steps: @buffer_config.retry_max_times,
      backoff_base: @buffer_config.retry_exponential_backoff_base,
      max_interval: @buffer_config.retry_max_interval,
      secondary: true, secondary_threshold: @buffer_config.retry_secondary_threshold,
      randomize: randomize
    )
  else
    retry_state_create(
      :output_retries, @buffer_config.retry_type, @buffer_config.retry_wait, @buffer_config.retry_timeout,
      forever: @buffer_config.retry_forever, max_steps: @buffer_config.retry_max_times,
      backoff_base: @buffer_config.retry_exponential_backoff_base,
      max_interval: @buffer_config.retry_max_interval,
      randomize: randomize
    )
  end
end
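The backoff parameters passed here (`retry_wait`, `retry_exponential_backoff_base`, `retry_max_interval`) imply the familiar capped exponential schedule. The sketch below shows the basic interval computation under those config names; it is an assumption for illustration, since the real `RetryState` also applies randomization and other retry types:

```ruby
# Capped exponential backoff: wait * base^step, never exceeding max_interval.
def backoff_wait(step, wait: 1.0, base: 2.0, max_interval: 60.0)
  [wait * (base ** step), max_interval].min
end
```

With the defaults above, the first waits are 1s, 2s, 4s, 8s, ... until the 60s cap is hit.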
#rollback_count ⇒ Object
# File 'lib/fluent/plugin/output.rb', line 194

def rollback_count
  @rollback_count_metrics.get
end
#rollback_write(chunk_id, update_retry: true) ⇒ Object
The update_retry parameter prevents a busy loop caused by async writes. This parameter will be removed once retry_state management between threads is redesigned.
# File 'lib/fluent/plugin/output.rb', line 1126

def rollback_write(chunk_id, update_retry: true)
  # This API is to rollback chunks explicitly from plugins.
  # 3rd party plugins can depend it on automatic rollback of #try_rollback_write
  @dequeued_chunks_mutex.synchronize do
    @dequeued_chunks.delete_if{ |info| info.chunk_id == chunk_id }
  end
  # returns true if chunk was rollbacked as expected
  #         false if chunk was already flushed and couldn't be rollbacked unexpectedly
  # in many cases, false can be just ignored
  if @buffer.takeback_chunk(chunk_id)
    @rollback_count_metrics.inc
    if update_retry
      primary = @as_secondary ? @primary_instance : self
      primary.update_retry_state(chunk_id, @as_secondary)
    end
    true
  else
    false
  end
end
#shutdown ⇒ Object
# File 'lib/fluent/plugin/output.rb', line 556

def shutdown
  @secondary.shutdown if @secondary
  @buffer.shutdown if @buffering && @buffer

  super
end
#start ⇒ Object
# File 'lib/fluent/plugin/output.rb', line 447

def start
  super

  if @buffering.nil?
    @buffering = prefer_buffered_processing
    if !@buffering && @buffer
      @buffer.terminate # it's not started, so terminate will be enough
      # At here, this plugin works as non-buffered plugin.
      # Un-assign @buffer not to show buffering metrics (e.g., in_monitor_agent)
      @buffer = nil
    end
  end

  if @buffering
    m = method(:emit_buffered)
    singleton_class.module_eval do
      define_method(:emit_events, m)
    end

    @custom_format = implement?(:custom_format)
    @enable_msgpack_streamer = @custom_format ? formatted_to_msgpack_binary : true
    @delayed_commit = if implement?(:buffered) && implement?(:delayed_commit)
                        prefer_delayed_commit
                      else
                        implement?(:delayed_commit)
                      end
    @delayed_commit_timeout = @buffer_config.delayed_commit_timeout
  else # !@buffering
    m = method(:emit_sync)
    singleton_class.module_eval do
      define_method(:emit_events, m)
    end
  end

  if @buffering && !@as_secondary
    @retry = nil
    @retry_mutex = Mutex.new

    @buffer.start

    @output_enqueue_thread = nil
    @output_enqueue_thread_running = true

    @output_flush_threads = []
    @output_flush_threads_mutex = Mutex.new
    @output_flush_threads_running = true

    # mainly for test: detect enqueue works as code below:
    #   @output.interrupt_flushes
    #   # emits
    #   @output.enqueue_thread_wait
    @output_flush_interrupted = false
    @output_enqueue_thread_mutex = Mutex.new
    @output_enqueue_thread_waiting = false

    @dequeued_chunks = []
    @dequeued_chunks_mutex = Mutex.new

    @output_flush_thread_current_position = 0
    @buffer_config.flush_thread_count.times do |i|
      thread_title = "flush_thread_#{i}".to_sym
      thread_state = FlushThreadState.new(nil, nil, Mutex.new, ConditionVariable.new)
      thread = thread_create(thread_title) do
        flush_thread_run(thread_state)
      end
      thread_state.thread = thread
      @output_flush_threads_mutex.synchronize do
        @output_flush_threads << thread_state
      end
    end

    if !@under_plugin_development && (@flush_mode == :interval || @chunk_key_time)
      @output_enqueue_thread = thread_create(:enqueue_thread, &method(:enqueue_thread_run))
    end
  end
  @secondary.start if @secondary
end
#statistics ⇒ Object
# File 'lib/fluent/plugin/output.rb', line 1579

def statistics
  stats = {
    'emit_records' => @emit_records_metrics.get,
    'emit_size' => @emit_size_metrics.get,
    # Respect original name
    # https://github.com/fluent/fluentd/blob/45c7b75ba77763eaf87136864d4942c4e0c5bfcd/lib/fluent/plugin/in_monitor_agent.rb#L284
    'retry_count' => @num_errors_metrics.get,
    'emit_count' => @emit_count_metrics.get,
    'write_count' => @write_count_metrics.get,
    'rollback_count' => @rollback_count_metrics.get,
    'slow_flush_count' => @slow_flush_count_metrics.get,
    'flush_time_count' => @flush_time_count_metrics.get,
  }

  if @buffer && @buffer.respond_to?(:statistics)
    (@buffer.statistics['buffer'] || {}).each do |k, v|
      stats[BUFFER_STATS_KEYS[k]] = v
    end
  end

  { 'output' => stats }
end
#stop ⇒ Object
# File 'lib/fluent/plugin/output.rb', line 530

def stop
  @secondary.stop if @secondary
  @buffer.stop if @buffering && @buffer

  super
end
#submit_flush_all ⇒ Object
# File 'lib/fluent/plugin/output.rb', line 1408

def submit_flush_all
  while !@retry && @buffer.queued?
    submit_flush_once
    sleep @buffer_config.flush_thread_burst_interval
  end
end
#submit_flush_once ⇒ Object
# File 'lib/fluent/plugin/output.rb', line 1386

def submit_flush_once
  # Without locks: it is rough but enough to select "next" writer selection
  @output_flush_thread_current_position = (@output_flush_thread_current_position + 1) % @buffer_config.flush_thread_count
  state = @output_flush_threads[@output_flush_thread_current_position]
  state.mutex.synchronize {
    if state.thread && state.thread.status # "run"/"sleep"/"aborting" or false(successfully stop) or nil(killed by exception)
      state.next_clock = 0
      state.cond_var.signal
    else
      log.warn "thread is already dead"
    end
  }
  Thread.pass
end
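The "next writer" selection above is a plain round-robin over the flush threads. Isolated as a function (a hypothetical name for illustration), it is just modular arithmetic; no lock is taken because an occasional race merely skews which thread gets woken, which is harmless:

```ruby
# Round-robin position over flush_thread_count writer threads.
def next_flush_thread_position(current, thread_count)
  (current + 1) % thread_count
end
```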
#support_in_v12_style?(feature) ⇒ Boolean
# File 'lib/fluent/plugin/output.rb', line 639

def support_in_v12_style?(feature)
  # for plugins written in v0.12 styles
  case feature
  when :synchronous    then false
  when :buffered       then false
  when :delayed_commit then false
  when :custom_format  then false
  else
    raise ArgumentError, "unknown feature: #{feature}"
  end
end
#synchronize_in_threads ⇒ Object
# File 'lib/fluent/plugin/output.rb', line 630

def synchronize_in_threads
  need_thread_lock = actual_flush_thread_count > 1
  if need_thread_lock
    @flush_thread_mutex.synchronize { yield }
  else
    yield
  end
end
#synchronize_path(path) ⇒ Object
Ensures that `path` (filename or filepath) is processable only by the current thread in the current process. For multiple workers, the lock is shared if `path` is the same value. For multiple threads, the lock is shared by all threads in the same process.
# File 'lib/fluent/plugin/output.rb', line 613

def synchronize_path(path)
  synchronize_path_in_workers(path) do
    synchronize_in_threads do
      yield
    end
  end
end
#synchronize_path_in_workers(path) ⇒ Object
# File 'lib/fluent/plugin/output.rb', line 621

def synchronize_path_in_workers(path)
  need_worker_lock = system_config.workers > 1
  if need_worker_lock
    acquire_worker_lock(path) { yield }
  else
    yield
  end
end
#terminate ⇒ Object
# File 'lib/fluent/plugin/output.rb', line 596

def terminate
  @buffer.terminate if @buffering && @buffer
  @secondary.terminate if @secondary

  super
end
#try_flush ⇒ Object
# File 'lib/fluent/plugin/output.rb', line 1188

def try_flush
  chunk = @buffer.dequeue_chunk
  return unless chunk

  log.on_trace { log.trace "trying flush for a chunk", chunk: dump_unique_id_hex(chunk.unique_id) }

  output = self
  using_secondary = false
  if @retry_mutex.synchronize{ @retry && @retry.secondary? }
    output = @secondary
    using_secondary = true
  end

  if @enable_msgpack_streamer
    chunk.extend ChunkMessagePackEventStreamer
  end

  begin
    chunk_write_start = Fluent::Clock.now

    if output.delayed_commit
      log.trace "executing delayed write and commit", chunk: dump_unique_id_hex(chunk.unique_id)
      @write_count_metrics.inc
      @dequeued_chunks_mutex.synchronize do
        # delayed_commit_timeout for secondary is configured in <buffer> of primary (<secondary> don't get <buffer>)
        @dequeued_chunks << DequeuedChunkInfo.new(chunk.unique_id, Time.now, self.delayed_commit_timeout)
      end

      output.try_write(chunk)
      check_slow_flush(chunk_write_start)
    else # output plugin without delayed purge
      chunk_id = chunk.unique_id
      dump_chunk_id = dump_unique_id_hex(chunk_id)
      log.trace "adding write count", instance: self.object_id
      @write_count_metrics.inc
      log.trace "executing sync write", chunk: dump_chunk_id

      output.write(chunk)
      check_slow_flush(chunk_write_start)

      log.trace "write operation done, committing", chunk: dump_chunk_id
      commit_write(chunk_id, delayed: false, secondary: using_secondary)
      log.trace "done to commit a chunk", chunk: dump_chunk_id
    end
  rescue *UNRECOVERABLE_ERRORS => e
    if @secondary
      if using_secondary
        log.warn "got unrecoverable error in secondary.", error: e
        log.warn_backtrace
        backup_chunk(chunk, using_secondary, output.delayed_commit)
      else
        if (self.class == @secondary.class)
          log.warn "got unrecoverable error in primary and secondary type is same as primary. Skip secondary", error: e
          log.warn_backtrace
          backup_chunk(chunk, using_secondary, output.delayed_commit)
        else
          # Call secondary output directly without retry update.
          # In this case, delayed commit causes inconsistent state in dequeued chunks so async output in secondary is not allowed for now.
          if @secondary.delayed_commit
            log.warn "got unrecoverable error in primary and secondary is async output. Skip secondary for backup", error: e
            log.warn_backtrace
            backup_chunk(chunk, using_secondary, output.delayed_commit)
          else
            log.warn "got unrecoverable error in primary. Skip retry and flush chunk to secondary", error: e
            log.warn_backtrace
            begin
              @secondary.write(chunk)
              commit_write(chunk_id, delayed: output.delayed_commit, secondary: true)
            rescue => e
              log.warn "got an error in secondary for unrecoverable error", error: e
              log.warn_backtrace
              backup_chunk(chunk, using_secondary, output.delayed_commit)
            end
          end
        end
      end
    else
      log.warn "got unrecoverable error in primary and no secondary", error: e
      log.warn_backtrace
      backup_chunk(chunk, using_secondary, output.delayed_commit)
    end
  rescue => e
    log.debug "taking back chunk for errors.", chunk: dump_unique_id_hex(chunk.unique_id)
    if output.delayed_commit
      @dequeued_chunks_mutex.synchronize do
        @dequeued_chunks.delete_if{|d| d.chunk_id == chunk.unique_id }
      end
    end

    if @buffer.takeback_chunk(chunk.unique_id)
      @rollback_count_metrics.inc
    end

    update_retry_state(chunk.unique_id, using_secondary, e)

    raise if @under_plugin_development && !@retry_for_error_chunk
  end
end
#try_rollback_all ⇒ Object
# File 'lib/fluent/plugin/output.rb', line 1161

def try_rollback_all
  return unless @dequeued_chunks
  @dequeued_chunks_mutex.synchronize do
    until @dequeued_chunks.empty?
      info = @dequeued_chunks.shift
      if @buffer.takeback_chunk(info.chunk_id)
        @rollback_count_metrics.inc
        log.info "delayed commit for buffer chunks was cancelled in shutdown", chunk_id: dump_unique_id_hex(info.chunk_id)
        primary = @as_secondary ? @primary_instance : self
        primary.update_retry_state(info.chunk_id, @as_secondary)
      end
    end
  end
end
#try_rollback_write ⇒ Object
# File 'lib/fluent/plugin/output.rb', line 1147

def try_rollback_write
  @dequeued_chunks_mutex.synchronize do
    while @dequeued_chunks.first && @dequeued_chunks.first.expired?
      info = @dequeued_chunks.shift
      if @buffer.takeback_chunk(info.chunk_id)
        @rollback_count_metrics.inc
        log.warn "failed to flush the buffer chunk, timeout to commit.", chunk_id: dump_unique_id_hex(info.chunk_id), flushed_at: info.time
        primary = @as_secondary ? @primary_instance : self
        primary.update_retry_state(info.chunk_id, @as_secondary)
      end
    end
  end
end
#try_write(chunk) ⇒ Object
# File 'lib/fluent/plugin/output.rb', line 122

def try_write(chunk)
  raise NotImplementedError, "BUG: output plugins MUST implement this method"
end
#update_retry_state(chunk_id, using_secondary, error = nil) ⇒ Object
# File 'lib/fluent/plugin/output.rb', line 1309

def update_retry_state(chunk_id, using_secondary, error = nil)
  @retry_mutex.synchronize do
    @num_errors_metrics.inc
    chunk_id_hex = dump_unique_id_hex(chunk_id)

    unless @retry
      @retry = retry_state(@buffer_config.retry_randomize)

      if @retry.limit?
        handle_limit_reached(error)
      elsif error
        log_retry_error(error, chunk_id_hex, using_secondary)
      end

      return
    end

    # @retry exists

    # Ensure that the current time is greater than or equal to @retry.next_time to avoid the situation when
    # @retry.step is called almost as many times as the number of flush threads in a short time.
    if Time.now >= @retry.next_time
      @retry.step
    else
      @retry.recalc_next_time # to prevent all flush threads from retrying at the same time
    end

    if @retry.limit?
      handle_limit_reached(error)
    elsif error
      log_retry_error(error, chunk_id_hex, using_secondary)
    end
  end
end
#write(chunk) ⇒ Object
# File 'lib/fluent/plugin/output.rb', line 118

def write(chunk)
  raise NotImplementedError, "BUG: output plugins MUST implement this method"
end
#write_count ⇒ Object
# File 'lib/fluent/plugin/output.rb', line 190

def write_count
  @write_count_metrics.get
end
#write_guard(&block) ⇒ Object
# File 'lib/fluent/plugin/output.rb', line 975

def write_guard(&block)
  begin
    block.call
  rescue Fluent::Plugin::Buffer::BufferOverflowError
    log.warn "failed to write data into buffer by buffer overflow", action: @buffer_config.overflow_action
    case @buffer_config.overflow_action
    when :throw_exception
      raise
    when :block
      log.debug "buffer.write is now blocking"
      until @buffer.storable?
        if self.stopped?
          log.error "breaking block behavior to shutdown Fluentd" # to break infinite loop to exit Fluentd process
          raise
        end
        log.trace "sleeping until buffer can store more data"
        sleep 1
      end
      log.debug "retrying buffer.write after blocked operation"
      retry
    when :drop_oldest_chunk
      begin
        oldest = @buffer.dequeue_chunk
        if oldest
          log.warn "dropping oldest chunk to make space after buffer overflow", chunk_id: dump_unique_id_hex(oldest.unique_id)
          @buffer.purge_chunk(oldest.unique_id)
        else
          log.error "no queued chunks to be dropped for drop_oldest_chunk"
        end
      rescue
        # ignore any errors
      end
      raise unless @buffer.storable?
      retry
    else
      raise "BUG: unknown overflow_action '#{@buffer_config.overflow_action}'"
    end
  end
end
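The `:drop_oldest_chunk` overflow action can be modeled in isolation. The toy buffer below uses illustrative names (it is not fluentd's Buffer API): when a write finds the queue full, it evicts chunks from the head until space is available, mirroring the dequeue-and-purge loop above:

```ruby
# Toy model of the :drop_oldest_chunk overflow action.
class ToyBuffer
  attr_reader :queue

  def initialize(limit)
    @limit = limit   # max number of queued chunks
    @queue = []
  end

  def storable?
    @queue.size < @limit
  end

  def write(chunk)
    @queue.shift until storable?   # drop oldest chunks to make space
    @queue << chunk
  end
end
```

Writing three chunks into a two-chunk buffer drops the first: the queue ends up holding only the two newest chunks.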