Class: Fluent::Plugin::CloudwatchIngestParser

Inherits:
RegexpParser
  • Object
show all
Defined in:
lib/fluent/plugin/parser_cloudwatch_ingest.rb

Instance Method Summary collapse

Constructor Details

#initialize ⇒ CloudwatchIngestParser

Returns a new instance of CloudwatchIngestParser.



24
25
26
# File 'lib/fluent/plugin/parser_cloudwatch_ingest.rb', line 24

# Constructs the parser. No state is set up here; construction is
# delegated entirely to the parent class through the bare +super+.
def initialize
  super
end

Instance Method Details

#configure(conf) ⇒ Object



28
29
30
31
# File 'lib/fluent/plugin/parser_cloudwatch_ingest.rb', line 28

# Applies the plugin configuration and, when telemetry is switched on,
# creates the statsd client used by #metric.
#
# @param conf the configuration element, forwarded unchanged to the parent
#   class through the bare +super+
# @return [Object, nil] the new statsd client, or nil when @telemetry is off
def configure(conf)
  super
  # NOTE(review): @telemetry and @statsd_endpoint are presumably populated
  # by the parent's configure from declarations outside this view.
  return unless @telemetry

  # The statsd port is fixed at 8125; only the endpoint host is configurable.
  @statsd = Statsd.new(@statsd_endpoint, 8125)
end

#metric(method, name, value = 0) ⇒ Object



33
34
35
36
37
38
39
40
# File 'lib/fluent/plugin/parser_cloudwatch_ingest.rb', line 33

# Forwards a telemetry call to the statsd client; does nothing when
# telemetry is disabled.
#
# @param method [Symbol] name of the statsd client method to invoke
# @param name [String] metric name
# @param value [Object] metric value; not passed on for :increment
# @return [Object, nil] the client's return value, or nil when @telemetry
#   is off
def metric(method, name, value = 0)
  return unless @telemetry

  # :increment is sent with the metric name only; every other method is
  # sent with name and value.
  arguments = method == :increment ? [name] : [name, value]
  @statsd.send(method, *arguments)
end

#parse(event, group, stream) {|time, record| ... } ⇒ Object

Yields:

  • (time, record)


42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
# File 'lib/fluent/plugin/parser_cloudwatch_ingest.rb', line 42

# Parses one CloudWatch log event into a time/record pair and yields it.
#
# The event message is first handed to the parent parser via +super+. The
# resulting record is then optionally enriched with the decoded JSON body,
# the log group and stream names and ingestion timestamps, and telemetry
# is reported through #metric.
#
# @param event [#message, #timestamp, #ingestion_time] the log event;
#   timestamp and ingestion_time are handled as epoch milliseconds
# @param group [Object] log group name, stored under 'log_group_name' when
#   @inject_group_name is set
# @param stream [Object] log stream name, stored under 'log_stream_name'
#   when @inject_stream_name is set
# @yield [time, record] the parsed result, or (nil, nil) when the parent
#   parser produced no record or the JSON body was rejected
# @return [Object, nil] the block's return value on success, nil on the
#   early-exit failure paths
def parse(event, group, stream)
  metric(:increment, 'parser.record.attempted')

  time = nil
  record = nil
  super(event.message) do |t, r|
    time = t
    record = r
  end

  # Without a record there is nothing to enrich. Report the failure the
  # same way the unparsable-JSON path does instead of raising
  # NoMethodError on nil further down.
  # NOTE(review): presumably the parent yields (nil, nil) for a message
  # that does not match its expression; verify against the parent parser.
  if record.nil?
    yield nil, nil
    return
  end

  # Optionally attempt to parse the body as json
  if @parse_json_body
    begin
      # Whilst we could just merge! the parsed
      # message into the record we'd bork on
      # nested keys. Force level one Strings.
      json_body = MultiJson.load(record['message'])
      metric(:increment, 'parser.json.success')
      json_body.each_pair do |k, v|
        record[k.to_s] = v.to_s
      end
    rescue MultiJson::ParseError
      metric(:increment, 'parser.json.failed')
      if @fail_on_unparsable_json
        yield nil, nil
        return
      end
    end
  end

  # Inject optional fields
  record['log_group_name'] = group if @inject_group_name
  record['log_stream_name'] = stream if @inject_stream_name

  # The wall-clock reading is needed both for the injected field and for
  # the plugin skew gauge, so take it whenever either feature is enabled.
  now = DateTime.now if @inject_plugin_ingestion_time || @telemetry
  record[@inject_plugin_ingestion_time] = now.iso8601 if @inject_plugin_ingestion_time

  if @inject_cloudwatch_ingestion_time
    # Use a dedicated local so the parsed event time held in +time+ is not
    # overwritten by the ingestion time.
    ingested_at = Time.at(event.ingestion_time.to_f / 1000)
    record[@inject_cloudwatch_ingestion_time] =
      ingested_at.to_datetime.iso8601(3)
  end

  # Optionally emit skew telemetry, both measured against the event
  # timestamp: CloudWatch ingestion time and plugin ingestion time.
  if @telemetry
    metric(
      :gauge,
      'parser.ingestion_skew',
      event.ingestion_time - event.timestamp
    )
    metric(
      :gauge,
      'parser.plugin_skew',
      now.strftime('%Q').to_i - event.timestamp
    )
  end

  # We do String processing on the event time here to
  # avoid rounding errors introduced by floating point
  # arithmetic. The first ten digits are whole seconds; the
  # remaining digits are milliseconds, scaled to nanoseconds.
  event_s  = event.timestamp.to_s[0..9].to_i
  event_ns = event.timestamp.to_s[10..-1].to_i * 1_000_000

  time = Fluent::EventTime.new(event_s, event_ns) if @event_time

  metric(:increment, 'parser.record.success')
  yield time, record
end