Class: Fluent::Plugin::ExecInput
- Defined in:
- lib/fluent/plugin/in_exec.rb
Constant Summary
Constants included from Configurable
Configurable::CONFIG_TYPE_REGISTRY
Instance Attribute Summary collapse
- #parser ⇒ Object (readonly): Returns the value of attribute parser.
Attributes included from Fluent::PluginLoggerMixin
Attributes inherited from Base
Instance Method Summary collapse
- #configure(conf) ⇒ Object
- #multi_workers_ready? ⇒ Boolean
- #on_record(time, record) ⇒ Object
- #run(io) ⇒ Object
- #start ⇒ Object
- #validate_encoding(encoding) ⇒ Object
Methods inherited from Input
#emit_records, #emit_size, #initialize, #metric_callback, #statistics
Methods included from Fluent::PluginHelper::Mixin
Methods included from Fluent::PluginLoggerMixin
included, #initialize, #terminate
Methods included from Fluent::PluginId
#initialize, #plugin_id, #plugin_id_configured?, #plugin_id_for_test?, #plugin_root_dir, #stop
Methods inherited from Base
#acquire_worker_lock, #after_shutdown, #after_shutdown?, #after_start, #after_started?, #before_shutdown, #before_shutdown?, #called_in_test?, #close, #closed?, #configured?, #context_router, #context_router=, #fluentd_worker_id, #get_lock_path, #has_router?, #initialize, #inspect, #plugin_root_dir, #reloadable_plugin?, #shutdown, #shutdown?, #started?, #stop, #stopped?, #string_safe_encoding, #terminate, #terminated?
Methods included from SystemConfig::Mixin
#system_config, #system_config_override
Methods included from Configurable
#config, #configure_proxy_generate, #configured_section_create, included, #initialize, lookup_type, register_type
Constructor Details
This class inherits a constructor from Fluent::Plugin::Input
Instance Attribute Details
#parser ⇒ Object (readonly)
Returns the value of attribute parser.
51 52 53 |
# File 'lib/fluent/plugin/in_exec.rb', line 51

# Read-only accessor for the parser held in @parser.
#
# @return [Object] the current value of @parser (nil until it is assigned)
def parser
  @parser
end
Instance Method Details
#configure(conf) ⇒ Object
53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 |
# File 'lib/fluent/plugin/in_exec.rb', line 53

# Validates the exec input configuration and creates the parser.
#
# Steps, in order:
# 1. Calls compat_parameters_convert(conf, :extract, :parser).
# 2. For the first 'parse' and the first 'extract' subsection of conf,
#    defaults 'time_type' to 'string' when 'time_format' is present and
#    'time_type' is not already set.
# 3. Delegates to the parent class via super.
# 4. Requires either @tag or @extract_config.tag_key to be set.
# 5. Validates @encoding when one is configured.
# 6. Stores the result of parser_create in @parser.
#
# @param conf [Object] configuration element; must respond to #elements(name)
# @raise [Fluent::ConfigError] when neither 'tag' nor 'tag_key' is configured
# @return [Object] the parser returned by parser_create
def configure(conf)
  compat_parameters_convert(conf, :extract, :parser)
  ['parse', 'extract'].each do |subsection_name|
    if subsection = conf.elements(subsection_name).first
      if subsection.has_key?('time_format')
        # An explicit time_type in the subsection wins; only fill the gap.
        subsection['time_type'] ||= 'string'
      end
    end
  end

  super

  if !@tag && (!@extract_config || !@extract_config.tag_key)
    raise Fluent::ConfigError, "'tag' or 'tag_key' option is required on exec input"
  end
  validate_encoding(@encoding) if @encoding
  @parser = parser_create
end
#multi_workers_ready? ⇒ Boolean
78 79 80 |
# File 'lib/fluent/plugin/in_exec.rb', line 78

# Reports whether this plugin is ready for multi-worker operation.
#
# @return [Boolean] always true
def multi_workers_ready?
  true
end
#on_record(time, record) ⇒ Object
112 113 114 115 116 117 118 119 120 |
# File 'lib/fluent/plugin/in_exec.rb', line 112

# Emits a single parsed record through the router.
#
# The tag comes from extract_tag_from_record(record), falling back to @tag.
# When the caller passes no time, it comes from
# extract_time_from_record(record), falling back to Fluent::EventTime.now.
#
# Any StandardError raised along the way is logged. The record is also sent
# to router.emit_error_event when tag, time and record are all truthy.
#
# @param time [Object, nil] event time supplied by the parser, or nil
# @param record [Object] the parsed record
def on_record(time, record)
  tag = extract_tag_from_record(record)
  tag ||= @tag
  time ||= extract_time_from_record(record) || Fluent::EventTime.now
  router.emit(tag, time, record)
rescue => e
  log.error "exec failed to emit", tag: tag, record: Yajl.dump(record), error: e
  # tag/time may still be nil if the failure happened before they were resolved.
  router.emit_error_event(tag, time, record, e) if tag && time && record
end
#run(io) ⇒ Object
95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 |
# File 'lib/fluent/plugin/in_exec.rb', line 95

# Feeds the given IO to @parser, passing every parsed result to #on_record.
#
# The first matching strategy is used:
# 1. @parser implements :parse_io           -> hand over the whole IO.
# 2. @parser implements :parse_partial_data -> read chunks of up to
#    @read_block_size until EOF and feed each chunk.
# 3. @parser.parser_type is :text_per_line  -> parse each chomped line.
# 4. otherwise                              -> read everything and parse once.
#
# @param io [IO] readable stream to parse
def run(io)
  case
  when @parser.implement?(:parse_io)
    @parser.parse_io(io, &method(:on_record))
  when @parser.implement?(:parse_partial_data)
    until io.eof?
      @parser.parse_partial_data(io.readpartial(@read_block_size), &method(:on_record))
    end
  when @parser.parser_type == :text_per_line
    io.each_line do |line|
      @parser.parse(line.chomp, &method(:on_record))
    end
  else
    @parser.parse(io.read, &method(:on_record))
  end
end
#start ⇒ Object
82 83 84 85 86 87 88 89 90 91 92 93 |
# File 'lib/fluent/plugin/in_exec.rb', line 82

# Starts the plugin and launches @command through child_process_execute,
# with #run as the block that receives the child's IO.
#
# Options passed to child_process_execute:
# - mode: [@connect_mode] always
# - external_encoding: @encoding only when @encoding is set
# - interval: @run_interval when @run_interval is set, otherwise
#   immediate: true
def start
  super

  options = { mode: [@connect_mode] }
  options[:external_encoding] = @encoding if @encoding
  if @run_interval
    child_process_execute(:exec_input, @command, interval: @run_interval, **options, &method(:run))
  else
    child_process_execute(:exec_input, @command, immediate: true, **options, &method(:run))
  end
end
#validate_encoding(encoding) ⇒ Object
72 73 74 75 76 |
# File 'lib/fluent/plugin/in_exec.rb', line 72

# Checks that the given encoding name is known to Ruby.
#
# @param encoding [String] encoding name to look up with Encoding.find
# @return [Encoding] the matching Encoding object
# @raise [Fluent::ConfigError] when Encoding.find raises ArgumentError;
#   the original error message is carried over
def validate_encoding(encoding)
  Encoding.find(encoding)
rescue ArgumentError => e
  raise Fluent::ConfigError, e.message
end