Class: Fluent::Plugin::ParserFilter

Inherits:
Filter show all
Defined in:
lib/fluent/plugin/filter_parser.rb

Constant Summary collapse

FAILED_RESULT =

Shared frozen result returned when filtering fails; reusing one object reduces allocation cost.

[nil, nil].freeze
REPLACE_CHAR =
'?'.freeze

Constants included from Configurable

Configurable::CONFIG_TYPE_REGISTRY

Instance Attribute Summary collapse

Attributes inherited from Filter

#has_filter_with_time

Attributes included from Fluent::PluginLoggerMixin

#log

Attributes inherited from Base

#under_plugin_development

Instance Method Summary collapse

Methods inherited from Filter

#emit_records, #emit_size, #filter, #filter_stream, #initialize, #measure_metrics, #statistics

Methods included from Fluent::PluginHelper::Mixin

included

Methods included from Fluent::PluginLoggerMixin

included, #initialize, #terminate

Methods included from Fluent::PluginId

#initialize, #plugin_id, #plugin_id_configured?, #plugin_id_for_test?, #plugin_root_dir, #stop

Methods inherited from Base

#acquire_worker_lock, #after_shutdown, #after_shutdown?, #after_start, #after_started?, #before_shutdown, #before_shutdown?, #called_in_test?, #close, #closed?, #configured?, #context_router, #context_router=, #fluentd_worker_id, #get_lock_path, #has_router?, #initialize, #inspect, #multi_workers_ready?, #plugin_root_dir, #reloadable_plugin?, #shutdown, #shutdown?, #start, #started?, #stop, #stopped?, #string_safe_encoding, #terminate, #terminated?

Methods included from SystemConfig::Mixin

#system_config, #system_config_override

Methods included from Configurable

#config, #configure_proxy_generate, #configured_section_create, included, #initialize, lookup_type, register_type

Constructor Details

This class inherits a constructor from Fluent::Plugin::Filter

Instance Attribute Details

#parserObject (readonly)

Returns the value of attribute parser.



46
47
48
# File 'lib/fluent/plugin/filter_parser.rb', line 46

# Reader for the @parser instance variable, which #configure assigns from
# parser_create.
def parser
  @parser
end

Instance Method Details

#configure(conf) ⇒ Object



48
49
50
51
52
53
54
55
# File 'lib/fluent/plugin/filter_parser.rb', line 48

# Configures the filter and builds the two helpers #filter_with_time relies on.
#
# @param conf the plugin configuration handed in by the framework
#   (presumably a Fluent::Config::Element -- TODO confirm against caller)
def configure(conf)
  # NOTE(review): presumably rewrites legacy flat parser parameters into the
  # current section layout before the parent class reads conf -- confirm.
  compat_parameters_convert(conf, :parser)

  # Runs the parent's configure with the same arguments. It has to happen
  # before @key_name is read below (presumably this is where the config
  # parameters become instance variables -- confirm).
  super

  # Accessor #filter_with_time uses to read, and optionally delete, the
  # key_name field of each record.
  @accessor = record_accessor_create(@key_name)
  # Parser that #filter_with_time feeds the extracted value to.
  @parser = parser_create
end

#filter_with_time(tag, time, record) ⇒ Object



60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
# File 'lib/fluent/plugin/filter_parser.rb', line 60

# Parses the value stored under key_name and returns the resulting event.
#
# The raw value is read through @accessor and handed to @parser. The first
# successfully parsed result becomes the returned event; see the note inside
# the method about parsers that yield more than one result.
#
# Behavior switches read from instance variables:
# * @reserve_time - keep the incoming event time instead of the parser's time.
# * @reserve_data - when the key is missing or the pattern does not match,
#   still return an event built by handle_parsed with an empty parsed hash.
# * @remove_key_name_field - delete the source field from record on success.
# * @emit_invalid_record_to_error - report problems through
#   router.emit_error_event / raise ParserError instead of failing silently.
# * @replace_invalid_sequence - on an "invalid byte sequence in ..."
#   ArgumentError, scrub the raw value with REPLACE_CHAR and parse once more.
#
# @param tag the tag of the incoming event
# @param time the time of the incoming event
# @param record the incoming record; mutated when @remove_key_name_field is set
# @return [Array] a two-element `[time, record]` pair. FAILED_RESULT
#   (`[nil, nil]`) is returned when the key is missing or parsing fails and
#   nothing is to be reserved; `[nil, nil]` is also returned when the parser
#   yields nothing.
# @raise [Fluent::Plugin::Parser::ParserError] when parsing fails and
#   @emit_invalid_record_to_error is set
# @raise [ArgumentError] when the parser raises one and it is not an invalid
#   byte sequence error that @replace_invalid_sequence allows to be scrubbed,
#   or when the same error is raised again after scrubbing
def filter_with_time(tag, time, record)
  raw_value = @accessor.call(record)
  if raw_value.nil?
    if @emit_invalid_record_to_error
      router.emit_error_event(tag, time, record, ArgumentError.new("#{@key_name} does not exist"))
    end
    return FAILED_RESULT unless @reserve_data

    return time, handle_parsed(tag, record, time, {})
  end

  # Set once raw_value has been scrubbed, so the retry below runs at most once.
  scrubbed = false
  begin
    # Note: https://github.com/fluent/fluentd/issues/4100
    # If the parser returns multiple records from one raw_value,
    # this returns only the first one record.
    # This should be fixed in the future version.
    result_time = nil
    result_record = nil

    @parser.parse(raw_value) do |t, values|
      if values
        # Fall back to the event time when the parser supplied none, or when
        # the original time must be kept.
        t = time if @reserve_time || t.nil?
        @accessor.delete(record) if @remove_key_name_field
        r = handle_parsed(tag, record, t, values)

        if result_record.nil?
          result_time = t
          result_record = r
        elsif @emit_invalid_record_to_error
          # Only the first result can be returned; surface the rest as errors.
          router.emit_error_event(tag, t, r, Fluent::Plugin::Parser::ParserError.new(
            "Could not emit the event. The parser returned multiple results, but currently filter_parser plugin only returns the first parsed result. Raw data: '#{raw_value}'"
          ))
        end
      else
        if @emit_invalid_record_to_error
          router.emit_error_event(tag, time, record, Fluent::Plugin::Parser::ParserError.new("pattern not matched with data '#{raw_value}'"))
        end

        next unless @reserve_data
        next unless result_record.nil?

        result_time = time
        result_record = handle_parsed(tag, record, time, {})
      end
    end

    return result_time, result_record
  rescue Fluent::Plugin::Parser::ParserError
    raise if @emit_invalid_record_to_error

    return FAILED_RESULT
  rescue ArgumentError => e
    raise unless @replace_invalid_sequence
    raise unless e.message.start_with?("invalid byte sequence in")
    # Scrubbing an already scrubbed string changes nothing, so a second
    # retry could never succeed; re-raise instead of looping forever.
    raise if scrubbed

    scrubbed = true
    raw_value = raw_value.scrub(REPLACE_CHAR)
    retry
  rescue => e
    if @emit_invalid_record_to_error
      raise Fluent::Plugin::Parser::ParserError, "parse failed #{e.message}"
    else
      return FAILED_RESULT
    end
  end
end