Class: Fluent::Plugin::NewrelicOutput
- Inherits: Output
- Inheritance chain: Object → Output → Fluent::Plugin::NewrelicOutput
show all
- Defined in:
- lib/fluent/plugin/out_newrelic.rb
Defined Under Namespace
Classes: ConnectionFailure
Constant Summary
collapse
- DEFAULT_BUFFER_TYPE =
'memory'.freeze
Instance Method Summary
collapse
Instance Method Details
#attempt_send(payload, attempt) ⇒ Object
116
117
118
119
|
# File 'lib/fluent/plugin/out_newrelic.rb', line 116
# Delivers +payload+, backing off and retrying while the retry budget lasts.
#
# Every invocation (the very first one included) sleeps for
# sleep_duration(attempt) before doing anything else. If should_retry?(attempt)
# is false afterwards, the method returns without sending and without raising.
# Otherwise the payload is sent once, and an unsuccessful response leads to
# another round with attempt + 1.
#
# @param payload [String] request body passed through to #send
# @param attempt [Integer] current attempt counter (#write starts it at 0)
# @return [nil]
def attempt_send(payload, attempt)
  sleep sleep_duration(attempt)
  return unless should_retry?(attempt)

  response = send(payload)
  return if was_successful?(response)

  attempt_send(payload, attempt + 1)
end
|
#configure(conf) ⇒ Object
47
48
49
50
51
52
53
54
55
56
57
|
# File 'lib/fluent/plugin/out_newrelic.rb', line 47
# Completes plugin setup once the parent class has handled +conf+.
#
# Parses @base_uri into @end_point and prepares the frozen set of request
# headers stored in @header. @base_uri and @api_key are read here but not
# assigned in this method -- presumably the parent class fills them in from
# the plugin configuration; confirm against the config_param declarations.
#
# @param conf [Object] configuration object forwarded unchanged to super
# @return [Hash] the frozen header hash
def configure(conf)
  super

  @end_point = URI.parse(@base_uri)

  request_headers = {}
  request_headers['X-Insert-Key'] = @api_key
  request_headers['X-Event-Source'] = 'logs'
  request_headers['Content-Encoding'] = 'gzip'
  @header = request_headers.freeze
end
|
#maybe_parse_json(message) ⇒ Object
138
139
140
141
142
143
144
145
146
147
|
# File 'lib/fluent/plugin/out_newrelic.rb', line 138
# Best-effort decoding of a log message as JSON.
#
# @param message [Object] candidate JSON text; anything that is not
#   string-like is treated as "not JSON"
# @return [Hash] the decoded value when +message+ is valid JSON whose
#   top-level value is an object; an empty Hash in every other case
def maybe_parse_json(message)
  # JSON.parse raises TypeError (not JSON::ParserError) for nil, numbers and
  # other non-string input, which would otherwise abort the whole chunk.
  return {} unless message.respond_to?(:to_str)

  parsed = JSON.parse(message)
  # Arrays, strings, numbers etc. are valid JSON but carry no attributes.
  parsed.is_a?(Hash) ? parsed : {}
rescue JSON::ParserError
  {}
end
|
#maybe_parse_message_json(record) ⇒ Object
130
131
132
133
134
135
136
|
# File 'lib/fluent/plugin/out_newrelic.rb', line 130
# Expands a JSON-encoded 'message' field into top-level record keys.
#
# Records without a 'message' key are returned as-is (same object). Otherwise
# a new Hash is returned in which the keys decoded by maybe_parse_json are
# merged over the record, so decoded keys replace same-named record keys.
# The input record is never mutated.
#
# @param record [Hash] event record
# @return [Hash] the original record or a merged copy
def maybe_parse_message_json(record)
  return record unless record.key?('message')

  record.merge(maybe_parse_json(record['message']))
end
|
#package_record(record, timestamp) ⇒ Object
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
|
# File 'lib/fluent/plugin/out_newrelic.rb', line 59
# Converts one event record into the per-log structure used in the payload.
#
# The result has 'timestamp', 'attributes' and, when the record has one,
# 'message'. Attributes start from whatever maybe_parse_json extracts from the
# message; every other record key is then copied on top, so the record's own
# values win over same-named keys decoded from the message.
#
# @param record [Hash] event record
# @param timestamp [Object] event time, stored unchanged under 'timestamp'
# @return [Hash] the packaged log entry
def package_record(record, timestamp)
  attributes = {}
  attributes = attributes.merge(maybe_parse_json(record['message'])) if record.has_key?('message')

  packaged = { 'timestamp' => timestamp, 'attributes' => attributes }

  record.each do |key, value|
    # 'message' is promoted to the top level; everything else is an attribute.
    destination = key == 'message' ? packaged : attributes
    destination[key] = value
  end

  packaged
end
|
#send(payload) ⇒ Object
121
122
123
124
125
126
127
128
|
# File 'lib/fluent/plugin/out_newrelic.rb', line 121
# POSTs +payload+ to @end_point over TLS with peer verification.
#
# NOTE(review): this method shadows Object#send for instances of the plugin;
# the name is kept because other methods call it.
#
# @param payload [String] request body (callers pass the gzip-compressed JSON)
# @return [Net::HTTPResponse] the response returned by Net::HTTP#request
def send(payload)
  # Use the port carried by an https base_uri so endpoints on a non-default
  # port are reachable. For any other scheme keep port 443, because TLS is
  # always enabled below and the URI's default port would not speak TLS.
  port = @end_point.scheme == 'https' ? @end_point.port : 443

  http = Net::HTTP.new(@end_point.host, port)
  http.use_ssl = true
  http.verify_mode = OpenSSL::SSL::VERIFY_PEER

  request = Net::HTTP::Post.new(@end_point.request_uri, @header)
  request.body = payload
  http.request(request)
end
|
#should_retry?(attempt) ⇒ Boolean
104
105
106
|
# File 'lib/fluent/plugin/out_newrelic.rb', line 104
# Tells whether another send is allowed for the given attempt counter.
#
# @param attempt [Integer] current attempt counter
# @return [Boolean] true while +attempt+ is strictly below @retries
def should_retry?(attempt)
  retry_limit = @retries
  attempt < retry_limit
end
|
#sleep_duration(attempt) ⇒ Object
112
113
114
|
# File 'lib/fluent/plugin/out_newrelic.rb', line 112
# Computes the pause before an attempt: @retry_seconds doubled once per
# attempt, capped at @max_delay.
#
# @param attempt [Integer] current attempt counter
# @return [Numeric] the smaller of @max_delay and (2 ** attempt) * @retry_seconds
def sleep_duration(attempt)
  multiplier = 2 ** attempt
  backoff = multiplier * @retry_seconds
  [@max_delay, backoff].min
end
|
#was_successful?(response) ⇒ Boolean
108
109
110
|
# File 'lib/fluent/plugin/out_newrelic.rb', line 108
# Checks whether a response carries a 2xx status.
#
# @param response [#code] object whose +code+ converts to an integer via to_i
# @return [Boolean] true when the numeric status is in 200..299
def was_successful?(response)
  status = response.code.to_i
  (200...300).cover?(status)
end
|
#write(chunk) ⇒ Object
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
|
# File 'lib/fluent/plugin/out_newrelic.rb', line 80
# Packages every usable record of +chunk+ into one payload, gzips its JSON
# form and hands it to attempt_send starting at attempt 0.
#
# Entries that are not Hashes, or are empty Hashes, are skipped. The payload
# is serialised as a one-element JSON array and is sent even when no record
# survived the filtering.
#
# @param chunk [#msgpack_each] buffer chunk yielding (timestamp, record) pairs
# @return [Object] whatever attempt_send returns
def write(chunk)
  plugin_info = {
    'type' => 'fluentd',
    'version' => NewrelicFluentdOutput::VERSION,
  }
  logs = []
  payload = {
    'logs' => logs,
    'common' => { 'attributes' => { 'plugin' => plugin_info } }
  }

  chunk.msgpack_each do |ts, record|
    next unless record.is_a?(Hash) && !record.empty?

    logs << package_record(record, ts)
  end

  buffer = StringIO.new
  # wrap closes the gzip stream (flushing the trailer) when the block ends.
  Zlib::GzipWriter.wrap(buffer) { |gz| gz.write([payload].to_json) }
  attempt_send(buffer.string, 0)
end
|