Module: Fluent::WireProtocolCompatInput::JSONParserMethods

Defined in:
lib/fluent/plugin/in_wire_protocol_compat.rb

Instance Method Summary collapse

Instance Method Details

#normalize_hash(record) ⇒ Object



44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
# File 'lib/fluent/plugin/in_wire_protocol_compat.rb', line 44

def normalize_hash(record)
  value = @keep_time_key ? record[@time_key] : record.delete(@time_key)
  if value
    if @time_format
      time = @mutex.synchronize { @time_parser.parse(value) }
    else
      begin
        time = value.to_i
      rescue => e
        raise ParserError, "invalid time value: value = #{value}, error_class = #{e.class.name}, error = #{e.message}"
      end
    end
  else
    if @estimate_current_event
      time = Engine.now
    else
      time = nil
    end
  end
  [ time, record ]
end

#parse(text) {|normalized| ... } ⇒ Object

Yields:

  • (normalized)


66
67
68
69
70
71
72
73
74
75
# File 'lib/fluent/plugin/in_wire_protocol_compat.rb', line 66

def parse(text)
  parsed = @load_proc.call(text)
  case parsed
  when Hash
    normalized = normalize_hash(parsed)
  when Array
    normalized = parsed
  end
  yield *normalized
end