Class: Fluent::Plugin::NewrelicOutput

Inherits:
Output
  • Object
show all
Defined in:
lib/fluent/plugin/out_newrelic.rb

Defined Under Namespace

Classes: ConnectionFailure

Constant Summary

DEFAULT_BUFFER_TYPE =
'memory'.freeze

Instance Method Summary

Instance Method Details

#attempt_send(payload, attempt) ⇒ Object



116
117
118
119
# File 'lib/fluent/plugin/out_newrelic.rb', line 116

# Sends +payload+, retrying until a send succeeds or retries are exhausted.
#
# Each pass first sleeps for sleep_duration(attempt) -- this includes the very
# first pass and the final pass on which should_retry? turns false. When
# should_retry? is false nothing is sent on that pass and the method simply
# returns; no error is raised here.
#
# @param payload [Object] request body handed unchanged to #send
# @param attempt [Integer] attempt counter to start from
# @return [nil]
def attempt_send(payload, attempt)
  current = attempt
  loop do
    sleep sleep_duration(current)
    break unless should_retry?(current)
    break if was_successful?(send(payload))

    current += 1
  end
end

#configure(conf) ⇒ Object



47
48
49
50
51
52
53
54
55
56
57
# File 'lib/fluent/plugin/out_newrelic.rb', line 47

# Runs the parent class's configure, then derives the request target and
# headers from instance state.
#
# Reads @base_uri and @api_key (presumably populated from plugin config by
# the parent's configure -- confirm against the config_param declarations).
#
# @param conf [Object] configuration object, forwarded as-is via +super+
# @return [Hash] the frozen header hash that was stored in @header
def configure(conf)
  super

  # Parsed once here so #send can reuse host and request path.
  @end_point = URI.parse(@base_uri)

  request_headers = {}
  request_headers['X-Insert-Key'] = @api_key
  request_headers['X-Event-Source'] = 'logs'
  request_headers['Content-Encoding'] = 'gzip'
  @header = request_headers.freeze
end

#maybe_parse_json(message) ⇒ Object



138
139
140
141
142
143
144
145
146
147
# File 'lib/fluent/plugin/out_newrelic.rb', line 138

# Best-effort JSON decoding of a log message.
#
# Returns the decoded value only when +message+ is a String containing a JSON
# object. Anything else -- invalid JSON, JSON that decodes to a non-Hash
# (array, number, string, ...), or a non-String +message+ such as nil or an
# Integer -- yields an empty Hash.
#
# @param message [Object] candidate JSON text
# @return [Hash] decoded JSON object, or {} when nothing usable was decoded
def maybe_parse_json(message)
  # JSON.parse raises TypeError (not JSON::ParserError) for non-String input,
  # so filter those out up front instead of letting them escape the rescue.
  return {} unless message.is_a?(String)

  parsed = JSON.parse(message)
  parsed.is_a?(Hash) ? parsed : {}
rescue JSON::ParserError
  {}
end

#maybe_parse_message_json(record) ⇒ Object



130
131
132
133
134
135
136
# File 'lib/fluent/plugin/out_newrelic.rb', line 130

# Merges fields decoded from the record's 'message' value into the record.
#
# When +record+ has no 'message' key it is returned as-is (same object).
# Otherwise a new hash is returned: +record+ merged with whatever
# maybe_parse_json yields for the message, so decoded keys win on conflict.
# The incoming +record+ is not modified.
#
# @param record [Hash] event record
# @return [Hash] the original record, or a merged copy
def maybe_parse_message_json(record)
  return record unless record.key?('message')

  record.merge(maybe_parse_json(record['message']))
end

#package_record(record, timestamp) ⇒ Object



59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
# File 'lib/fluent/plugin/out_newrelic.rb', line 59

# Converts one event record into the hash shape appended to the payload's
# 'logs' array.
#
# Result layout:
#   'timestamp'  => +timestamp+ (stored untouched)
#   'attributes' => fields decoded from the message by maybe_parse_json,
#                   overlaid with every record key except 'message'
#                   (record values win on key conflicts)
#   'message'    => the record's 'message' value; the key is present only
#                   when the record has one
#
# @param record [Hash] event record
# @param timestamp [Object] event time
# @return [Hash] the packaged entry
def package_record(record, timestamp)
  attributes = {}
  # Seed attributes with message-derived fields first so the explicit record
  # fields written below take precedence.
  attributes = attributes.merge(maybe_parse_json(record['message'])) if record.has_key?('message')

  packaged = { 'timestamp' => timestamp, 'attributes' => attributes }

  record.each do |key, value|
    destination = key == 'message' ? packaged : attributes
    destination[key] = value
  end

  packaged
end

#send(payload) ⇒ Object



121
122
123
124
125
126
127
128
# File 'lib/fluent/plugin/out_newrelic.rb', line 121

# POSTs +payload+ to the configured endpoint over TLS.
#
# A new Net::HTTP client is built on every call, targeting @end_point.host on
# port 443 (the port is fixed here; any port in @end_point is not consulted)
# with peer certificate verification enabled. The request goes to
# @end_point.request_uri and carries the headers stored in @header.
#
# NOTE: within this class this definition takes the place of Object#send;
# use __send__ if dynamic dispatch is ever needed here.
#
# @param payload [String] request body
# @return [Object] whatever Net::HTTP#request returns for the POST
def send(payload)
  client = Net::HTTP.new(@end_point.host, 443).tap do |connection|
    connection.use_ssl = true
    connection.verify_mode = OpenSSL::SSL::VERIFY_PEER
  end

  post = Net::HTTP::Post.new(@end_point.request_uri, @header)
  post.body = payload

  client.request(post)
end

#should_retry?(attempt) ⇒ Boolean

Returns:

  • (Boolean)


104
105
106
# File 'lib/fluent/plugin/out_newrelic.rb', line 104

# Tells whether the given attempt number is still below the limit held in
# @retries.
#
# @param attempt [Integer] attempt counter
# @return [Boolean] true while +attempt+ is strictly less than @retries
def should_retry?(attempt)
  retry_limit = @retries
  attempt < retry_limit
end

#sleep_duration(attempt) ⇒ Object



112
113
114
# File 'lib/fluent/plugin/out_newrelic.rb', line 112

# Computes the delay for a given attempt: @retry_seconds doubled once per
# attempt (2**attempt * @retry_seconds), capped at @max_delay.
#
# @param attempt [Integer] attempt counter
# @return [Numeric] the smaller of the computed delay and @max_delay
def sleep_duration(attempt)
  backoff = (2**attempt) * @retry_seconds
  backoff < @max_delay ? backoff : @max_delay
end

#was_successful?(response) ⇒ Boolean

Returns:

  • (Boolean)


108
109
110
# File 'lib/fluent/plugin/out_newrelic.rb', line 108

# Tells whether a response's status code, converted with +to_i+, falls in
# the 200-299 range.
#
# @param response [#code] object whose +code+ responds to +to_i+
# @return [Boolean] true for 200 through 299 inclusive
def was_successful?(response)
  (200...300).cover?(response.code.to_i)
end

#write(chunk) ⇒ Object



80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
# File 'lib/fluent/plugin/out_newrelic.rb', line 80

# Packages every usable record in +chunk+ into one payload, gzips its JSON
# form, and passes the compressed bytes to attempt_send starting at attempt 0.
#
# Records that are not Hashes, or that are empty Hashes, are skipped. The
# JSON document is a one-element array wrapping the payload hash, whose
# 'common' section carries the plugin type and version. A payload is built
# and sent even when no record was usable ('logs' is then empty).
#
# @param chunk [#msgpack_each] buffer chunk yielding (timestamp, record) pairs
# @return [Object] whatever attempt_send returns
def write(chunk)
  logs = []
  chunk.msgpack_each do |ts, record|
    next if !record.is_a?(Hash) || record.empty?

    logs << package_record(record, ts)
  end

  plugin_info = {
    'type' => 'fluentd',
    'version' => NewrelicFluentdOutput::VERSION
  }
  payload = {
    'logs' => logs,
    'common' => { 'attributes' => { 'plugin' => plugin_info } }
  }

  buffer = StringIO.new
  # wrap closes the gzip stream when the block ends, which flushes the
  # trailer into the buffer before it is read back.
  Zlib::GzipWriter.wrap(buffer) { |gz| gz << [payload].to_json }
  attempt_send(buffer.string, 0)
end