Class: Fluent::Plugin::Output
- Defined in:
- lib/fluent/plugin/output.rb
Direct Known Subclasses
Compat::BufferedOutput, Compat::ObjectBufferedOutput, Compat::Output, Compat::TimeSlicedOutput, ExecFilterOutput, ExecOutput, FileOutput, ForwardOutput, HTTPOutput, NullOutput, RelabelOutput, SecondaryFileOutput, StdoutOutput
Defined Under Namespace
Classes: DequeuedChunkInfo, FlushThreadState, PlaceholderValidator
Constant Summary collapse
- CHUNK_KEY_PATTERN =
/^[-_.@a-zA-Z0-9]+$/
- CHUNK_KEY_PLACEHOLDER_PATTERN =
/\$\{([-_.@$a-zA-Z0-9]+)\}/
- CHUNK_TAG_PLACEHOLDER_PATTERN =
/\$\{(tag(?:\[-?\d+\])?)\}/
- CHUNK_ID_PLACEHOLDER_PATTERN =
/\$\{chunk_id\}/
- CHUNKING_FIELD_WARN_NUM =
4
- TIME_KEY_PLACEHOLDER_THRESHOLDS =
[ [1, :second, '%S'], [60, :minute, '%M'], [3600, :hour, '%H'], [86400, :day, '%d'], ]
- TIMESTAMP_CHECK_BASE_TIME =
Time.parse("2016-01-01 00:00:00 UTC")
- FORMAT_MSGPACK_STREAM =
->(e){ e.to_msgpack_stream(packer: Fluent::MessagePackFactory.thread_local_msgpack_packer) }
- FORMAT_COMPRESSED_MSGPACK_STREAM =
->(e){ e.to_compressed_msgpack_stream(packer: Fluent::MessagePackFactory.thread_local_msgpack_packer) }
- FORMAT_MSGPACK_STREAM_TIME_INT =
->(e){ e.to_msgpack_stream(time_int: true, packer: Fluent::MessagePackFactory.thread_local_msgpack_packer) }
- FORMAT_COMPRESSED_MSGPACK_STREAM_TIME_INT =
->(e){ e.to_compressed_msgpack_stream(time_int: true, packer: Fluent::MessagePackFactory.thread_local_msgpack_packer) }
- UNRECOVERABLE_ERRORS =
[Fluent::UnrecoverableError, TypeError, ArgumentError, NoMethodError, MessagePack::UnpackError, EncodingError]
- BUFFER_STATS_KEYS =
{}
Constants included from Configurable
Configurable::CONFIG_TYPE_REGISTRY
Instance Attribute Summary collapse
-
#as_secondary ⇒ Object
readonly
Returns the value of attribute as_secondary.
-
#buffer ⇒ Object
readonly
for tests.
-
#chunk_key_accessors ⇒ Object
readonly
for tests.
-
#chunk_key_tag ⇒ Object
readonly
for tests.
-
#chunk_key_time ⇒ Object
readonly
for tests.
-
#chunk_keys ⇒ Object
readonly
for tests.
-
#delayed_commit ⇒ Object
readonly
Returns the value of attribute delayed_commit.
-
#delayed_commit_timeout ⇒ Object
readonly
Returns the value of attribute delayed_commit_timeout.
-
#dequeued_chunks ⇒ Object
Returns the value of attribute dequeued_chunks.
-
#dequeued_chunks_mutex ⇒ Object
Returns the value of attribute dequeued_chunks_mutex.
-
#output_enqueue_thread_waiting ⇒ Object
Returns the value of attribute output_enqueue_thread_waiting.
-
#retry ⇒ Object
readonly
for tests.
-
#retry_for_error_chunk ⇒ Object
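Source comment: “output_enqueue_thread_waiting: for test of output.rb itself.” Note that this comment names #output_enqueue_thread_waiting, so it appears to describe that attribute rather than #retry_for_error_chunk.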
-
#secondary ⇒ Object
readonly
for tests.
-
#timekey_zone ⇒ Object
readonly
Returns the value of attribute timekey_zone.
Attributes included from Fluent::PluginLoggerMixin
Attributes inherited from Base
Instance Method Summary collapse
- #acts_as_secondary(primary) ⇒ Object
- #after_shutdown ⇒ Object
- #after_start ⇒ Object
- #backup_chunk(chunk, using_secondary, delayed_commit) ⇒ Object
- #before_shutdown ⇒ Object
- #calculate_timekey(time) ⇒ Object
- #check_slow_flush(start) ⇒ Object
- #chunk_for_test(tag, time, record) ⇒ Object
- #close ⇒ Object
- #commit_write(chunk_id, delayed: @delayed_commit, secondary: false) ⇒ Object
- #configure(conf) ⇒ Object
- #emit_buffered(tag, es) ⇒ Object
- #emit_count ⇒ Object
- #emit_events(tag, es) ⇒ Object
- #emit_records ⇒ Object
- #emit_size ⇒ Object
- #emit_sync(tag, es) ⇒ Object
- #enqueue_thread_run ⇒ Object
-
#enqueue_thread_wait ⇒ Object
only for tests of output plugin.
- #execute_chunking(tag, es, enqueue: false) ⇒ Object
-
#extract_placeholders(str, chunk) ⇒ Object
TODO: optimize this code.
- #flush_thread_run(state) ⇒ Object
-
#flush_thread_wakeup ⇒ Object
only for tests of output plugin.
- #force_flush ⇒ Object
- #format(tag, time, record) ⇒ Object
-
#formatted_to_msgpack_binary ⇒ Object
Compatibility for existing plugins.
- #formatted_to_msgpack_binary? ⇒ Boolean
- #generate_format_proc ⇒ Object
- #get_placeholders_keys(str) ⇒ Object
-
#get_placeholders_tag(str) ⇒ Object
-1 means whole tag.
-
#get_placeholders_time(str) ⇒ Object
Using a timekey larger than 1 day is not validated.
- #handle_limit_reached(error) ⇒ Object
- #handle_stream_simple(tag, es, enqueue: false) ⇒ Object
-
#handle_stream_with_custom_format(tag, es, enqueue: false) ⇒ Object
metadata_and_data is a Hash of: (standard format) metadata => event stream; (custom format) metadata => array of formatted events. For the standard format, formatting should be done for the whole event stream, but the “whole event stream” may be a split of “es” here when it is bigger than chunk_limit_size.
- #handle_stream_with_standard_format(tag, es, enqueue: false) ⇒ Object
- #implement?(feature) ⇒ Boolean
-
#initialize ⇒ Output
constructor
A new instance of Output.
-
#interrupt_flushes ⇒ Object
only for tests of output plugin.
- #log_retry_error(error, chunk_id_hex, using_secondary) ⇒ Object
-
#metadata(tag, time, record) ⇒ Object
TODO: optimize this code.
- #multi_workers_ready? ⇒ Boolean
- #next_flush_time ⇒ Object
- #num_errors ⇒ Object
- #placeholder_validate!(name, str) ⇒ Object
- #placeholder_validators(name, str, time_key = (@chunk_key_time && @buffer_config.timekey), tag_key = @chunk_key_tag, chunk_keys = @chunk_keys) ⇒ Object
- #prefer_buffered_processing ⇒ Object
- #prefer_delayed_commit ⇒ Object
- #process(tag, es) ⇒ Object
- #retry_state(randomize) ⇒ Object
- #rollback_count ⇒ Object
-
#rollback_write(chunk_id, update_retry: true) ⇒ Object
The update_retry parameter is for preventing a busy loop caused by async writes. We will remove this parameter by redesigning retry_state management between threads.
- #shutdown ⇒ Object
- #start ⇒ Object
- #statistics ⇒ Object
- #stop ⇒ Object
- #submit_flush_all ⇒ Object
- #submit_flush_once ⇒ Object
- #support_in_v12_style?(feature) ⇒ Boolean
- #terminate ⇒ Object
- #try_flush ⇒ Object
- #try_rollback_all ⇒ Object
- #try_rollback_write ⇒ Object
- #try_write(chunk) ⇒ Object
- #update_retry_state(chunk_id, using_secondary, error = nil) ⇒ Object
- #write(chunk) ⇒ Object
- #write_count ⇒ Object
- #write_guard(&block) ⇒ Object
Methods included from UniqueId::Mixin
#dump_unique_id_hex, #generate_unique_id
Methods included from Fluent::PluginHelper::Mixin
Methods included from Fluent::PluginLoggerMixin
Methods included from Fluent::PluginId
#plugin_id, #plugin_id_configured?, #plugin_id_for_test?, #plugin_root_dir
Methods inherited from Base
#acquire_worker_lock, #after_shutdown?, #after_started?, #before_shutdown?, #called_in_test?, #closed?, #configured?, #context_router, #context_router=, #fluentd_worker_id, #get_lock_path, #has_router?, #inspect, #plugin_root_dir, #reloadable_plugin?, #shutdown?, #started?, #stopped?, #string_safe_encoding, #terminated?
Methods included from SystemConfig::Mixin
#system_config, #system_config_override
Methods included from Configurable
#config, #configure_proxy_generate, #configured_section_create, included, lookup_type, register_type
Constructor Details
#initialize ⇒ Output
Returns a new instance of Output.
199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 |
# File 'lib/fluent/plugin/output.rb', line 199
# Builds a new Output with every piece of runtime state reset.
#
# Nothing is started or configured here: metrics handles, buffer, secondary,
# retry state and thread bookkeeping are all left empty. Only the initial
# buffering mode is derived from what the plugin class implements.
def initialize
  super
  @counter_mutex = Mutex.new
  @buffering = false
  @delayed_commit = false
  @as_secondary = false
  @primary_instance = nil

  # TODO: well organized counters
  @num_errors_metrics = @emit_count_metrics = @emit_records_metrics = @emit_size_metrics = nil
  @write_count_metrics = @rollback_count_metrics = nil
  @flush_time_count_metrics = @slow_flush_count_metrics = nil
  @enable_size_metrics = false

  # How to process events is decided here at once, but it will be decided in delayed way on #configure & #start
  @buffering =
    if !implement?(:synchronous)
      true
    elsif implement?(:buffered) || implement?(:delayed_commit)
      nil # do #configure or #start to determine this for full-featured plugins
    else
      false
    end
  @custom_format = implement?(:custom_format)
  @enable_msgpack_streamer = false # decided later

  @buffer = @secondary = @retry = nil
  @dequeued_chunks = @dequeued_chunks_mutex = nil
  @output_enqueue_thread = @output_flush_threads = nil
  @output_flush_thread_current_position = 0

  @simple_chunking = nil
  @chunk_keys = @chunk_key_accessors = @chunk_key_time = @chunk_key_tag = nil
  @flush_mode = nil
  @timekey_zone = nil

  @retry_for_error_chunk = false
end
Instance Attribute Details
#as_secondary ⇒ Object (readonly)
Returns the value of attribute as_secondary.
167 168 169 |
# Plain reader: returns @as_secondary unchanged.
# @as_secondary starts as false in #initialize and is set to true by #acts_as_secondary.
# File 'lib/fluent/plugin/output.rb', line 167 def as_secondary @as_secondary end |
#buffer ⇒ Object (readonly)
for tests
170 171 172 |
# Plain reader (exposed for tests): returns @buffer unchanged.
# @buffer is nil after #initialize; #configure assigns a buffer plugin instance when buffering applies.
# File 'lib/fluent/plugin/output.rb', line 170 def buffer @buffer end |
#chunk_key_accessors ⇒ Object (readonly)
for tests
170 171 172 |
# Plain reader (exposed for tests): returns @chunk_key_accessors unchanged.
# #configure fills it with a Hash keyed by chunk-key symbol; it is nil before that.
# File 'lib/fluent/plugin/output.rb', line 170 def chunk_key_accessors @chunk_key_accessors end |
#chunk_key_tag ⇒ Object (readonly)
for tests
170 171 172 |
# Plain reader (exposed for tests): returns @chunk_key_tag unchanged.
# #configure sets it to true/false depending on whether 'tag' is among the chunk keys.
# File 'lib/fluent/plugin/output.rb', line 170 def chunk_key_tag @chunk_key_tag end |
#chunk_key_time ⇒ Object (readonly)
for tests
170 171 172 |
# Plain reader (exposed for tests): returns @chunk_key_time unchanged.
# #configure sets it to true/false depending on whether 'time' is among the chunk keys.
# File 'lib/fluent/plugin/output.rb', line 170 def chunk_key_time @chunk_key_time end |
#chunk_keys ⇒ Object (readonly)
for tests
170 171 172 |
# Plain reader (exposed for tests): returns @chunk_keys unchanged.
# After #configure this is the configured chunk key list with 'time' and 'tag' removed.
# File 'lib/fluent/plugin/output.rb', line 170 def chunk_keys @chunk_keys end |
#delayed_commit ⇒ Object (readonly)
Returns the value of attribute delayed_commit.
167 168 169 |
# Plain reader: returns @delayed_commit unchanged (false after #initialize).
# File 'lib/fluent/plugin/output.rb', line 167 def delayed_commit @delayed_commit end |
#delayed_commit_timeout ⇒ Object (readonly)
Returns the value of attribute delayed_commit_timeout.
167 168 169 |
# Plain reader: returns @delayed_commit_timeout unchanged.
# NOTE(review): nothing visible on this page assigns @delayed_commit_timeout — presumably a config parameter; confirm in the source file.
# File 'lib/fluent/plugin/output.rb', line 167 def delayed_commit_timeout @delayed_commit_timeout end |
#dequeued_chunks ⇒ Object
Returns the value of attribute dequeued_chunks.
171 172 173 |
# Plain reader: returns @dequeued_chunks unchanged (nil after #initialize).
# Not marked read-only on this page, so a matching writer presumably exists — confirm in the source file.
# File 'lib/fluent/plugin/output.rb', line 171 def dequeued_chunks @dequeued_chunks end |
#dequeued_chunks_mutex ⇒ Object
Returns the value of attribute dequeued_chunks_mutex.
171 172 173 |
# Plain reader: returns @dequeued_chunks_mutex unchanged (nil after #initialize).
# #commit_write and #flush_thread_run synchronize on this object when touching @dequeued_chunks.
# File 'lib/fluent/plugin/output.rb', line 171 def dequeued_chunks_mutex @dequeued_chunks_mutex end |
#output_enqueue_thread_waiting ⇒ Object
Returns the value of attribute output_enqueue_thread_waiting.
171 172 173 |
# Plain reader: returns @output_enqueue_thread_waiting unchanged.
# #enqueue_thread_wait sets the flag to true and #enqueue_thread_run clears it after each pass.
# File 'lib/fluent/plugin/output.rb', line 171 def output_enqueue_thread_waiting @output_enqueue_thread_waiting end |
#retry ⇒ Object (readonly)
for tests
170 171 172 |
# Plain reader (exposed for tests): returns @retry unchanged.
# @retry is nil when no retry is in progress; #commit_write resets it to nil on success.
# File 'lib/fluent/plugin/output.rb', line 170 def retry @retry end |
#retry_for_error_chunk ⇒ Object
output_enqueue_thread_waiting: for test of output.rb itself
173 174 175 |
# Plain reader: returns @retry_for_error_chunk unchanged (false after #initialize).
# File 'lib/fluent/plugin/output.rb', line 173 def retry_for_error_chunk @retry_for_error_chunk end |
#secondary ⇒ Object (readonly)
for tests
170 171 172 |
# Plain reader (exposed for tests): returns @secondary unchanged.
# #configure assigns the secondary output instance, or nil when no <secondary> section is configured.
# File 'lib/fluent/plugin/output.rb', line 170 def secondary @secondary end |
#timekey_zone ⇒ Object (readonly)
Returns the value of attribute timekey_zone.
167 168 169 |
# Plain reader: returns @timekey_zone unchanged.
# Set by #configure when 'time' is a chunk key ('+0000' if timekey_use_utc), or copied from the primary in #acts_as_secondary.
# File 'lib/fluent/plugin/output.rb', line 167 def timekey_zone @timekey_zone end |
Instance Method Details
#acts_as_secondary(primary) ⇒ Object
248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 |
# File 'lib/fluent/plugin/output.rb', line 248
# Switches this output into "secondary" mode on behalf of +primary+.
#
# Chunking settings (chunk keys, tag/time keys, timekey zone) and the context
# router are copied from the primary, and #commit_write / #rollback_write are
# redefined on this instance so that they delegate to the primary.
#
# @param primary [Object] the primary output; must respond to chunk_keys,
#   chunk_key_tag, chunk_key_time, timekey_zone, context_router,
#   commit_write and rollback_write
def acts_as_secondary(primary)
  @as_secondary = true
  @primary_instance = primary
  @chunk_keys = @primary_instance.chunk_keys || []
  @chunk_key_tag = @primary_instance.chunk_key_tag || false
  if @primary_instance.chunk_key_time
    @chunk_key_time = @primary_instance.chunk_key_time
    @timekey_zone = @primary_instance.timekey_zone
    @output_time_formatter_cache = {}
  end
  self.context_router = primary.context_router

  singleton_class.module_eval do
    define_method(:commit_write) do |chunk_id|
      @primary_instance.commit_write(chunk_id, delayed: delayed_commit, secondary: true)
    end
    # update_retry must be forwarded as a keyword: #rollback_write declares it as
    # `update_retry: true`, so a second positional argument raises ArgumentError.
    define_method(:rollback_write) do |chunk_id, update_retry: true|
      @primary_instance.rollback_write(chunk_id, update_retry: update_retry)
    end
  end
end
#after_shutdown ⇒ Object
554 555 556 557 558 559 560 561 562 563 564 565 566 567 568 569 570 571 572 573 574 575 576 577 578 |
# File 'lib/fluent/plugin/output.rb', line 554
# Lifecycle hook run after shutdown.
#
# Rolls back outstanding writes (primary outputs only), forwards the hook to
# the secondary and the buffer, then wakes each flush thread and joins it.
def after_shutdown
  # rollback regardless with @delayed_commit, because secondary may do it
  if @buffering && !@as_secondary
    try_rollback_all
  end

  @secondary.after_shutdown if @secondary

  if @buffering && @buffer
    @buffer.after_shutdown

    @output_flush_threads_running = false
    (@output_flush_threads || []).each do |flush_state|
      # to wakeup thread and make it to stop by itself
      flush_state.mutex.synchronize do
        if flush_state.thread && flush_state.thread.status
          flush_state.next_clock = 0
          flush_state.cond_var.signal
        end
      end
      Thread.pass
      flush_state.thread.join
    end
  end

  super
end
#after_start ⇒ Object
516 517 518 519 |
# File 'lib/fluent/plugin/output.rb', line 516
# Lifecycle hook run after start: calls the parent hook first, then forwards
# it to the secondary output when one is configured.
def after_start
  super
  if @secondary
    @secondary.after_start
  end
end
#backup_chunk(chunk, using_secondary, delayed_commit) ⇒ Object
1242 1243 1244 1245 1246 1247 1248 1249 1250 1251 1252 1253 1254 1255 1256 1257 1258 1259 |
# File 'lib/fluent/plugin/output.rb', line 1242
# Saves a chunk that cannot be written to a backup file, then commits it so
# it leaves the buffer.
#
# When the buffer config has disable_chunk_backup set, nothing is written and
# the chunk is only logged as thrown away. Otherwise the chunk is appended to
#   <root_dir or DEFAULT_BACKUP_DIR>/backup/worker<N>/<sanitized plugin id>/<chunk id>.log
#
# @param chunk [Object] chunk responding to unique_id and write_to(io)
# @param using_secondary [Boolean] forwarded to #commit_write as secondary:
# @param delayed_commit [Boolean] forwarded to #commit_write as delayed:
def backup_chunk(chunk, using_secondary, delayed_commit)
  if @buffer_config.disable_chunk_backup
    log.warn "disable_chunk_backup is true. #{dump_unique_id_hex(chunk.unique_id)} chunk is thrown away"
  else
    unique_id = dump_unique_id_hex(chunk.unique_id)
    # Replace characters that are unsafe in file names on common filesystems.
    safe_plugin_id = plugin_id.gsub(/[ "\/\\:;|*<>?]/, '_')
    backup_base_dir = system_config.root_dir || DEFAULT_BACKUP_DIR
    backup_file = File.join(backup_base_dir, 'backup', "worker#{fluentd_worker_id}", safe_plugin_id, "#{unique_id}.log")
    backup_dir = File.dirname(backup_file)

    log.warn "bad chunk is moved to #{backup_file}"
    # The listing lost the accessor names after `system_config.`; restored as
    # dir_permission / file_permission to match the DEFAULT_*_PERMISSION fallbacks.
    FileUtils.mkdir_p(backup_dir, mode: system_config.dir_permission || Fluent::DEFAULT_DIR_PERMISSION) unless Dir.exist?(backup_dir)
    File.open(backup_file, 'ab', system_config.file_permission || Fluent::DEFAULT_FILE_PERMISSION) { |f|
      chunk.write_to(f)
    }
  end
  commit_write(chunk.unique_id, secondary: using_secondary, delayed: delayed_commit)
end
#before_shutdown ⇒ Object
528 529 530 531 532 533 534 535 536 537 538 539 540 541 542 543 544 545 |
# File 'lib/fluent/plugin/output.rb', line 528
# Lifecycle hook run before shutdown.
#
# Forwards the hook to the secondary, optionally force-flushes the buffer,
# and stops the enqueue thread so nothing more is enqueued afterwards.
def before_shutdown
  if @secondary
    @secondary.before_shutdown
  end

  if @buffering && @buffer
    force_flush if @flush_at_shutdown
    @buffer.before_shutdown

    # Need to ensure to stop enqueueing ... after #shutdown, we cannot write any data
    @output_enqueue_thread_running = false
    if @output_enqueue_thread && @output_enqueue_thread.alive?
      @output_enqueue_thread.wakeup
      @output_enqueue_thread.join
    end
  end

  super
end
#calculate_timekey(time) ⇒ Object
903 904 905 906 907 908 909 910 911 |
# File 'lib/fluent/plugin/output.rb', line 903
# Rounds +time+ down to the start of its timekey window.
#
# With @timekey_use_utc the window is aligned on the raw epoch value;
# otherwise the offset (from @calculate_offset when present, else @offset)
# is added before taking the remainder.
#
# @param time [#to_i] event time
# @return [Integer] epoch seconds of the window start
def calculate_timekey(time)
  epoch = time.to_i
  return (epoch - (epoch % @timekey)).to_i if @timekey_use_utc

  shift = @calculate_offset ? @calculate_offset.call(time) : @offset
  (epoch - ((epoch + shift) % @timekey)).to_i
end
#check_slow_flush(start) ⇒ Object
1261 1262 1263 1264 1265 1266 1267 1268 1269 1270 |
# File 'lib/fluent/plugin/output.rb', line 1261
# Records how long a flush took and warns when it exceeded the threshold.
#
# The elapsed time in whole milliseconds is added to @flush_time_count_metrics;
# when the elapsed seconds exceed @slow_flush_log_threshold the slow-flush
# counter is incremented and a warning is logged.
#
# @param start [Numeric] Fluent::Clock value captured before the flush
def check_slow_flush(start)
  elapsed_time = Fluent::Clock.now - start
  @flush_time_count_metrics.add((elapsed_time * 1000).to_i)
  return unless elapsed_time > @slow_flush_log_threshold

  @slow_flush_count_metrics.inc
  log.warn "buffer flush took longer time than slow_flush_log_threshold:",
           elapsed_time: elapsed_time, slow_flush_log_threshold: @slow_flush_log_threshold, plugin_id: self.plugin_id
end
#chunk_for_test(tag, time, record) ⇒ Object
913 914 915 916 917 918 |
# File 'lib/fluent/plugin/output.rb', line 913
# Builds an in-memory chunk for tests.
#
# The chunk's metadata is computed from the given event via #metadata.
# memory_chunk is required lazily so it is only loaded when tests need it.
#
# @param tag [String] event tag
# @param time [Object] event time
# @param record [Hash] event record
# @return [Fluent::Plugin::Buffer::MemoryChunk]
def chunk_for_test(tag, time, record)
  require 'fluent/plugin/buffer/memory_chunk'

  # The listing read `m = (tag, time, record)`: the call to #metadata was lost.
  m = metadata(tag, time, record)
  Fluent::Plugin::Buffer::MemoryChunk.new(m)
end
#close ⇒ Object
580 581 582 583 584 585 |
# File 'lib/fluent/plugin/output.rb', line 580
# Lifecycle hook: closes the buffer (when buffering with a buffer instance)
# and the secondary output, then calls the parent hook.
def close
  if @buffering && @buffer
    @buffer.close
  end
  if @secondary
    @secondary.close
  end

  super
end
#commit_write(chunk_id, delayed: @delayed_commit, secondary: false) ⇒ Object
1057 1058 1059 1060 1061 1062 1063 1064 1065 1066 1067 1068 1069 1070 1071 1072 1073 1074 1075 1076 1077 |
# File 'lib/fluent/plugin/output.rb', line 1057
# Marks a chunk as successfully written.
#
# For delayed commits the chunk is first removed from @dequeued_chunks.
# The chunk is then purged from the buffer, and any in-progress retry state
# is cleared with a "retry succeeded" warning.
#
# @param chunk_id [Object] unique id of the chunk
# @param delayed [Boolean] whether the chunk was tracked as a delayed commit
# @param secondary [Boolean] whether the write was done by the secondary
def commit_write(chunk_id, delayed: @delayed_commit, secondary: false)
  log.on_trace { log.trace "committing write operation to a chunk", chunk: dump_unique_id_hex(chunk_id), delayed: delayed }

  if delayed
    @dequeued_chunks_mutex.synchronize {
      @dequeued_chunks.delete_if { |dequeued| dequeued.chunk_id == chunk_id }
    }
  end
  @buffer.purge_chunk(chunk_id)

  @retry_mutex.synchronize {
    # success to flush chunks in retries
    next unless @retry

    if secondary
      log.warn "retry succeeded by secondary.", chunk_id: dump_unique_id_hex(chunk_id)
    else
      log.warn "retry succeeded.", chunk_id: dump_unique_id_hex(chunk_id)
    end
    @retry = nil
  }
end
#configure(conf) ⇒ Object
266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 |
# File 'lib/fluent/plugin/output.rb', line 266
# Configures the output from +conf+.
#
# Steps, in order:
# 1. verify the plugin implements at least one processing style;
# 2. create the metrics handles;
# 3. decide @buffering (true / false / nil = decided later in #start);
# 4. for buffered primaries: validate chunk keys, set up timekey handling,
#    pick the flush mode and create/configure the buffer;
# 5. set up the <secondary> output when configured.
#
# @param conf [Fluent::Config::Element] plugin configuration
# @return [self]
# @raise [Fluent::ConfigError] on any invalid buffer/secondary/chunk-key setting
# @raise [RuntimeError] when the plugin implements no processing method
def configure(conf)
  unless implement?(:synchronous) || implement?(:buffered) || implement?(:delayed_commit)
    raise "BUG: output plugin must implement some methods. see developer documents."
  end

  # Both must be read before `super`.
  # NOTE(review): presumably because super consumes/normalizes conf — confirm.
  has_buffer_section = (conf.elements(name: 'buffer').size > 0)
  has_flush_interval = conf.has_key?('flush_interval')

  super

  @num_errors_metrics = metrics_create(namespace: "fluentd", subsystem: "output", name: "num_errors", help_text: "Number of count num errors")
  @emit_count_metrics = metrics_create(namespace: "fluentd", subsystem: "output", name: "emit_count", help_text: "Number of count emits")
  @emit_records_metrics = metrics_create(namespace: "fluentd", subsystem: "output", name: "emit_records", help_text: "Number of emit records")
  @emit_size_metrics = metrics_create(namespace: "fluentd", subsystem: "output", name: "emit_size", help_text: "Total size of emit events")
  @write_count_metrics = metrics_create(namespace: "fluentd", subsystem: "output", name: "write_count", help_text: "Number of writing events")
  @rollback_count_metrics = metrics_create(namespace: "fluentd", subsystem: "output", name: "rollback_count", help_text: "Number of rollbacking operations")
  @flush_time_count_metrics = metrics_create(namespace: "fluentd", subsystem: "output", name: "flush_time_count", help_text: "Count of flush time")
  @slow_flush_count_metrics = metrics_create(namespace: "fluentd", subsystem: "output", name: "slow_flush_count", help_text: "Count of slow flush occurred time(s)")

  if has_buffer_section
    unless implement?(:buffered) || implement?(:delayed_commit)
      raise Fluent::ConfigError, "<buffer> section is configured, but plugin '#{self.class}' doesn't support buffering"
    end
    @buffering = true
  else # no buffer sections
    if implement?(:synchronous)
      if !implement?(:buffered) && !implement?(:delayed_commit)
        if @as_secondary
          raise Fluent::ConfigError, "secondary plugin '#{self.class}' must support buffering, but doesn't."
        end
        @buffering = false
      else
        if @as_secondary
          # secondary plugin always works as buffered plugin without buffer instance
          @buffering = true
        else
          # @buffering.nil? shows that enabling buffering or not will be decided in lazy way in #start
          @buffering = nil
        end
      end
    else # buffered or delayed_commit is supported by `unless` of first line in this method
      @buffering = true
    end
  end
  # Enable to update record size metrics or not
  @enable_size_metrics = !!system_config.enable_size_metrics

  if @as_secondary
    if !@buffering && !@buffering.nil?
      raise Fluent::ConfigError, "secondary plugin '#{self.class}' must support buffering, but doesn't"
    end
  end

  if (@buffering || @buffering.nil?) && !@as_secondary
    # When @buffering.nil?, @buffer_config was initialized with default value for all parameters.
    # If so, this configuration MUST success.
    @chunk_keys = @buffer_config.chunk_keys.dup
    @chunk_key_time = !!@chunk_keys.delete('time')
    @chunk_key_tag = !!@chunk_keys.delete('tag')

    # A key is invalid when it parses to a plain String that does not match
    # CHUNK_KEY_PATTERN. Any error raised while checking (including the
    # bracket-notation ConfigError below) is re-raised as a ConfigError
    # prefixed with "in chunk_keys: ".
    has_invalid_key = @chunk_keys.any? { |key|
      begin
        k = Fluent::PluginHelper::RecordAccessor::Accessor.parse_parameter(key)
        if k.is_a?(String)
          k !~ CHUNK_KEY_PATTERN
        else
          if key.start_with?('$[')
            raise Fluent::ConfigError, "in chunk_keys: bracket notation is not allowed"
          else
            false
          end
        end
      rescue => e
        # The listing read `#{e.}`; the method name `message` was lost in extraction.
        raise Fluent::ConfigError, "in chunk_keys: #{e.message}"
      end
    }
    if has_invalid_key
      raise Fluent::ConfigError, "chunk_keys specification includes invalid char"
    else
      @chunk_key_accessors = Hash[@chunk_keys.map { |key| [key.to_sym, Fluent::PluginHelper::RecordAccessor::Accessor.new(key)] }]
    end

    if @chunk_key_time
      raise Fluent::ConfigError, "<buffer ...> argument includes 'time', but timekey is not configured" unless @buffer_config.timekey
      Fluent::Timezone.validate!(@buffer_config.timekey_zone)
      @timekey_zone = @buffer_config.timekey_use_utc ? '+0000' : @buffer_config.timekey_zone
      @timekey = @buffer_config.timekey
      if @timekey <= 0
        raise Fluent::ConfigError, "timekey should be greater than 0. current timekey: #{@timekey}"
      end
      @timekey_use_utc = @buffer_config.timekey_use_utc
      @offset = Fluent::Timezone.utc_offset(@timekey_zone)
      # utc_offset may return a callable; in that case the offset is computed per event time.
      @calculate_offset = @offset.respond_to?(:call) ? @offset : nil
      @output_time_formatter_cache = {}
    end

    if (@chunk_key_tag ? 1 : 0) + @chunk_keys.size >= CHUNKING_FIELD_WARN_NUM
      log.warn "many chunk keys specified, and it may cause too many chunks on your system."
    end

    # no chunk keys or only tags (chunking can be done without iterating event stream)
    @simple_chunking = !@chunk_key_time && @chunk_keys.empty?

    @flush_mode = @buffer_config.flush_mode
    if @flush_mode == :default
      if has_flush_interval
        log.info "'flush_interval' is configured at out side of <buffer>. 'flush_mode' is set to 'interval' to keep existing behaviour"
        @flush_mode = :interval
      else
        @flush_mode = (@chunk_key_time ? :lazy : :interval)
      end
    end

    buffer_type = @buffer_config[:@type]
    buffer_conf = conf.elements(name: 'buffer').first || Fluent::Config::Element.new('buffer', '', {}, [])
    @buffer = Plugin.new_buffer(buffer_type, parent: self)
    @buffer.configure(buffer_conf)
    @buffer.enable_update_timekeys if @chunk_key_time

    @flush_at_shutdown = @buffer_config.flush_at_shutdown
    if @flush_at_shutdown.nil?
      @flush_at_shutdown = if @buffer.persistent?
                             false
                           else
                             true # flush_at_shutdown is true in default for on-memory buffer
                           end
    elsif !@flush_at_shutdown && !@buffer.persistent?
      buf_type = Plugin.lookup_type_from_class(@buffer.class)
      log.warn "'flush_at_shutdown' is false, and buffer plugin '#{buf_type}' is not persistent buffer."
      log.warn "your configuration will lose buffered data at shutdown. please confirm your configuration again."
    end

    if (@flush_mode != :interval) && buffer_conf.has_key?('flush_interval')
      if buffer_conf.has_key?('flush_mode')
        raise Fluent::ConfigError, "'flush_interval' can't be specified when 'flush_mode' is not 'interval' explicitly: '#{@flush_mode}'"
      else
        log.warn "'flush_interval' is ignored because default 'flush_mode' is not 'interval': '#{@flush_mode}'"
      end
    end

    if @buffer.queued_chunks_limit_size.nil?
      @buffer.queued_chunks_limit_size = @buffer_config.flush_thread_count
    end
  end

  if @secondary_config
    raise Fluent::ConfigError, "Invalid <secondary> section for non-buffered plugin" unless @buffering
    raise Fluent::ConfigError, "<secondary> section cannot have <buffer> section" if @secondary_config.buffer
    raise Fluent::ConfigError, "<secondary> section cannot have <secondary> section" if @secondary_config.secondary
    if @buffer_config.retry_forever
      log.warn "<secondary> with 'retry_forever', only unrecoverable errors are moved to secondary"
    end

    secondary_type = @secondary_config[:@type]
    unless secondary_type
      secondary_type = conf['@type'] # primary plugin type
    end
    secondary_conf = conf.elements(name: 'secondary').first
    @secondary = Plugin.new_output(secondary_type)
    unless @secondary.respond_to?(:acts_as_secondary)
      raise Fluent::ConfigError, "Failed to setup secondary plugin in '#{conf['@type']}'. '#{secondary_type}' plugin in not allowed due to non buffered output"
    end
    @secondary.acts_as_secondary(self)
    @secondary.configure(secondary_conf)
    if (self.class != @secondary.class) && (@custom_format || @secondary.implement?(:custom_format))
      log.warn "Use different plugin for secondary. Check the plugin works with primary like secondary_file", primary: self.class.to_s, secondary: @secondary.class.to_s
    end
  else
    @secondary = nil
  end

  self
end
#emit_buffered(tag, es) ⇒ Object
849 850 851 852 853 854 855 856 857 858 859 860 861 |
# File 'lib/fluent/plugin/output.rb', line 849
# Buffered emit path: counts the emit, chunks the event stream into the
# buffer and requests a flush when something is queued and no retry is
# in progress. Errors are counted and re-raised.
#
# @param tag [String] event tag
# @param es [Object] event stream
def emit_buffered(tag, es)
  @emit_count_metrics.inc
  begin
    enqueue_now = (@flush_mode == :immediate)
    execute_chunking(tag, es, enqueue: enqueue_now)
    submit_flush_once if !@retry && @buffer.queued?(nil, optimistic: true)
  rescue
    # TODO: separate number of errors into emit errors and write/flush errors
    @num_errors_metrics.inc
    raise
  end
end
#emit_count ⇒ Object
179 180 181 |
# Returns the current value of the emit counter via @emit_count_metrics.get.
# @emit_count_metrics is nil after #initialize and created in #configure,
# so calling this before #configure raises NoMethodError.
# File 'lib/fluent/plugin/output.rb', line 179 def emit_count @emit_count_metrics.get end |
#emit_events(tag, es) ⇒ Object
828 829 830 831 832 833 834 835 |
# File 'lib/fluent/plugin/output.rb', line 828
# Dispatches an event stream to the buffered or synchronous emit path
# depending on @buffering.
#
# @param tag [String] event tag
# @param es [Object] event stream
def emit_events(tag, es)
  # actually this method will be overwritten by #configure
  return emit_buffered(tag, es) if @buffering

  emit_sync(tag, es)
end
#emit_records ⇒ Object
187 188 189 |
# Returns the current value of the emitted-records counter via @emit_records_metrics.get.
# @emit_records_metrics is nil after #initialize and created in #configure,
# so calling this before #configure raises NoMethodError.
# File 'lib/fluent/plugin/output.rb', line 187 def emit_records @emit_records_metrics.get end |
#emit_size ⇒ Object
183 184 185 |
# Returns the current value of the emitted-size counter via @emit_size_metrics.get.
# @emit_size_metrics is nil after #initialize and created in #configure,
# so calling this before #configure raises NoMethodError.
# File 'lib/fluent/plugin/output.rb', line 183 def emit_size @emit_size_metrics.get end |
#emit_sync(tag, es) ⇒ Object
837 838 839 840 841 842 843 844 845 846 847 |
# File 'lib/fluent/plugin/output.rb', line 837
# Synchronous emit path: counts the emit, hands the stream to #process and
# updates the record (and, when enabled, size) metrics. Errors are counted
# and re-raised.
#
# @param tag [String] event tag
# @param es [Object] event stream responding to size and to_msgpack_stream
def emit_sync(tag, es)
  @emit_count_metrics.inc
  begin
    process(tag, es)
    @emit_records_metrics.add(es.size)
    if @enable_size_metrics
      @emit_size_metrics.add(es.to_msgpack_stream.bytesize)
    end
  rescue
    @num_errors_metrics.inc
    raise
  end
end
#enqueue_thread_run ⇒ Object
1408 1409 1410 1411 1412 1413 1414 1415 1416 1417 1418 1419 1420 1421 1422 1423 1424 1425 1426 1427 1428 1429 1430 1431 1432 1433 1434 1435 1436 1437 1438 1439 1440 1441 1442 1443 1444 1445 1446 1447 1448 1449 1450 1451 1452 1453 1454 1455 1456 1457 1458 1459 1460 1461 1462 1463 1464 1465 1466 1467 1468 1469 1470 1471 |
# File 'lib/fluent/plugin/output.rb', line 1408
# Body of the enqueue thread: periodically moves staged chunks that are due
# into the flush queue.
#
# The polling interval is 1/11 of the smaller of flush_interval and
# min(timekey, timekey_wait), but never below flush_thread_interval. The loop
# waits until the plugin is started (or stopped), then runs while
# @output_enqueue_thread_running is true.
#
# @raise [RuntimeError] when neither flush_interval nor timekey applies
def enqueue_thread_run
  value_for_interval = nil
  if @flush_mode == :interval
    value_for_interval = @buffer_config.flush_interval
  end
  if @chunk_key_time
    if !value_for_interval || @buffer_config.timekey < value_for_interval
      value_for_interval = [@buffer_config.timekey, @buffer_config.timekey_wait].min
    end
  end
  unless value_for_interval
    raise "BUG: both of flush_interval and timekey are disabled"
  end
  interval = value_for_interval / 11.0
  if interval < @buffer_config.flush_thread_interval
    interval = @buffer_config.flush_thread_interval
  end

  while !self.after_started? && !self.stopped?
    sleep 0.5
  end
  log.debug "enqueue_thread actually running"

  begin
    while @output_enqueue_thread_running
      now_int = Time.now.to_i
      if @output_flush_interrupted
        sleep interval
        next
      end

      @output_enqueue_thread_mutex.lock
      begin
        if @flush_mode == :interval
          flush_interval = @buffer_config.flush_interval.to_i
          # This block should be done by integer values.
          # If both of flush_interval & flush_thread_interval are 1s, expected actual flush timing is 1.5s.
          # If we use integered values for this comparison, expected actual flush timing is 1.0s.
          # (The first block parameter was blank in the listing; it is unused here.)
          @buffer.enqueue_all { |_metadata, chunk| chunk.raw_create_at + flush_interval <= now_int }
        end

        if @chunk_key_time
          timekey_unit = @buffer_config.timekey
          timekey_wait = @buffer_config.timekey_wait
          current_timekey = now_int - now_int % timekey_unit
          # `metadata` was lost from the listing (`|, chunk|`, `.timekey`); restored.
          @buffer.enqueue_all { |metadata, chunk|
            metadata.timekey < current_timekey && metadata.timekey + timekey_unit + timekey_wait <= now_int
          }
        end
      rescue => e
        raise if @under_plugin_development
        log.error "unexpected error while checking flushed chunks. ignored.", error: e
        log.error_backtrace
      ensure
        # Lets #enqueue_thread_wait observe that one pass has completed.
        @output_enqueue_thread_waiting = false
        @output_enqueue_thread_mutex.unlock
      end
      sleep interval
    end
  rescue => e
    # normal errors are rescued by inner begin-rescue clause.
    log.error "error on enqueue thread", error: e
    log.error_backtrace
    raise
  end
end
#enqueue_thread_wait ⇒ Object
only for tests of output plugin
1384 1385 1386 1387 1388 1389 1390 1391 1392 1393 |
# File 'lib/fluent/plugin/output.rb', line 1384
# Test helper: clears the flush-interrupted flag, raises the waiting flag and
# then spins (yielding to other threads) until something resets the waiting
# flag. Gives up with Timeout::Error after 10 seconds.
def enqueue_thread_wait
  @output_enqueue_thread_mutex.synchronize {
    @output_flush_interrupted = false
    @output_enqueue_thread_waiting = true
  }
  require 'timeout'
  Timeout.timeout(10) {
    while @output_enqueue_thread_waiting
      Thread.pass
    end
  }
end
#execute_chunking(tag, es, enqueue: false) ⇒ Object
920 921 922 923 924 925 926 927 928 |
# File 'lib/fluent/plugin/output.rb', line 920
# Routes an event stream to the matching chunking strategy: simple chunking
# first, then custom format, otherwise the standard format.
#
# @param tag [String] event tag
# @param es [Object] event stream
# @param enqueue [Boolean] forwarded unchanged to the chosen handler
def execute_chunking(tag, es, enqueue: false)
  return handle_stream_simple(tag, es, enqueue: enqueue) if @simple_chunking
  return handle_stream_with_custom_format(tag, es, enqueue: enqueue) if @custom_format

  handle_stream_with_standard_format(tag, es, enqueue: enqueue)
end
#extract_placeholders(str, chunk) ⇒ Object
TODO: optimize this code
753 754 755 756 757 758 759 760 761 762 763 764 765 766 767 768 769 770 771 772 773 774 775 776 777 778 779 780 781 782 783 784 785 786 787 788 789 790 791 792 793 794 795 796 797 798 799 800 801 802 803 804 805 806 807 808 809 810 811 812 813 814 815 816 817 818 819 820 821 822 823 824 825 826 |
# File 'lib/fluent/plugin/output.rb', line 753
# Expands placeholders in +str+ using the metadata of +chunk+.
#
# Handles, in this order: strftime patterns (when 'time' is a chunk key),
# ${tag} / ${tag[N]} (when 'tag' is a chunk key), ${chunk_id}, and finally
# ${<chunk key>} placeholders. Placeholders that cannot be resolved are
# reported with a warning.
#
# TODO: optimize this code
#
# @param str [String] template
# @param chunk [Fluent::Plugin::Buffer::Chunk, Object] a chunk, or (for old
#   plugins) the chunk's metadata; ${chunk_id} only works with a chunk
# @return [String] the expanded string
def extract_placeholders(str, chunk)
  # Every `metadata` token was lost from the listing; restored throughout.
  metadata = if chunk.is_a?(Fluent::Plugin::Buffer::Chunk)
               chunk_passed = true
               chunk.metadata
             else
               chunk_passed = false
               # For existing plugins. Old plugin passes Chunk.metadata instead of Chunk
               chunk
             end
  if metadata.empty?
    str.sub(CHUNK_ID_PLACEHOLDER_PATTERN) {
      if chunk_passed
        dump_unique_id_hex(chunk.unique_id)
      else
        log.warn "${chunk_id} is not allowed in this plugin. Pass Chunk instead of metadata in extract_placeholders's 2nd argument"
      end
    }
  else
    rvalue = str.dup
    # strftime formatting
    if @chunk_key_time # this section MUST be earlier than rest to use raw 'str'
      @output_time_formatter_cache[str] ||= Fluent::Timezone.formatter(@timekey_zone, str)
      rvalue = @output_time_formatter_cache[str].call(metadata.timekey)
    end
    # ${tag}, ${tag[0]}, ${tag[1]}, ... , ${tag[-2]}, ${tag[-1]}
    if @chunk_key_tag
      if str.include?('${tag}')
        rvalue = rvalue.gsub('${tag}', metadata.tag)
      end
      if str =~ CHUNK_TAG_PLACEHOLDER_PATTERN
        hash = {}
        tag_parts = metadata.tag.split('.')
        tag_parts.each_with_index do |part, i|
          # Register both the positive and the negative index for each part.
          hash["${tag[#{i}]}"] = part
          hash["${tag[#{i-tag_parts.size}]}"] = part
        end
        rvalue = rvalue.gsub(CHUNK_TAG_PLACEHOLDER_PATTERN, hash)
      end
      if rvalue =~ CHUNK_TAG_PLACEHOLDER_PATTERN
        log.warn "tag placeholder '#{$1}' not replaced. tag:#{metadata.tag}, template:#{str}"
      end
    end

    # First we replace ${chunk_id} with chunk.unique_id (hexlified).
    rvalue = rvalue.sub(CHUNK_ID_PLACEHOLDER_PATTERN) {
      if chunk_passed
        dump_unique_id_hex(chunk.unique_id)
      else
        log.warn "${chunk_id} is not allowed in this plugin. Pass Chunk instead of metadata in extract_placeholders's 2nd argument"
      end
    }

    # Then, replace other ${chunk_key}s.
    if !@chunk_keys.empty? && metadata.variables
      hash = {'${tag}' => '${tag}'} # not to erase this wrongly
      @chunk_keys.each do |key|
        hash["${#{key}}"] = metadata.variables[key.to_sym]
      end

      rvalue = rvalue.gsub(CHUNK_KEY_PLACEHOLDER_PATTERN) do |matched|
        hash.fetch(matched) do
          log.warn "chunk key placeholder '#{matched[2..-2]}' not replaced. template:#{str}"
          ''
        end
      end
    end

    if rvalue =~ CHUNK_KEY_PLACEHOLDER_PATTERN
      log.warn "chunk key placeholder '#{$1}' not replaced. template:#{str}"
    end

    rvalue
  end
end
#flush_thread_run(state) ⇒ Object
1473 1474 1475 1476 1477 1478 1479 1480 1481 1482 1483 1484 1485 1486 1487 1488 1489 1490 1491 1492 1493 1494 1495 1496 1497 1498 1499 1500 1501 1502 1503 1504 1505 1506 1507 1508 1509 1510 1511 1512 1513 1514 1515 1516 1517 1518 1519 1520 1521 1522 1523 1524 1525 1526 1527 1528 1529 1530 1531 1532 1533 1534 1535 |
# Body of one flush thread; +state+ carries that thread's mutex, condition variable and next_clock.
# - Waits (polling every 0.5s) until the plugin is after-started or stopped, then takes state.mutex.
# - While @output_flush_threads_running: if next_clock is still in the future, or a retry is scheduled
#   for later, it only computes how long to wait; otherwise it releases state.mutex, calls #try_flush,
#   recomputes the interval from #next_flush_time, and re-takes the mutex.
# - If the first dequeued chunk has expired and flushing is not interrupted, it releases the mutex
#   around #try_rollback_write in the same way.
# - Finally waits on state.cond_var for the computed interval (when positive).
# Any StandardError escaping the loop is logged and re-raised; state.mutex is always unlocked on exit.
# File 'lib/fluent/plugin/output.rb', line 1473 def flush_thread_run(state) flush_thread_interval = @buffer_config.flush_thread_interval state.next_clock = Fluent::Clock.now + flush_thread_interval while !self.after_started? && !self.stopped? sleep 0.5 end log.debug "flush_thread actually running" state.mutex.lock begin # This thread don't use `thread_current_running?` because this thread should run in `before_shutdown` phase while @output_flush_threads_running current_clock = Fluent::Clock.now next_retry_time = nil @retry_mutex.synchronize do next_retry_time = @retry ? @retry.next_time : nil end if state.next_clock > current_clock interval = state.next_clock - current_clock elsif next_retry_time && next_retry_time > Time.now interval = next_retry_time.to_f - Time.now.to_f else state.mutex.unlock begin try_flush # next_flush_time uses flush_thread_interval or flush_thread_burst_interval (or retrying) interval = next_flush_time.to_f - Time.now.to_f # TODO: if secondary && delayed-commit, next_flush_time will be much longer than expected # because @retry still exists (#commit_write is not called yet in #try_flush) # @retry should be cleared if delayed commit is enabled? Or any other solution? state.next_clock = Fluent::Clock.now + interval ensure state.mutex.lock end end if @dequeued_chunks_mutex.synchronize{ !@dequeued_chunks.empty? && @dequeued_chunks.first.expired? } unless @output_flush_interrupted state.mutex.unlock begin try_rollback_write ensure state.mutex.lock end end end state.cond_var.wait(state.mutex, interval) if interval > 0 end rescue => e # normal errors are rescued by output plugins in #try_flush # so this rescue section is for critical & unrecoverable errors log.error "error on output thread", error: e log.error_backtrace raise ensure state.mutex.unlock end end |
#flush_thread_wakeup ⇒ Object
only for tests of output plugin
1396 1397 1398 1399 1400 1401 1402 1403 1404 1405 1406 |
# File 'lib/fluent/plugin/output.rb', line 1396
# Signals every flush thread that is still alive to run right away by resetting
# its next_clock to 0 and signalling its condition variable.
# Only for tests of output plugins.
def flush_thread_wakeup
  @output_flush_threads.each do |flush_state|
    flush_state.mutex.synchronize do
      # Thread#status is false/nil once the thread has finished or died.
      if flush_state.thread && flush_state.thread.status
        flush_state.next_clock = 0
        flush_state.cond_var.signal
      end
    end
    Thread.pass
  end
end
#force_flush ⇒ Object
1364 1365 1366 1367 1368 1369 |
# File 'lib/fluent/plugin/output.rb', line 1364
# Enqueues every chunk in the buffer and then submits flushes via
# #submit_flush_all. Does nothing when the plugin is not buffering.
def force_flush
  return unless @buffering

  @buffer.enqueue_all(true)
  submit_flush_all
end
#format(tag, time, record) ⇒ Object
127 128 129 130 |
# File 'lib/fluent/plugin/output.rb', line 127
# Hook for formatting one event.
# The standard msgpack_event_stream chunk is used when a plugin subclass does
# not implement this method.
#
# @raise [NotImplementedError] always, in this base implementation
def format(tag, time, record)
  raise(NotImplementedError, "BUG: output plugins MUST implement this method")
end
#formatted_to_msgpack_binary ⇒ Object
Compatibility for existing plugins
139 140 141 |
# File 'lib/fluent/plugin/output.rb', line 139
# Alias without the question mark, kept for compatibility with existing
# plugins; returns whatever #formatted_to_msgpack_binary? returns.
def formatted_to_msgpack_binary
  self.formatted_to_msgpack_binary?
end
#formatted_to_msgpack_binary? ⇒ Boolean
132 133 134 135 136 |
# File 'lib/fluent/plugin/output.rb', line 132
# Tells whether a custom #format method returns msgpack binary.
# Plugins whose #format returns msgpack binary override this to return true.
#
# @return [Boolean] false in this base implementation
def formatted_to_msgpack_binary?
  false
end
#generate_format_proc ⇒ Object
976 977 978 979 980 981 982 |
# File 'lib/fluent/plugin/output.rb', line 976
# Selects one of the four FORMAT_* procs, depending on whether the buffer
# compresses with gzip and whether @time_as_integer is set.
def generate_format_proc
  gzip = @buffer && @buffer.compress == :gzip
  if @time_as_integer
    gzip ? FORMAT_COMPRESSED_MSGPACK_STREAM_TIME_INT : FORMAT_MSGPACK_STREAM_TIME_INT
  else
    gzip ? FORMAT_COMPRESSED_MSGPACK_STREAM : FORMAT_MSGPACK_STREAM
  end
end
#get_placeholders_keys(str) ⇒ Object
748 749 750 |
# File 'lib/fluent/plugin/output.rb', line 748
# Collects the names used in ${...} placeholders of +str+, leaving out the
# special names "tag" and "chunk_id", and returns them sorted.
def get_placeholders_keys(str)
  names = str.scan(CHUNK_KEY_PLACEHOLDER_PATTERN).flatten
  names.reject { |name| name == "tag" || name == 'chunk_id' }.sort
end
#get_placeholders_tag(str) ⇒ Object
-1 means whole tag
735 736 737 738 739 740 741 742 743 744 745 746 |
# File 'lib/fluent/plugin/output.rb', line 735
# Returns the sorted tag-part indexes referenced by ${tag} / ${tag[N]}
# placeholders in +str+. -1 stands for the whole tag.
def get_placeholders_tag(str)
  # scan yields one-element arrays such as [["tag"], ["tag[0]"]]
  indexes = str.scan(CHUNK_TAG_PLACEHOLDER_PATTERN).each_with_object([]) do |(placeholder), found|
    if placeholder == "tag"
      found << -1
    elsif (indexed = /^tag\[(-?\d+)\]$/.match(placeholder))
      found << indexed[1].to_i
    end
  end
  indexes.sort
end
#get_placeholders_time(str) ⇒ Object
Using a timekey larger than 1 day is not validated.
725 726 727 728 729 730 731 732 |
# File 'lib/fluent/plugin/output.rb', line 725
# Finds the smallest time unit that +str+ is sensitive to when used as a
# strftime format: the first [seconds, unit, example] entry of
# TIME_KEY_PLACEHOLDER_THRESHOLDS whose offset changes the formatted result.
#
# @return [Array, nil] the matching triple, or nil when no listed offset
#   changes the output
def get_placeholders_time(str)
  reference = TIMESTAMP_CHECK_BASE_TIME.strftime(str)
  TIME_KEY_PLACEHOLDER_THRESHOLDS.find do |threshold|
    (TIMESTAMP_CHECK_BASE_TIME + threshold.first).strftime(str) != reference
  end
end
#handle_limit_reached(error) ⇒ Object
1318 1319 1320 1321 1322 1323 1324 1325 1326 1327 1328 |
# File 'lib/fluent/plugin/output.rb', line 1318
# Gives up retrying: clears the buffer queue and resets @retry to nil.
# When +error+ is given, first logs it together with the retry step count and
# the number of queued records.
#
# @param error [Exception, nil] the error that triggered the last retry
def handle_limit_reached(error)
  if error
    queued = @buffer.queued_records
    log.error(
      "Hit limit for retries. dropping all chunks in the buffer queue.",
      retry_times: @retry.steps, records: queued, error: error
    )
    log.error_backtrace(error.backtrace)
  end
  @buffer.clear_queue!
  log.debug("buffer queue cleared")
  @retry = nil
end
#handle_stream_simple(tag, es, enqueue: false) ⇒ Object
1031 1032 1033 1034 1035 1036 1037 1038 1039 1040 1041 1042 1043 1044 1045 1046 1047 1048 1049 1050 1051 1052 1053 1054 1055 |
# File 'lib/fluent/plugin/output.rb', line 1031
# Writes a whole event stream into the buffer under a single metadata key.
#
# With a custom format, every event is formatted through #format and the
# non-nil results are written as an Array; otherwise the event stream itself is
# written together with the proc from #generate_format_proc.
#
# @param tag [String] tag of the event stream
# @param es [#each, #size, #to_msgpack_stream] the event stream
# @param enqueue [Boolean] passed through to @buffer.write
# @return [true]
def handle_stream_simple(tag, es, enqueue: false)
  format_proc = nil
  # Only the tag (when it is a chunk key) takes part in the metadata here.
  meta = metadata((@chunk_key_tag ? tag : nil), nil, nil)
  if @custom_format
    data = []
    es.each(unpacker: Fluent::MessagePackFactory.thread_local_msgpack_unpacker) do |time, record|
      res = format(tag, time, record)
      data << res if res
    end
  else
    format_proc = generate_format_proc
    data = es
  end
  write_guard do
    @buffer.write({meta => data}, format: format_proc, enqueue: enqueue)
  end
  @emit_records_metrics.add(es.size)
  @emit_size_metrics.add(es.to_msgpack_stream.bytesize) if @enable_size_metrics
  true
end
#handle_stream_with_custom_format(tag, es, enqueue: false) ⇒ Object
metadata_and_data is a Hash of:
(standard format) metadata => event stream
(custom format) metadata => array of formatted event
For standard format, formatting should be done for whole event stream, but
"whole event stream" may be a split of "es" here when it's bigger than chunk_limit_size.
`@buffer.write` will do this splitting.
For custom format, formatting will be done here. Custom formatting always requires
iteration of event stream, and it should be done just once even if total event stream size
is bigger than chunk_limit_size because of performance.
993 994 995 996 997 998 999 1000 1001 1002 1003 1004 1005 1006 1007 1008 1009 1010 1011 |
# File 'lib/fluent/plugin/output.rb', line 993
# Formats every event with #format, groups the results by the metadata of each
# event and writes the groups into the buffer.
#
# meta_and_data is a Hash of metadata => Array of formatted events. The event
# stream is iterated exactly once. A metadata key is registered even when
# #format returns nil for the event, so it may map to an empty Array.
#
# @param tag [String] tag of the event stream
# @param es [#each, #size, #to_msgpack_stream] the event stream
# @param enqueue [Boolean] passed through to @buffer.write
# @return [true]
def handle_stream_with_custom_format(tag, es, enqueue: false)
  meta_and_data = {}
  es.each(unpacker: Fluent::MessagePackFactory.thread_local_msgpack_unpacker) do |time, record|
    meta = metadata(tag, time, record)
    meta_and_data[meta] ||= []
    res = format(tag, time, record)
    meta_and_data[meta] << res if res
  end
  write_guard do
    @buffer.write(meta_and_data, enqueue: enqueue)
  end
  @emit_records_metrics.add(es.size)
  @emit_size_metrics.add(es.to_msgpack_stream.bytesize) if @enable_size_metrics
  true
end
#handle_stream_with_standard_format(tag, es, enqueue: false) ⇒ Object
1013 1014 1015 1016 1017 1018 1019 1020 1021 1022 1023 1024 1025 1026 1027 1028 1029 |
# File 'lib/fluent/plugin/output.rb', line 1013
# Splits the event stream by the metadata of each event and writes the
# resulting event streams into the buffer with the standard format proc.
#
# meta_and_data is a Hash of metadata => MultiEventStream.
#
# @param tag [String] tag of the event stream
# @param es [#each, #size, #to_msgpack_stream] the event stream
# @param enqueue [Boolean] passed through to @buffer.write
# @return [true]
def handle_stream_with_standard_format(tag, es, enqueue: false)
  format_proc = generate_format_proc
  meta_and_data = {}
  es.each(unpacker: Fluent::MessagePackFactory.thread_local_msgpack_unpacker) do |time, record|
    meta = metadata(tag, time, record)
    meta_and_data[meta] ||= MultiEventStream.new
    meta_and_data[meta].add(time, record)
  end
  write_guard do
    @buffer.write(meta_and_data, format: format_proc, enqueue: enqueue)
  end
  @emit_records_metrics.add(es.size)
  @emit_size_metrics.add(es.to_msgpack_stream.bytesize) if @enable_size_metrics
  true
end
#implement?(feature) ⇒ Boolean
606 607 608 609 610 611 612 613 614 615 616 |
# File 'lib/fluent/plugin/output.rb', line 606
# Tells whether the plugin class itself (not its ancestors) provides +feature+.
#
# Each feature maps to one method the plugin class must define; every feature
# except :delayed_commit additionally falls back to #support_in_v12_style?.
#
# @param feature [Symbol] :synchronous, :buffered, :delayed_commit or :custom_format
# @raise [ArgumentError] for any other feature
def implement?(feature)
  own_methods = self.class.instance_methods(false)
  hook, v12_fallback =
    case feature
    when :synchronous    then [:process, true]
    when :buffered       then [:write, true]
    when :delayed_commit then [:try_write, false]
    when :custom_format  then [:format, true]
    else
      raise ArgumentError, "Unknown feature for output plugin: #{feature}"
    end

  return true if own_methods.include?(hook)
  v12_fallback ? support_in_v12_style?(feature) : false
end
#interrupt_flushes ⇒ Object
only for tests of output plugin
1379 1380 1381 |
# File 'lib/fluent/plugin/output.rb', line 1379
# Sets the @output_flush_interrupted flag.
# Only for tests of output plugins.
def interrupt_flushes
  @output_flush_interrupted = true
end
#log_retry_error(error, chunk_id_hex, using_secondary) ⇒ Object
1307 1308 1309 1310 1311 1312 1313 1314 1315 1316 |
# File 'lib/fluent/plugin/output.rb', line 1307
# Logs a warning (message plus backtrace) about a failed flush, including the
# current retry step count and the rounded next retry time. Does nothing when
# +error+ is nil/false.
#
# @param error [Exception, nil] the flush error
# @param chunk_id_hex [String] printable id of the chunk
# @param using_secondary [Boolean] whether the secondary output was in use
def log_retry_error(error, chunk_id_hex, using_secondary)
  return unless error

  summary = using_secondary ? "failed to flush the buffer with secondary output." : "failed to flush the buffer."
  log.warn(
    summary,
    retry_times: @retry.steps,
    next_retry_time: @retry.next_time.round,
    chunk: chunk_id_hex,
    error: error
  )
  log.warn_backtrace(error.backtrace)
end
#metadata(tag, time, record) ⇒ Object
TODO: optimize this code
864 865 866 867 868 869 870 871 872 873 874 875 876 877 878 879 880 881 882 883 884 885 886 887 888 889 890 891 892 893 894 895 896 897 898 899 900 901 |
# File 'lib/fluent/plugin/output.rb', line 864
# Builds the buffer metadata for one event from the configured chunk keys.
#
# TODO: optimize this code
#
# @param tag [String, nil]
# @param time [Fluent::EventTime, Integer, nil]
# @param record [Hash, nil]
# @return [Object] whatever @buffer.metadata returns; a blank Struct when no
#   chunk key settings are configured at all (for tests)
# @raise [ArgumentError] when an argument has an unexpected type
def metadata(tag, time, record)
  # this arguments are ordered in output plugin's rule
  # Metadata 's argument order is different from this one (timekey, tag, variables)

  raise ArgumentError, "tag must be a String: #{tag.class}" unless tag.nil? || tag.is_a?(String)
  raise ArgumentError, "time must be a Fluent::EventTime (or Integer): #{time.class}" unless time.nil? || time.is_a?(Fluent::EventTime) || time.is_a?(Integer)
  raise ArgumentError, "record must be a Hash: #{record.class}" unless record.nil? || record.is_a?(Hash)

  if @chunk_keys.nil? && @chunk_key_time.nil? && @chunk_key_tag.nil?
    # for tests
    return Struct.new(:timekey, :tag, :variables).new
  end

  # timekey is int from epoch, and `timekey - timekey % 60` is assumed to match with 0s of each minutes.
  # it's wrong if timezone is configured as one which supports leap second, but it's very rare and
  # we can ignore it (especially in production systems).
  if @chunk_keys.empty?
    if !@chunk_key_time && !@chunk_key_tag
      @buffer.metadata()
    elsif @chunk_key_time && @chunk_key_tag
      timekey = calculate_timekey(time)
      @buffer.metadata(timekey: timekey, tag: tag)
    elsif @chunk_key_time
      timekey = calculate_timekey(time)
      @buffer.metadata(timekey: timekey)
    else
      @buffer.metadata(tag: tag)
    end
  else
    timekey = if @chunk_key_time
                calculate_timekey(time)
              else
                nil
              end
    # One variable per chunk key, extracted from the record by its accessor.
    pairs = Hash[@chunk_key_accessors.map { |k, a| [k, a.call(record)] }]
    @buffer.metadata(timekey: timekey, tag: (@chunk_key_tag ? tag : nil), variables: pairs)
  end
end
#multi_workers_ready? ⇒ Boolean
155 156 157 |
# File 'lib/fluent/plugin/output.rb', line 155
# Whether this plugin is ready for multiple workers.
#
# @return [Boolean] false in this base implementation
def multi_workers_ready?
  false
end
#next_flush_time ⇒ Object
1131 1132 1133 1134 1135 1136 1137 1138 1139 |
# File 'lib/fluent/plugin/output.rb', line 1131
# Computes when the flush thread should flush next.
#
# With nothing queued: now + flush_thread_interval. With queued chunks: the
# next retry time when a retry is in progress, otherwise
# now + flush_thread_burst_interval.
#
# @return [Time]
def next_flush_time
  return Time.now + @buffer_config.flush_thread_interval unless @buffer.queued?

  @retry_mutex.synchronize do
    if @retry
      @retry.next_time
    else
      Time.now + @buffer_config.flush_thread_burst_interval
    end
  end
end
#num_errors ⇒ Object
175 176 177 |
# File 'lib/fluent/plugin/output.rb', line 175
# Current value of the error counter metric.
def num_errors
  @num_errors_metrics.get
end
#placeholder_validate!(name, str) ⇒ Object
618 619 620 621 622 |
# File 'lib/fluent/plugin/output.rb', line 618
# Runs validate! on every validator that #placeholder_validators builds for
# the parameter +name+ and its value +str+.
def placeholder_validate!(name, str)
  placeholder_validators(name, str).each(&:validate!)
end
#placeholder_validators(name, str, time_key = (@chunk_key_time && @buffer_config.timekey), tag_key = @chunk_key_tag, chunk_keys = @chunk_keys) ⇒ Object
624 625 626 627 628 629 630 631 632 633 634 635 636 637 638 639 640 641 642 643 |
# File 'lib/fluent/plugin/output.rb', line 624
# Builds the PlaceholderValidator objects that apply to +str+.
#
# A :time, :tag or :keys validator is created when the string uses that kind of
# placeholder or when the corresponding chunk key setting is present.
#
# @param name [Object] name of the parameter being validated
# @param str [String] parameter value that may contain placeholders
# @param time_key [Object] timekey setting (defaults to the configured one)
# @param tag_key [Object] whether the tag is a chunk key
# @param chunk_keys [Array, nil] configured chunk keys
# @return [Array<PlaceholderValidator>] in :time, :tag, :keys order
def placeholder_validators(name, str, time_key = (@chunk_key_time && @buffer_config.timekey), tag_key = @chunk_key_tag, chunk_keys = @chunk_keys)
  sec, title, example = get_placeholders_time(str)
  tag_parts = get_placeholders_tag(str)
  record_keys = get_placeholders_keys(str)
  has_chunk_keys = chunk_keys && !chunk_keys.empty?

  candidates = [
    [sec || time_key, :time, {sec: sec, title: title, example: example, timekey: time_key}],
    [tag_key || !tag_parts.empty?, :tag, {parts: tag_parts, tagkey: tag_key}],
    [has_chunk_keys || !record_keys.empty?, :keys, {keys: record_keys, chunkkeys: chunk_keys}],
  ]
  candidates.select(&:first).map do |_needed, type, arg|
    PlaceholderValidator.new(name, str, type, arg)
  end
end
#prefer_buffered_processing ⇒ Object
143 144 145 146 147 148 |
# File 'lib/fluent/plugin/output.rb', line 143
# Whether buffered processing is preferred.
#
# Override this method to return false only when all of these are true:
# * the plugin implements both the buffered and the non-buffered methods
# * the plugin is expected to work as a non-buffered plugin when no
#   `<buffer>` section is specified
#
# @return [Boolean] true in this base implementation
def prefer_buffered_processing
  true
end
#prefer_delayed_commit ⇒ Object
150 151 152 153 |
# File 'lib/fluent/plugin/output.rb', line 150
# Whether `try_write` is preferred over `write` when both are implemented.
# Override this method to choose between the two.
#
# @return [Boolean] true in this base implementation
def prefer_delayed_commit
  true
end
#process(tag, es) ⇒ Object
115 116 117 |
# File 'lib/fluent/plugin/output.rb', line 115
# Hook for non-buffered processing of an event stream.
#
# @raise [NotImplementedError] always, in this base implementation
def process(tag, es)
  raise(NotImplementedError, "BUG: output plugins MUST implement this method")
end
#retry_state(randomize) ⇒ Object
1330 1331 1332 1333 1334 1335 1336 1337 1338 1339 1340 1341 1342 1343 1344 1345 1346 1347 |
# File 'lib/fluent/plugin/output.rb', line 1330
# Creates a retry state from the `retry_*` buffer settings via
# #retry_state_create. The secondary options are passed only when a secondary
# output is configured.
#
# @param randomize [Boolean] passed through as the randomize option
def retry_state(randomize)
  config = @buffer_config
  options = {
    forever: config.retry_forever,
    max_steps: config.retry_max_times,
    backoff_base: config.retry_exponential_backoff_base,
    max_interval: config.retry_max_interval,
  }
  if @secondary
    options[:secondary] = true
    options[:secondary_threshold] = config.retry_secondary_threshold
  end
  options[:randomize] = randomize

  retry_state_create(:output_retries, config.retry_type, config.retry_wait, config.retry_timeout, **options)
end
#rollback_count ⇒ Object
195 196 197 |
# File 'lib/fluent/plugin/output.rb', line 195
# Current value of the rollback counter metric.
def rollback_count
  @rollback_count_metrics.get
end
#rollback_write(chunk_id, update_retry: true) ⇒ Object
The update_retry parameter is for preventing a busy loop caused by async writes. We will remove this parameter by re-designing retry_state management between threads.
1081 1082 1083 1084 1085 1086 1087 1088 1089 1090 1091 1092 1093 1094 1095 1096 1097 1098 1099 1100 |
# File 'lib/fluent/plugin/output.rb', line 1081
# Explicitly rolls back a chunk from a plugin.
# 3rd party plugins can instead rely on the automatic rollback done by
# #try_rollback_write.
#
# @param chunk_id [Object] id of the chunk to roll back
# @param update_retry [Boolean] when true, the retry state of the primary
#   output is updated after a successful rollback
# @return [Boolean] true if the chunk was rolled back as expected; false if
#   the buffer could not take it back (it was already flushed), which in many
#   cases can simply be ignored
def rollback_write(chunk_id, update_retry: true)
  @dequeued_chunks_mutex.synchronize do
    @dequeued_chunks.reject! { |info| info.chunk_id == chunk_id }
  end
  return false unless @buffer.takeback_chunk(chunk_id)

  @rollback_count_metrics.inc
  if update_retry
    # The retry state always lives in the primary output.
    owner = @as_secondary ? @primary_instance : self
    owner.update_retry_state(chunk_id, @as_secondary)
  end
  true
end
#shutdown ⇒ Object
547 548 549 550 551 552 |
# File 'lib/fluent/plugin/output.rb', line 547
# Shuts down the secondary output (if any), then the buffer (when buffering),
# and finally calls the parent implementation.
def shutdown
  if @secondary
    @secondary.shutdown
  end
  if @buffering && @buffer
    @buffer.shutdown
  end

  super
end
#start ⇒ Object
438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 485 486 487 488 489 490 491 492 493 494 495 496 497 498 499 500 501 502 503 504 505 506 507 508 509 510 511 512 513 514 |
# File 'lib/fluent/plugin/output.rb', line 438
# Starts the plugin.
#
# Decides whether the plugin works buffered or non-buffered, binds
# #emit_events to the matching implementation, and - for a buffered plugin
# that is not acting as a secondary - starts the buffer and creates the flush
# threads (plus the enqueue thread when needed). Finally starts the secondary
# output, if any.
def start
  super

  if @buffering.nil?
    @buffering = prefer_buffered_processing
    if !@buffering && @buffer
      @buffer.terminate # it's not started, so terminate will be enough
      # At here, this plugin works as non-buffered plugin.
      # Un-assign @buffer not to show buffering metrics (e.g., in_monitor_agent)
      @buffer = nil
    end
  end

  if @buffering
    # Route #emit_events of this instance to the buffered implementation.
    m = method(:emit_buffered)
    singleton_class.module_eval do
      define_method(:emit_events, m)
    end

    @custom_format = implement?(:custom_format)
    @enable_msgpack_streamer = @custom_format ? formatted_to_msgpack_binary : true
    # When both write and try_write are implemented, the plugin's preference decides.
    @delayed_commit = if implement?(:buffered) && implement?(:delayed_commit)
                        prefer_delayed_commit
                      else
                        implement?(:delayed_commit)
                      end
    @delayed_commit_timeout = @buffer_config.delayed_commit_timeout
  else # !@buffering
    # Route #emit_events of this instance to the synchronous implementation.
    m = method(:emit_sync)
    singleton_class.module_eval do
      define_method(:emit_events, m)
    end
  end

  if @buffering && !@as_secondary
    @retry = nil
    @retry_mutex = Mutex.new

    @buffer.start

    @output_enqueue_thread = nil
    @output_enqueue_thread_running = true

    @output_flush_threads = []
    @output_flush_threads_mutex = Mutex.new
    @output_flush_threads_running = true

    # mainly for test: detect enqueue works as code below:
    #   @output.interrupt_flushes
    #   # emits
    #   @output.enqueue_thread_wait
    @output_flush_interrupted = false
    @output_enqueue_thread_mutex = Mutex.new
    @output_enqueue_thread_waiting = false

    @dequeued_chunks = []
    @dequeued_chunks_mutex = Mutex.new

    @output_flush_thread_current_position = 0
    # One flush thread per configured flush_thread_count, each with its own state.
    @buffer_config.flush_thread_count.times do |i|
      thread_title = "flush_thread_#{i}".to_sym
      thread_state = FlushThreadState.new(nil, nil, Mutex.new, ConditionVariable.new)
      thread = thread_create(thread_title) do
        flush_thread_run(thread_state)
      end
      thread_state.thread = thread
      @output_flush_threads_mutex.synchronize do
        @output_flush_threads << thread_state
      end
    end

    # The enqueue thread is created only for interval flush mode or time-keyed chunks.
    if !@under_plugin_development && (@flush_mode == :interval || @chunk_key_time)
      @output_enqueue_thread = thread_create(:enqueue_thread, &method(:enqueue_thread_run))
    end
  end
  @secondary.start if @secondary
end
#statistics ⇒ Object
1542 1543 1544 1545 1546 1547 1548 1549 1550 1551 1552 1553 1554 1555 1556 1557 1558 1559 1560 1561 1562 1563 |
# File 'lib/fluent/plugin/output.rb', line 1542
# Collects the output metrics into a Hash under the 'output' key.
# When the buffer provides statistics, its 'buffer' entries are merged in under
# the names given by BUFFER_STATS_KEYS.
#
# @return [Hash] { 'output' => { metric name => value } }
def statistics
  output_stats = {
    'emit_records' => @emit_records_metrics.get,
    'emit_size' => @emit_size_metrics.get,
    # Respect original name
    # https://github.com/fluent/fluentd/blob/45c7b75ba77763eaf87136864d4942c4e0c5bfcd/lib/fluent/plugin/in_monitor_agent.rb#L284
    'retry_count' => @num_errors_metrics.get,
    'emit_count' => @emit_count_metrics.get,
    'write_count' => @write_count_metrics.get,
    'rollback_count' => @rollback_count_metrics.get,
    'slow_flush_count' => @slow_flush_count_metrics.get,
    'flush_time_count' => @flush_time_count_metrics.get,
  }

  if @buffer && @buffer.respond_to?(:statistics)
    buffer_stats = @buffer.statistics['buffer'] || {}
    buffer_stats.each do |key, value|
      output_stats[BUFFER_STATS_KEYS[key]] = value
    end
  end

  { 'output' => output_stats }
end
#stop ⇒ Object
521 522 523 524 525 526 |
# File 'lib/fluent/plugin/output.rb', line 521
# Stops the secondary output (if any), then the buffer (when buffering), and
# finally calls the parent implementation.
def stop
  if @secondary
    @secondary.stop
  end
  if @buffering && @buffer
    @buffer.stop
  end

  super
end
#submit_flush_all ⇒ Object
1371 1372 1373 1374 1375 1376 |
# File 'lib/fluent/plugin/output.rb', line 1371
# Keeps submitting flushes, pausing flush_thread_burst_interval between them,
# until the buffer queue is empty or a retry is in progress.
def submit_flush_all
  until @retry || !@buffer.queued?
    submit_flush_once
    sleep @buffer_config.flush_thread_burst_interval
  end
end
#submit_flush_once ⇒ Object
1349 1350 1351 1352 1353 1354 1355 1356 1357 1358 1359 1360 1361 1362 |
# File 'lib/fluent/plugin/output.rb', line 1349
# Picks the next flush thread in round-robin order and signals it to flush
# right away. Logs a warning when that thread is no longer alive.
def submit_flush_once
  # Without locks: it is rough but enough to select "next" writer selection
  thread_count = @buffer_config.flush_thread_count
  @output_flush_thread_current_position = (@output_flush_thread_current_position + 1) % thread_count
  flush_state = @output_flush_threads[@output_flush_thread_current_position]
  flush_state.mutex.synchronize do
    # Thread#status: "run"/"sleep"/"aborting" or false(successfully stop) or nil(killed by exception)
    if flush_state.thread && flush_state.thread.status
      flush_state.next_clock = 0
      flush_state.cond_var.signal
    else
      log.warn "thread is already dead"
    end
  end
  Thread.pass
end
#support_in_v12_style?(feature) ⇒ Boolean
594 595 596 597 598 599 600 601 602 603 604 |
# File 'lib/fluent/plugin/output.rb', line 594
# Feature check for plugins written in v0.12 styles.
#
# @param feature [Symbol] :synchronous, :buffered, :delayed_commit or :custom_format
# @return [Boolean] false for every known feature in this base implementation
# @raise [ArgumentError] for any other feature
def support_in_v12_style?(feature)
  case feature
  when :synchronous, :buffered, :delayed_commit, :custom_format
    false
  else
    raise ArgumentError, "unknown feature: #{feature}"
  end
end
#terminate ⇒ Object
587 588 589 590 591 592 |
# File 'lib/fluent/plugin/output.rb', line 587
# Terminates the buffer (when buffering), then the secondary output (if any),
# and finally calls the parent implementation.
def terminate
  if @buffering && @buffer
    @buffer.terminate
  end
  if @secondary
    @secondary.terminate
  end

  super
end
#try_flush ⇒ Object
1143 1144 1145 1146 1147 1148 1149 1150 1151 1152 1153 1154 1155 1156 1157 1158 1159 1160 1161 1162 1163 1164 1165 1166 1167 1168 1169 1170 1171 1172 1173 1174 1175 1176 1177 1178 1179 1180 1181 1182 1183 1184 1185 1186 1187 1188 1189 1190 1191 1192 1193 1194 1195 1196 1197 1198 1199 1200 1201 1202 1203 1204 1205 1206 1207 1208 1209 1210 1211 1212 1213 1214 1215 1216 1217 1218 1219 1220 1221 1222 1223 1224 1225 1226 1227 1228 1229 1230 1231 1232 1233 1234 1235 1236 1237 1238 1239 1240 |
# File 'lib/fluent/plugin/output.rb', line 1143
# Dequeues one chunk from the buffer and tries to write it.
#
# The chunk goes to the secondary output when the current retry state says so,
# otherwise to this output. Outputs with delayed commit get the chunk through
# #try_write and commit later; other outputs get it through #write and the
# chunk is committed right away.
#
# Unrecoverable errors (UNRECOVERABLE_ERRORS) are never retried: the chunk is
# either handed to a synchronous secondary of a different class, or backed up
# with #backup_chunk. Any other error takes the chunk back into the buffer and
# updates the retry state.
#
# @return [nil] when there is no chunk to flush
def try_flush
  chunk = @buffer.dequeue_chunk
  return unless chunk

  # Resolved up front: the unrecoverable-error handler below commits by chunk_id
  # no matter which write branch raised. (It used to be assigned only in the
  # non-delayed branch, leaving it nil when a delayed-commit primary failed.)
  chunk_id = chunk.unique_id

  log.on_trace { log.trace "trying flush for a chunk", chunk: dump_unique_id_hex(chunk.unique_id) }

  output = self
  using_secondary = false
  if @retry_mutex.synchronize{ @retry && @retry.secondary? }
    output = @secondary
    using_secondary = true
  end

  if @enable_msgpack_streamer
    chunk.extend ChunkMessagePackEventStreamer
  end

  begin
    chunk_write_start = Fluent::Clock.now

    if output.delayed_commit
      log.trace "executing delayed write and commit", chunk: dump_unique_id_hex(chunk.unique_id)
      @write_count_metrics.inc
      @dequeued_chunks_mutex.synchronize do
        # delayed_commit_timeout for secondary is configured in <buffer> of primary (<secondary> don't get <buffer>)
        @dequeued_chunks << DequeuedChunkInfo.new(chunk.unique_id, Time.now, self.delayed_commit_timeout)
      end

      output.try_write(chunk)
      check_slow_flush(chunk_write_start)
    else # output plugin without delayed purge
      dump_chunk_id = dump_unique_id_hex(chunk_id)
      log.trace "adding write count", instance: self.object_id
      @write_count_metrics.inc
      log.trace "executing sync write", chunk: dump_chunk_id

      output.write(chunk)
      check_slow_flush(chunk_write_start)

      log.trace "write operation done, committing", chunk: dump_chunk_id
      commit_write(chunk_id, delayed: false, secondary: using_secondary)
      log.trace "done to commit a chunk", chunk: dump_chunk_id
    end
  rescue *UNRECOVERABLE_ERRORS => e
    if @secondary
      if using_secondary
        log.warn "got unrecoverable error in secondary.", error: e
        log.warn_backtrace
        backup_chunk(chunk, using_secondary, output.delayed_commit)
      else
        if (self.class == @secondary.class)
          log.warn "got unrecoverable error in primary and secondary type is same as primary. Skip secondary", error: e
          log.warn_backtrace
          backup_chunk(chunk, using_secondary, output.delayed_commit)
        else
          # Call secondary output directly without retry update.
          # In this case, delayed commit causes inconsistent state in dequeued chunks so async output in secondary is not allowed for now.
          if @secondary.delayed_commit
            log.warn "got unrecoverable error in primary and secondary is async output. Skip secondary for backup", error: e
            log.warn_backtrace
            backup_chunk(chunk, using_secondary, output.delayed_commit)
          else
            log.warn "got unrecoverable error in primary. Skip retry and flush chunk to secondary", error: e
            log.warn_backtrace
            begin
              @secondary.write(chunk)
              commit_write(chunk_id, delayed: output.delayed_commit, secondary: true)
            rescue => e
              log.warn "got an error in secondary for unrecoverable error", error: e
              log.warn_backtrace
              backup_chunk(chunk, using_secondary, output.delayed_commit)
            end
          end
        end
      end
    else
      log.warn "got unrecoverable error in primary and no secondary", error: e
      log.warn_backtrace
      backup_chunk(chunk, using_secondary, output.delayed_commit)
    end
  rescue => e
    log.debug "taking back chunk for errors.", chunk: dump_unique_id_hex(chunk.unique_id)
    if output.delayed_commit
      # The chunk will not be committed later: forget its delayed-commit entry.
      @dequeued_chunks_mutex.synchronize do
        @dequeued_chunks.delete_if{|d| d.chunk_id == chunk.unique_id }
      end
    end

    if @buffer.takeback_chunk(chunk.unique_id)
      @rollback_count_metrics.inc
    end

    update_retry_state(chunk.unique_id, using_secondary, e)

    raise if @under_plugin_development && !@retry_for_error_chunk
  end
end
#try_rollback_all ⇒ Object
1116 1117 1118 1119 1120 1121 1122 1123 1124 1125 1126 1127 1128 1129 |
# File 'lib/fluent/plugin/output.rb', line 1116
# Rolls back every chunk that is still waiting for a delayed commit.
# For each chunk the buffer takes back, the rollback counter is incremented, the
# cancellation is logged and the retry state of the primary output is updated.
# Does nothing when @dequeued_chunks is not set.
def try_rollback_all
  return unless @dequeued_chunks

  @dequeued_chunks_mutex.synchronize do
    until @dequeued_chunks.empty?
      info = @dequeued_chunks.shift
      next unless @buffer.takeback_chunk(info.chunk_id)

      @rollback_count_metrics.inc
      log.info "delayed commit for buffer chunks was cancelled in shutdown", chunk_id: dump_unique_id_hex(info.chunk_id)
      # The retry state always lives in the primary output.
      owner = @as_secondary ? @primary_instance : self
      owner.update_retry_state(info.chunk_id, @as_secondary)
    end
  end
end
#try_rollback_write ⇒ Object
1102 1103 1104 1105 1106 1107 1108 1109 1110 1111 1112 1113 1114 |
# File 'lib/fluent/plugin/output.rb', line 1102
# Rolls back the chunks at the head of the delayed-commit queue whose commit
# timeout has expired, stopping at the first entry that has not expired.
# For each chunk the buffer takes back, the rollback counter is incremented, a
# warning is logged and the retry state of the primary output is updated.
def try_rollback_write
  @dequeued_chunks_mutex.synchronize do
    while (oldest = @dequeued_chunks.first) && oldest.expired?
      @dequeued_chunks.shift
      next unless @buffer.takeback_chunk(oldest.chunk_id)

      @rollback_count_metrics.inc
      log.warn "failed to flush the buffer chunk, timeout to commit.", chunk_id: dump_unique_id_hex(oldest.chunk_id), flushed_at: oldest.time
      # The retry state always lives in the primary output.
      owner = @as_secondary ? @primary_instance : self
      owner.update_retry_state(oldest.chunk_id, @as_secondary)
    end
  end
end
#try_write(chunk) ⇒ Object
123 124 125 |
# File 'lib/fluent/plugin/output.rb', line 123
# Hook for writing a chunk with delayed commit.
#
# @raise [NotImplementedError] always, in this base implementation
def try_write(chunk)
  raise(NotImplementedError, "BUG: output plugins MUST implement this method")
end
#update_retry_state(chunk_id, using_secondary, error = nil) ⇒ Object
1272 1273 1274 1275 1276 1277 1278 1279 1280 1281 1282 1283 1284 1285 1286 1287 1288 1289 1290 1291 1292 1293 1294 1295 1296 1297 1298 1299 1300 1301 1302 1303 1304 1305 |
# File 'lib/fluent/plugin/output.rb', line 1272
# Records a flush failure and advances the retry state, all under @retry_mutex.
#
# The first failure creates the retry state. Later failures step it when its
# next retry time has been reached, and otherwise only recalculate that time.
# Afterwards the retry limit is handled, or the error is logged.
#
# @param chunk_id [Object] id of the chunk that failed
# @param using_secondary [Boolean] whether the secondary output was in use
# @param error [Exception, nil] the error, if any
def update_retry_state(chunk_id, using_secondary, error = nil)
  @retry_mutex.synchronize do
    @num_errors_metrics.inc
    chunk_id_hex = dump_unique_id_hex(chunk_id)

    first_failure = !@retry
    if first_failure
      @retry = retry_state(@buffer_config.retry_randomize)
    elsif Time.now >= @retry.next_time
      # Stepping only once the current time has reached @retry.next_time avoids
      # @retry.step being called almost as many times as the number of flush
      # threads in a short time.
      @retry.step
    else
      @retry.recalc_next_time # to prevent all flush threads from retrying at the same time
    end

    outcome =
      if @retry.limit?
        handle_limit_reached(error)
      elsif error
        log_retry_error(error, chunk_id_hex, using_secondary)
      end

    # The first failure has always answered nil, whatever the handlers returned.
    first_failure ? nil : outcome
  end
end
#write(chunk) ⇒ Object
119 120 121 |
# File 'lib/fluent/plugin/output.rb', line 119
# Hook for writing a chunk synchronously.
#
# @raise [NotImplementedError] always, in this base implementation
def write(chunk)
  raise(NotImplementedError, "BUG: output plugins MUST implement this method")
end
#write_count ⇒ Object
191 192 193 |
# File 'lib/fluent/plugin/output.rb', line 191
# Current value of the write counter metric.
def write_count
  @write_count_metrics.get
end
#write_guard(&block) ⇒ Object
930 931 932 933 934 935 936 937 938 939 940 941 942 943 944 945 946 947 948 949 950 951 952 953 954 955 956 957 958 959 960 961 962 963 964 965 966 967 968 969 |
# File 'lib/fluent/plugin/output.rb', line 930 def write_guard(&block) begin block.call rescue Fluent::Plugin::Buffer::BufferOverflowError log.warn "failed to write data into buffer by buffer overflow", action: @buffer_config.overflow_action case @buffer_config.overflow_action when :throw_exception raise when :block log.debug "buffer.write is now blocking" until @buffer.storable? if self.stopped? log.error "breaking block behavior to shutdown Fluentd" # to break infinite loop to exit Fluentd process raise end log.trace "sleeping until buffer can store more data" sleep 1 end log.debug "retrying buffer.write after blocked operation" retry when :drop_oldest_chunk begin oldest = @buffer.dequeue_chunk if oldest log.warn "dropping oldest chunk to make space after buffer overflow", chunk_id: dump_unique_id_hex(oldest.unique_id) @buffer.purge_chunk(oldest.unique_id) else log.error "no queued chunks to be dropped for drop_oldest_chunk" end rescue # ignore any errors end raise unless @buffer.storable? retry else raise "BUG: unknown overflow_action '#{@buffer_config.overflow_action}'" end end end |