Class: Fluent::Plugin::ExecOutput

Inherits:
Output show all
Defined in:
lib/fluent/plugin/out_exec.rb

Constant Summary collapse

NEWLINE =
"\n"

Constants inherited from Output

Output::BUFFER_STATS_KEYS, Output::CHUNKING_FIELD_WARN_NUM, Output::CHUNK_ID_PLACEHOLDER_PATTERN, Output::CHUNK_KEY_PATTERN, Output::CHUNK_KEY_PLACEHOLDER_PATTERN, Output::CHUNK_TAG_PLACEHOLDER_PATTERN, Output::FORMAT_COMPRESSED_MSGPACK_STREAM, Output::FORMAT_COMPRESSED_MSGPACK_STREAM_TIME_INT, Output::FORMAT_MSGPACK_STREAM, Output::FORMAT_MSGPACK_STREAM_TIME_INT, Output::TIMESTAMP_CHECK_BASE_TIME, Output::TIME_KEY_PLACEHOLDER_THRESHOLDS, Output::UNRECOVERABLE_ERRORS

Constants included from Configurable

Configurable::CONFIG_TYPE_REGISTRY

Instance Attribute Summary collapse

Attributes inherited from Output

#as_secondary, #buffer, #chunk_key_accessors, #chunk_key_tag, #chunk_key_time, #chunk_keys, #delayed_commit, #delayed_commit_timeout, #dequeued_chunks, #dequeued_chunks_mutex, #output_enqueue_thread_waiting, #retry, #retry_for_error_chunk, #secondary, #timekey_zone

Attributes included from Fluent::PluginLoggerMixin

#log

Attributes inherited from Base

#under_plugin_development

Instance Method Summary collapse

Methods inherited from Output

#acts_as_secondary, #actual_flush_thread_count, #after_shutdown, #after_start, #backup_chunk, #before_shutdown, #calculate_timekey, #check_slow_flush, #chunk_for_test, #close, #commit_write, #emit_buffered, #emit_count, #emit_events, #emit_records, #emit_size, #emit_sync, #enqueue_thread_run, #enqueue_thread_wait, #execute_chunking, #extract_placeholders, #flush_thread_run, #flush_thread_wakeup, #force_flush, #formatted_to_msgpack_binary, #formatted_to_msgpack_binary?, #generate_format_proc, #get_placeholders_keys, #get_placeholders_tag, #get_placeholders_time, #handle_limit_reached, #handle_stream_simple, #handle_stream_with_custom_format, #handle_stream_with_standard_format, #implement?, #initialize, #interrupt_flushes, #keep_buffer_config_compat, #log_retry_error, #metadata, #next_flush_time, #num_errors, #placeholder_validate!, #placeholder_validators, #prefer_buffered_processing, #prefer_delayed_commit, #process, #retry_state, #rollback_count, #rollback_write, #shutdown, #start, #statistics, #stop, #submit_flush_all, #submit_flush_once, #support_in_v12_style?, #synchronize_in_threads, #synchronize_path, #synchronize_path_in_workers, #terminate, #try_flush, #try_rollback_all, #try_rollback_write, #update_retry_state, #write, #write_count, #write_guard

Methods included from UniqueId::Mixin

#dump_unique_id_hex, #generate_unique_id

Methods included from Fluent::PluginHelper::Mixin

included

Methods included from Fluent::PluginLoggerMixin

included, #initialize, #terminate

Methods included from Fluent::PluginId

#initialize, #plugin_id, #plugin_id_configured?, #plugin_id_for_test?, #plugin_root_dir, #stop

Methods inherited from Base

#acquire_worker_lock, #after_shutdown, #after_shutdown?, #after_start, #after_started?, #before_shutdown, #before_shutdown?, #called_in_test?, #close, #closed?, #configured?, #context_router, #context_router=, #fluentd_worker_id, #get_lock_path, #has_router?, #initialize, #inspect, #plugin_root_dir, #reloadable_plugin?, #shutdown, #shutdown?, #start, #started?, #stop, #stopped?, #string_safe_encoding, #terminate, #terminated?

Methods included from SystemConfig::Mixin

#system_config, #system_config_override

Methods included from Configurable

#config, #configure_proxy_generate, #configured_section_create, included, #initialize, lookup_type, register_type

Constructor Details

This class inherits a constructor from Fluent::Plugin::Output

Instance Attribute Details

#formatterObject (readonly)

for tests



46
47
48
# File 'lib/fluent/plugin/out_exec.rb', line 46

def formatter
  @formatter
end

Instance Method Details

#configure(conf) ⇒ Object



48
49
50
51
52
# File 'lib/fluent/plugin/out_exec.rb', line 48

def configure(conf)
  compat_parameters_convert(conf, :inject, :formatter, :buffer, default_chunk_key: 'time')
  super
  @formatter = formatter_create
end

#format(tag, time, record) ⇒ Object



60
61
62
63
64
65
66
67
# File 'lib/fluent/plugin/out_exec.rb', line 60

def format(tag, time, record)
  record = inject_values_to_record(tag, time, record)
  if @formatter.formatter_type == :text_per_line
    @formatter.format(tag, time, record).chomp + NEWLINE
  else
    @formatter.format(tag, time, record)
  end
end

#multi_workers_ready?Boolean

Returns:

  • (Boolean)


54
55
56
# File 'lib/fluent/plugin/out_exec.rb', line 54

def multi_workers_ready?
  true
end

#try_write(chunk) ⇒ Object



69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
# File 'lib/fluent/plugin/out_exec.rb', line 69

def try_write(chunk)
  tmpfile = nil
  prog = if chunk.respond_to?(:path)
           "#{@command} #{chunk.path}"
         else
           tmpfile = Tempfile.new("fluent-plugin-out-exec-")
           tmpfile.binmode
           chunk.write_to(tmpfile)
           tmpfile.close
           "#{@command} #{tmpfile.path}"
         end
  chunk_id = chunk.unique_id
  callback = ->(status){
    begin
      if tmpfile
        tmpfile.delete rescue nil
      end
      if status && status.success?
        commit_write(chunk_id)
      elsif status
        # #rollback_write will be done automatically if it isn't called at here.
        # But it's after command_timeout, and this timeout should be longer than users expectation.
        # So here, this plugin calls it explicitly.
        rollback_write(chunk_id)
        log.warn "command exits with error code", prog: prog, status: status.exitstatus, signal: status.termsig
      else
        rollback_write(chunk_id)
        log.warn "command unexpectedly exits without exit status", prog: prog
      end
    rescue => e
      log.error "unexpected error in child process callback", error: e
    end
  }
  child_process_execute(:out_exec_process, prog, stderr: :connect, immediate: true, parallel: true, mode: [], wait_timeout: @command_timeout, on_exit_callback: callback)
end