Class: Fluent::Plugin::ExecInput

Inherits:
Input
  • Object
show all
Defined in:
lib/fluent/plugin/in_exec.rb

Constant Summary

Constants included from Configurable

Configurable::CONFIG_TYPE_REGISTRY

Instance Attribute Summary collapse

Attributes included from Fluent::PluginLoggerMixin

#log

Attributes inherited from Base

#under_plugin_development

Instance Method Summary collapse

Methods inherited from Input

#emit_records, #emit_size, #initialize, #metric_callback, #statistics

Methods included from Fluent::PluginHelper::Mixin

included

Methods included from Fluent::PluginLoggerMixin

included, #initialize, #terminate

Methods included from Fluent::PluginId

#initialize, #plugin_id, #plugin_id_configured?, #plugin_id_for_test?, #plugin_root_dir, #stop

Methods inherited from Base

#acquire_worker_lock, #after_shutdown, #after_shutdown?, #after_start, #after_started?, #before_shutdown, #before_shutdown?, #called_in_test?, #close, #closed?, #configured?, #context_router, #context_router=, #fluentd_worker_id, #get_lock_path, #has_router?, #initialize, #inspect, #plugin_root_dir, #reloadable_plugin?, #shutdown, #shutdown?, #started?, #stop, #stopped?, #string_safe_encoding, #terminate, #terminated?

Methods included from SystemConfig::Mixin

#system_config, #system_config_override

Methods included from Configurable

#config, #configure_proxy_generate, #configured_section_create, included, #initialize, lookup_type, register_type

Constructor Details

This class inherits a constructor from Fluent::Plugin::Input

Instance Attribute Details

#parser ⇒ Object (readonly)

Returns the value of attribute parser.



51
52
53
# File 'lib/fluent/plugin/in_exec.rb', line 51

# Reader for the @parser instance variable.
#
# @return [Object] whatever #configure stored in @parser (the result of
#   parser_create); nil if @parser has not been assigned yet
def parser
  @parser
end

Instance Method Details

#configure(conf) ⇒ Object



53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
# File 'lib/fluent/plugin/in_exec.rb', line 53

# Applies the plugin configuration.
#
# Steps, in order:
# 1. Converts legacy flat parameters through compat_parameters_convert for
#    the :extract and :parser sections.
# 2. For the first 'parse' and the first 'extract' subsection: when it has a
#    'time_format' key and no truthy 'time_type', sets 'time_type' to 'string'.
# 3. Delegates to the parent implementation.
# 4. Requires either @tag or an extract section with tag_key.
# 5. Validates @encoding (when set) and builds the parser.
#
# @param conf [Object] configuration element; must respond to #elements
# @raise [Fluent::ConfigError] when neither 'tag' nor 'tag_key' is configured
# @return [Object] the parser returned by parser_create
def configure(conf)
  compat_parameters_convert(conf, :extract, :parser)

  %w[parse extract].each do |section_name|
    section = conf.elements(section_name).first
    next unless section
    next unless section.has_key?('time_format')

    # A time_format without an explicit time_type implies string-formatted time.
    section['time_type'] ||= 'string'
  end

  super

  tag_key_configured = @extract_config && @extract_config.tag_key
  unless @tag || tag_key_configured
    raise Fluent::ConfigError, "'tag' or 'tag_key' option is required on exec input"
  end

  validate_encoding(@encoding) if @encoding
  @parser = parser_create
end

#multi_workers_ready? ⇒ Boolean

Returns:

  • (Boolean)


78
79
80
# File 'lib/fluent/plugin/in_exec.rb', line 78

# Always answers true.
# NOTE(review): presumably this tells the framework the plugin may run under
# multiple workers — confirm against the base plugin class.
#
# @return [Boolean] always true
def multi_workers_ready?
  true
end

#on_record(time, record) ⇒ Object



112
113
114
115
116
117
118
119
120
# File 'lib/fluent/plugin/in_exec.rb', line 112

# Emits one parsed record to the router.
#
# The tag comes from the record (extract_tag_from_record) and falls back to
# @tag. When +time+ is not given it comes from the record
# (extract_time_from_record) and falls back to the current event time.
#
# Any StandardError raised along the way is logged; the record is then routed
# as an error event, provided tag, time and record are all present.
#
# @param time [Object, nil] event time supplied by the parser, if any
# @param record [Object] the parsed record
def on_record(time, record)
  tag = extract_tag_from_record(record) || @tag
  time = extract_time_from_record(record) || Fluent::EventTime.now unless time
  router.emit(tag, time, record)
rescue StandardError => error
  log.error("exec failed to emit", tag: tag, record: Yajl.dump(record), error: error)
  # An error event needs all three pieces; skip it when any is missing.
  return unless tag && time && record

  router.emit_error_event(tag, time, record, error)
end

#run(io) ⇒ Object



95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
# File 'lib/fluent/plugin/in_exec.rb', line 95

# Feeds the output of the executed command to the parser, handing every
# parsed (time, record) pair to #on_record.
#
# The strategy is chosen from what the parser reports, first match wins:
# 1. implements :parse_io           -> the parser reads from +io+ itself
# 2. implements :parse_partial_data -> chunks of up to @read_block_size are
#                                      read until EOF and passed on
# 3. parser_type is :text_per_line  -> each line is chomped and parsed
# 4. otherwise                      -> the whole stream is read and parsed once
#
# @param io [IO] readable stream (must support eof?, readpartial, each_line, read)
def run(io)
  emitter = method(:on_record)

  if @parser.implement?(:parse_io)
    @parser.parse_io(io, &emitter)
  elsif @parser.implement?(:parse_partial_data)
    until io.eof?
      chunk = io.readpartial(@read_block_size)
      @parser.parse_partial_data(chunk, &emitter)
    end
  elsif @parser.parser_type == :text_per_line
    io.each_line { |raw_line| @parser.parse(raw_line.chomp, &emitter) }
  else
    @parser.parse(io.read, &emitter)
  end
end

#start ⇒ Object



82
83
84
85
86
87
88
89
90
91
92
93
# File 'lib/fluent/plugin/in_exec.rb', line 82

# Starts the plugin and launches the configured command.
#
# After delegating to the parent implementation, calls child_process_execute
# with @command and #run as the block. With @run_interval set, the interval is
# passed as +interval:+; otherwise +immediate: true+ is passed. @connect_mode
# is always passed as +mode:+, and @encoding (when set) as +external_encoding:+.
def start
  super

  schedule = @run_interval ? { interval: @run_interval } : { immediate: true }

  io_options = { mode: [@connect_mode] }
  io_options[:external_encoding] = @encoding if @encoding

  child_process_execute(:exec_input, @command, **schedule, **io_options, &method(:run))
end

#validate_encoding(encoding) ⇒ Object



72
73
74
75
76
# File 'lib/fluent/plugin/in_exec.rb', line 72

# Checks that +encoding+ names an encoding known to Ruby.
#
# @param encoding [String] encoding name to look up
# @return [Encoding] the encoding found by Encoding.find
# @raise [Fluent::ConfigError] when Encoding.find raises ArgumentError; the
#   original message is carried over
def validate_encoding(encoding)
  begin
    Encoding.find(encoding)
  rescue ArgumentError => lookup_error
    # Re-raise as a configuration error, keeping the lookup's message.
    raise Fluent::ConfigError, lookup_error.message
  end
end