Class: Fluent::Plugin::Parser

Inherits:
Base
  • Object
show all
Includes:
OwnedByMixin, TimeMixin::Parser
Defined in:
lib/fluent/plugin/parser.rb

Defined Under Namespace

Classes: ParserError, TimeoutChecker

Constant Summary

# Value types accepted in the `types` parameter; #build_type_converters rejects
# anything else. Frozen so this shared vocabulary cannot be mutated at runtime.
AVAILABLE_PARSER_VALUE_TYPES =
['string', 'integer', 'float', 'bool', 'time', 'array'].freeze
# Input kinds a parser can report; presumably the set of values #parser_type may
# return (the default, :text_per_line, is listed first) — confirm against callers.
PARSER_TYPES =
[:text_per_line, :text, :binary].freeze
# Lower-cased string forms that the `bool` converter treats as true.
TRUTHY_VALUES =
['true', 'yes', '1'].freeze

Constants included from Configurable

Configurable::CONFIG_TYPE_REGISTRY

Instance Attribute Summary

Attributes inherited from Base

#under_plugin_development

Instance Method Summary

Methods included from TimeMixin::Parser

included, #time_parser_create

Methods included from OwnedByMixin

#log, #owner, #owner=

Methods inherited from Base

#acquire_worker_lock, #after_shutdown, #after_shutdown?, #after_start, #after_started?, #before_shutdown, #before_shutdown?, #called_in_test?, #close, #closed?, #configured?, #context_router, #context_router=, #fluentd_worker_id, #get_lock_path, #has_router?, #inspect, #multi_workers_ready?, #plugin_root_dir, #reloadable_plugin?, #shutdown, #shutdown?, #started?, #stopped?, #string_safe_encoding, #terminate, #terminated?

Methods included from SystemConfig::Mixin

#system_config, #system_config_override

Methods included from Configurable

#config, #configure_proxy_generate, #configured_section_create, included, lookup_type, register_type

Constructor Details

#initialize ⇒ Parser

Returns a new instance of Parser.



108
109
110
111
112
# File 'lib/fluent/plugin/parser.rb', line 108

# Creates the parser with no timeout checker; #configure installs one only
# when a `timeout` is configured.
def initialize
  super

  # Stays nil unless #configure replaces it with a TimeoutChecker.
  @timeout_checker = nil
end

Instance Attribute Details

#type_converters ⇒ Object (readonly)

Exposed for use by tests.



101
102
103
# File 'lib/fluent/plugin/parser.rb', line 101

# Returns the field => converter table stored by #configure (exposed for tests).
# It is nil before configuration and when no `types` were configured.
def type_converters
  @type_converters
end

Instance Method Details

#build_type_converters(types) ⇒ Object



228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
# File 'lib/fluent/plugin/parser.rb', line 228

# Builds the per-field value converters described by the `types` parameter.
#
# Every value in +types+ is "TYPE" or "TYPE:OPTION" (split on the first ':').
# TYPE must appear in AVAILABLE_PARSER_VALUE_TYPES:
#   string  - value.to_s
#   integer - value.to_i, or value.to_s.to_i if #to_i raises
#   float   - value.to_f, or value.to_s.to_f if #to_f raises
#   bool    - whether value.to_s.downcase is one of TRUTHY_VALUES
#   time    - OPTION is "[timezone:]time_format"; time_format may instead name
#             one of Fluent::TimeMixin::TIME_TYPES. Yields nil when parsing raises.
#   array   - value.to_s split on OPTION (default ",")
# Time parsers are created eagerly here, one per `time` field.
#
# @param types [Hash, nil] field name => type definition string
# @return [Hash, nil] field name => converter lambda; nil when +types+ is nil
# @raise [Fluent::ConfigError] if a TYPE is not supported
def build_type_converters(types)
  return nil unless types

  types.each_with_object({}) do |(field_name, type_definition), table|
    type, option = type_definition.split(":", 2)
    unless AVAILABLE_PARSER_VALUE_TYPES.include?(type)
      raise Fluent::ConfigError, "unknown value conversion for key:'#{field_name}', type:'#{type}'"
    end

    table[field_name] =
      case type
      when 'string'
        lambda { |value| value.to_s }
      when 'integer'
        lambda { |value| value.to_i rescue value.to_s.to_i }
      when 'float'
        lambda { |value| value.to_f rescue value.to_s.to_f }
      when 'bool'
        lambda { |value| TRUTHY_VALUES.include?(value.to_s.downcase) }
      when 'time'
        field_time_parser =
          if option
            zone, fmt = option.split(':', 2)
            unless Fluent::Timezone.validate(zone)
              # No valid timezone prefix: the whole option is the format.
              zone, fmt = nil, option
            end
            if Fluent::TimeMixin::TIME_TYPES.include?(fmt)
              # A named time type (unixtime/float) rather than a format string.
              kind, fmt = fmt, nil
            else
              kind = 'string'
            end
            time_parser_create(type: kind.to_sym, format: fmt, timezone: zone)
          else
            time_parser_create(type: :string, format: nil, timezone: nil)
          end
        lambda { |value| field_time_parser.parse(value) rescue nil }
      when 'array'
        separator = (option || ',').to_s
        lambda { |value| string_safe_encoding(value.to_s) { |str| str.split(separator) } }
      else
        raise "BUG: unknown type even after check: #{type}"
      end
  end
end

#call(*a, &b) ⇒ Object



157
158
159
160
161
# File 'lib/fluent/plugin/parser.rb', line 157

# Legacy entry point kept for existing plugins that invoke the parser via
# #call. It forwards every argument and the block to #parse unchanged.
# TODO: warn when deprecated
def call(*args, &block)
  parse(*args, &block)
end

#configure(conf) ⇒ Object



114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
# File 'lib/fluent/plugin/parser.rb', line 114

# Configures the parser. It builds the default time parser and the per-field
# type converters from `types`, and records whether #convert_values has any
# work to do. When `timeout` is set, it also creates a TimeoutChecker and swaps
# this instance's #parse for #parse_with_timeout.
#
# @param conf configuration passed through to super (presumably a
#   Fluent::Config::Element — confirm)
def configure(conf)
  super

  @time_parser = time_parser_create
  @type_converters = build_type_converters(@types)
  # Truthy when any post-parse value processing is configured; #convert_values
  # returns immediately otherwise.
  @execute_convert_values = @type_converters || @null_value_pattern || @null_empty_string
  @timeout_checker = if @timeout
                       # Swap the methods only once per instance. Re-running these aliases
                       # (e.g. on a second #configure call) would point parse_orig at
                       # parse_with_timeout and make #parse recurse until the stack overflows.
                       unless singleton_class.instance_methods(false).include?(:parse_orig)
                         class << self
                           alias_method :parse_orig, :parse
                           alias_method :parse, :parse_with_timeout
                         end
                       end
                       TimeoutChecker.new(@timeout)
                     else
                       nil
                     end
end

#convert_values(time, record) ⇒ Object

Example of use from a parser's #parse (the parsing step itself is elided):

# Example (illustration only): a concrete parser's #parse passes the time and
# record it parsed from +text+ through #convert_values, then yields the
# converted pair to the caller's block.
def parse(text, &block)
# ... parse +text+ into time and record here (elided in this example) ...
time, record = convert_values(time, record)
yield time, record

end



202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
# File 'lib/fluent/plugin/parser.rb', line 202

# Post-processes a parsed record in place. String values that #string_like_null
# reports as null become nil. Fields present in the type converter table are
# replaced by their converted value. Nothing happens unless #configure enabled
# value conversion.
#
# @param time event time; returned untouched
# @param record [Hash] parsed record; mutated in place
# @return [Array] a two-element array: time and the same record object
def convert_values(time, record)
  return [time, record] unless @execute_convert_values

  record.each_key do |field|
    current = record[field]
    # Falsy values (nil, and also false) are always left as-is.
    next unless current

    if current.is_a?(String) && string_like_null(current)
      record[field] = nil
    elsif @type_converters && @type_converters.key?(field)
      record[field] = @type_converters[field].call(current)
    end
  end

  [time, record]
end

#implement?(feature) ⇒ Boolean

Returns:

  • (Boolean)


163
164
165
166
167
168
169
170
171
# File 'lib/fluent/plugin/parser.rb', line 163

# Reports whether this parser's own class implements an optional API.
# Only public/protected methods defined directly on self.class count, because
# instance_methods(false) ignores ancestors. An implementation inherited from
# an intermediate parent class is therefore not detected.
#
# @param feature [Symbol] :parse_io or :parse_partial_data
# @return [Boolean]
# @raise [ArgumentError] for any other feature
def implement?(feature)
  own_methods = self.class.instance_methods(false)
  unless [:parse_io, :parse_partial_data].include?(feature)
    raise ArgumentError, "Unknown feature for parser plugin: #{feature}"
  end

  own_methods.include?(feature)
end

#parse(text, &block) ⇒ Object

Raises:

  • (NotImplementedError)


143
144
145
# File 'lib/fluent/plugin/parser.rb', line 143

# Abstract: subclasses must override this to parse +text+ and yield the result.
#
# @raise [NotImplementedError] always, in this base class
def parse(text, &block)
  message = "Implement this method in child class"
  raise NotImplementedError, message
end

#parse_io(io, &block) ⇒ Object

Raises:

  • (NotImplementedError)


173
174
175
# File 'lib/fluent/plugin/parser.rb', line 173

# Optional API; #implement?(:parse_io) reports whether a subclass defines it.
#
# @raise [NotImplementedError] always, in this base class
def parse_io(io, &block)
  reason = "Optional API #parse_io is not implemented"
  raise NotImplementedError, reason
end

#parse_partial_data(data, &block) ⇒ Object

Raises:

  • (NotImplementedError)


177
178
179
# File 'lib/fluent/plugin/parser.rb', line 177

# Optional API; #implement?(:parse_partial_data) reports whether a subclass
# defines it.
#
# @raise [NotImplementedError] always, in this base class
def parse_partial_data(data, &block)
  reason = "Optional API #parse_partial_data is not implemented"
  raise NotImplementedError, reason
end

#parse_time(record) ⇒ Object



181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
# File 'lib/fluent/plugin/parser.rb', line 181

# Extracts the event time from +record+.
#
# If `time_key` is set and the record contains it, the value is parsed with
# the configured time parser. The key is removed from the record unless
# `keep_time_key` is set. Otherwise the result is the current time when
# `estimate_current_event` is set, and nil if not.
#
# @param record parsed record (anything; only objects responding to has_key? are inspected)
# @return the parsed time, Fluent::EventTime.now, or nil
# @raise [ParserError] when the time value cannot be parsed
def parse_time(record)
  time_present = @time_key && record.respond_to?(:has_key?) && record.has_key?(@time_key)
  unless time_present
    return @estimate_current_event ? Fluent::EventTime.now : nil
  end

  raw_time = @keep_time_key ? record[@time_key] : record.delete(@time_key)
  @time_parser.parse(raw_time)
rescue Fluent::TimeParser::TimeParseError => e
  raise ParserError, e.message
end

#parse_with_timeout(text, &block) ⇒ Object



147
148
149
150
151
152
153
154
155
# File 'lib/fluent/plugin/parser.rb', line 147

# Timeout-guarded #parse. #configure aliases it over #parse when `timeout` is
# set, keeping the real implementation as #parse_orig.
#
# If the checker interrupts parsing with UncatchableError (presumably raised by
# TimeoutChecker once the deadline passes — confirm), this logs a warning and
# yields nil, nil instead of raising.
def parse_with_timeout(text, &block)
  @timeout_checker.execute do
    parse_orig(text, &block)
  end
rescue UncatchableError
  log.warn "parsing timed out with #{self.class}: text = #{text}"
  # Return nil instead of raising error. in_tail or other plugin can emit broken line.
  yield nil, nil
end

#parser_type ⇒ Object



104
105
106
# File 'lib/fluent/plugin/parser.rb', line 104

# Kind of input this parser expects. The base class reports :text_per_line,
# the first entry of PARSER_TYPES.
def parser_type
  :text_per_line
end

#start ⇒ Object



131
132
133
134
135
# File 'lib/fluent/plugin/parser.rb', line 131

# Starts the plugin, then the timeout checker if #configure created one.
def start
  super

  checker = @timeout_checker
  checker.start if checker
end

#stop ⇒ Object



137
138
139
140
141
# File 'lib/fluent/plugin/parser.rb', line 137

# Stops the plugin, then the timeout checker if #configure created one.
def stop
  super

  checker = @timeout_checker
  checker.stop if checker
end

#string_like_null(value, null_empty_string = @null_empty_string, null_value_regexp = @null_value_pattern) ⇒ Object



222
223
224
# File 'lib/fluent/plugin/parser.rb', line 222

# Decides whether a string value should be treated as null.
#
# The result is truthy when +null_empty_string+ is set and +value+ is empty,
# or when +null_value_regexp+ matches +value+. Matching happens after invalid
# byte sequences are handled by string_safe_encoding.
#
# @param value [String]
# @param null_empty_string defaults to the `null_empty_string` setting
# @param null_value_regexp [Regexp, nil] defaults to the `null_value_pattern` setting
# @return true, a MatchData, or a falsy value (nil/false)
def string_like_null(value, null_empty_string = @null_empty_string, null_value_regexp = @null_value_pattern)
  return true if null_empty_string && value.empty?

  null_value_regexp && string_safe_encoding(value) { |str| null_value_regexp.match(str) }
end