Class: Fluent::Plugin::ExecOutput
- Defined in:
- lib/fluent/plugin/out_exec.rb
Constant Summary collapse
- NEWLINE =
"\n"
Constants inherited from Output
Output::BUFFER_STATS_KEYS, Output::CHUNKING_FIELD_WARN_NUM, Output::CHUNK_ID_PLACEHOLDER_PATTERN, Output::CHUNK_KEY_PATTERN, Output::CHUNK_KEY_PLACEHOLDER_PATTERN, Output::CHUNK_TAG_PLACEHOLDER_PATTERN, Output::FORMAT_COMPRESSED_MSGPACK_STREAM, Output::FORMAT_COMPRESSED_MSGPACK_STREAM_TIME_INT, Output::FORMAT_MSGPACK_STREAM, Output::FORMAT_MSGPACK_STREAM_TIME_INT, Output::TIMESTAMP_CHECK_BASE_TIME, Output::TIME_KEY_PLACEHOLDER_THRESHOLDS, Output::UNRECOVERABLE_ERRORS
Constants included from Configurable
Configurable::CONFIG_TYPE_REGISTRY
Instance Attribute Summary collapse
-
#formatter ⇒ Object
readonly
for tests.
Attributes inherited from Output
#as_secondary, #buffer, #chunk_key_accessors, #chunk_key_tag, #chunk_key_time, #chunk_keys, #delayed_commit, #delayed_commit_timeout, #dequeued_chunks, #dequeued_chunks_mutex, #output_enqueue_thread_waiting, #retry, #retry_for_error_chunk, #secondary, #timekey_zone
Attributes included from Fluent::PluginLoggerMixin
Attributes inherited from Base
Instance Method Summary collapse
- #configure(conf) ⇒ Object
- #format(tag, time, record) ⇒ Object
- #multi_workers_ready? ⇒ Boolean
- #try_write(chunk) ⇒ Object
Methods inherited from Output
#acts_as_secondary, #actual_flush_thread_count, #after_shutdown, #after_start, #backup_chunk, #before_shutdown, #calculate_timekey, #check_slow_flush, #chunk_for_test, #close, #commit_write, #emit_buffered, #emit_count, #emit_events, #emit_records, #emit_size, #emit_sync, #enqueue_thread_run, #enqueue_thread_wait, #execute_chunking, #extract_placeholders, #flush_thread_run, #flush_thread_wakeup, #force_flush, #formatted_to_msgpack_binary, #formatted_to_msgpack_binary?, #generate_format_proc, #get_placeholders_keys, #get_placeholders_tag, #get_placeholders_time, #handle_limit_reached, #handle_stream_simple, #handle_stream_with_custom_format, #handle_stream_with_standard_format, #implement?, #initialize, #interrupt_flushes, #keep_buffer_config_compat, #log_retry_error, #metadata, #next_flush_time, #num_errors, #placeholder_validate!, #placeholder_validators, #prefer_buffered_processing, #prefer_delayed_commit, #process, #retry_state, #rollback_count, #rollback_write, #shutdown, #start, #statistics, #stop, #submit_flush_all, #submit_flush_once, #support_in_v12_style?, #synchronize_in_threads, #synchronize_path, #synchronize_path_in_workers, #terminate, #try_flush, #try_rollback_all, #try_rollback_write, #update_retry_state, #write, #write_count, #write_guard
Methods included from UniqueId::Mixin
#dump_unique_id_hex, #generate_unique_id
Methods included from Fluent::PluginHelper::Mixin
Methods included from Fluent::PluginLoggerMixin
included, #initialize, #terminate
Methods included from Fluent::PluginId
#initialize, #plugin_id, #plugin_id_configured?, #plugin_id_for_test?, #plugin_root_dir, #stop
Methods inherited from Base
#acquire_worker_lock, #after_shutdown, #after_shutdown?, #after_start, #after_started?, #before_shutdown, #before_shutdown?, #called_in_test?, #close, #closed?, #configured?, #context_router, #context_router=, #fluentd_worker_id, #get_lock_path, #has_router?, #initialize, #inspect, #plugin_root_dir, #reloadable_plugin?, #shutdown, #shutdown?, #start, #started?, #stop, #stopped?, #string_safe_encoding, #terminate, #terminated?
Methods included from SystemConfig::Mixin
#system_config, #system_config_override
Methods included from Configurable
#config, #configure_proxy_generate, #configured_section_create, included, #initialize, lookup_type, register_type
Constructor Details
This class inherits a constructor from Fluent::Plugin::Output
Instance Attribute Details
#formatter ⇒ Object (readonly)
for tests
46 47 48 |
# File 'lib/fluent/plugin/out_exec.rb', line 46
#
# Reader for the formatter object stored in @formatter; exposed for tests.
#
# @return [Object] the current value of @formatter
def formatter
  @formatter
end
Instance Method Details
#configure(conf) ⇒ Object
48 49 50 51 52 |
# File 'lib/fluent/plugin/out_exec.rb', line 48
#
# Runs compat_parameters_convert over the inject, formatter and buffer
# sections (with 'time' as the default chunk key), lets the parent class
# configure itself, and then stores a newly created formatter in @formatter.
#
# @param conf [Object] the plugin configuration handed over by the caller
# @return [Object] the formatter returned by formatter_create
def configure(conf)
  compat_sections = %i[inject formatter buffer]
  compat_parameters_convert(conf, *compat_sections, default_chunk_key: 'time')
  super
  @formatter = formatter_create
end
#format(tag, time, record) ⇒ Object
60 61 62 63 64 65 66 67 |
# File 'lib/fluent/plugin/out_exec.rb', line 60
#
# Serializes one event with the configured formatter after passing the record
# through inject_values_to_record.
#
# When the formatter reports the :text_per_line type, the formatted text is
# chomped and NEWLINE is appended, so each event ends with exactly one NEWLINE.
# Output of any other formatter type is returned untouched.
#
# @param tag [Object] event tag, forwarded to the formatter
# @param time [Object] event time, forwarded to the formatter
# @param record [Object] event record
# @return [Object] whatever the formatter produced (newline-normalized for
#   :text_per_line formatters)
def format(tag, time, record)
  injected = inject_values_to_record(tag, time, record)
  line_oriented = @formatter.formatter_type == :text_per_line
  formatted = @formatter.format(tag, time, injected)
  line_oriented ? formatted.chomp + NEWLINE : formatted
end
#multi_workers_ready? ⇒ Boolean
54 55 56 |
# File 'lib/fluent/plugin/out_exec.rb', line 54
#
# Multi-worker readiness flag; this plugin always answers true.
#
# @return [Boolean] always true
def multi_workers_ready?
  true
end
#try_write(chunk) ⇒ Object
69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 |
# File 'lib/fluent/plugin/out_exec.rb', line 69
#
# Runs the configured command with the path of a file holding the chunk's
# content as its argument, and settles the chunk from the command's exit
# callback.
#
# If the chunk responds to #path, that path is passed to the command directly.
# Otherwise the chunk is written to a temporary file (binary mode) and that
# file's path is passed instead.
#
# Exit callback behavior:
# * successful status     -> commit_write(chunk_id)
# * unsuccessful status   -> rollback_write(chunk_id) plus a warning
# * no status at all      -> rollback_write(chunk_id) plus a warning
# The temporary file, if one was created, is removed first in every case.
#
# If writing the temporary file or launching the command raises, the temporary
# file is removed right away and the error is re-raised; in that situation the
# exit callback is not expected to run, so nothing else would clean it up
# before garbage collection.
#
# @param chunk [Object] buffer chunk; must respond to #unique_id and to
#   either #path or #write_to(io)
# @return [Object] the return value of child_process_execute
# @raise [StandardError] whatever chunk.write_to or child_process_execute raises
def try_write(chunk)
  tmpfile = nil

  # Best-effort removal shared by the exit callback and the failure path.
  # Errors while deleting are deliberately ignored: a leftover temp file must
  # not change whether the chunk is committed or rolled back.
  discard_tmpfile = lambda do
    begin
      tmpfile.delete if tmpfile
    rescue StandardError
      nil
    end
  end

  prog = if chunk.respond_to?(:path)
           "#{@command} #{chunk.path}"
         else
           tmpfile = Tempfile.new("fluent-plugin-out-exec-")
           tmpfile.binmode
           chunk.write_to(tmpfile)
           tmpfile.close
           "#{@command} #{tmpfile.path}"
         end

  chunk_id = chunk.unique_id
  callback = lambda do |status|
    begin
      discard_tmpfile.call
      if status && status.success?
        commit_write(chunk_id)
      elsif status
        # #rollback_write will be done automatically if it isn't called at here.
        # But it's after command_timeout, and this timeout should be longer than users expectation.
        # So here, this plugin calls it explicitly.
        rollback_write(chunk_id)
        log.warn "command exits with error code", prog: prog, status: status.exitstatus, signal: status.termsig
      else
        rollback_write(chunk_id)
        log.warn "command unexpectedly exits without exit status", prog: prog
      end
    rescue => e
      log.error "unexpected error in child process callback", error: e
    end
  end

  child_process_execute(:out_exec_process, prog, stderr: :connect, immediate: true, parallel: true, mode: [], wait_timeout: @command_timeout, on_exit_callback: callback)
rescue StandardError
  # Preparing the file or starting the command failed, so do not leave the
  # spilled chunk content behind; let the caller see the original error.
  discard_tmpfile.call if discard_tmpfile
  raise
end