Class: Fluent::Plugin::DatadogOutput

Inherits:
Output
  • Object
show all
Defined in:
lib/fluent/plugin/out_datadog_log.rb

Overview

fluentd output plugin for the Datadog Log Intake API

Defined Under Namespace

Modules: Platform

Constant Summary collapse

DEFAULT_BUFFER_TYPE =
'memory'
PLUGIN_NAME =
'Fluentd Datadog plugin'
PLUGIN_VERSION =
'0.1.0'
METADATA_SERVICE_ADDR =

Address of the metadata service.

'169.254.169.254'
TRUNCATED_MSG =

Marker appended in place of the removed tail when a message exceeds the maximum intake length.

'...TRUNCATED...'
TRUNCATED_LEN =
TRUNCATED_MSG.size
DD_MAX_MESSAGE_LEN =

MaxMessageLen is the maximum length for any message we send to the intake see github.com/DataDog/datadog-log-agent/blob/2394da8c79a6cadbcd1e98d6c89c437becec2732/pkg/config/constants.go#L9-L10

1 * 1000 * 1000
MAX_MESSAGE_LEN =
DD_MAX_MESSAGE_LEN - TRUNCATED_LEN

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initializeDatadogOutput

Returns a new instance of DatadogOutput.



128
129
130
131
132
# File 'lib/fluent/plugin/out_datadog_log.rb', line 128

# Returns a new instance of DatadogOutput.
#
# Captures the Fluentd global logger in @log so the rest of the plugin
# logs through a single instance variable.
def initialize
  super
  # use the global logger
  @log = $log # rubocop:disable Style/GlobalVars
end

Instance Attribute Details

#vm_idObject (readonly)

Returns the value of attribute vm_id.



126
127
128
# File 'lib/fluent/plugin/out_datadog_log.rb', line 126

# Read-only accessor for the VM identifier.
#
# @return [Object] the value of @vm_id; set elsewhere in the plugin —
#   presumably during platform/metadata detection (NOTE(review): confirm).
def vm_id
  @vm_id
end

#zoneObject (readonly)

Returns the value of attribute zone.



125
126
127
# File 'lib/fluent/plugin/out_datadog_log.rb', line 125

# Read-only accessor for the zone.
#
# @return [Object] the value of @zone; set elsewhere in the plugin —
#   presumably during platform/metadata detection (NOTE(review): confirm).
def zone
  @zone
end

Instance Method Details

#build_api_key_str(api_key:, logset:) ⇒ Object



167
168
169
170
171
172
173
# File 'lib/fluent/plugin/out_datadog_log.rb', line 167

# Builds the API-key prefix for an intake payload.
#
# @param api_key [String] the Datadog API key
# @param logset [String, nil] optional logset name
# @return [String] "api_key/logset" when a non-empty logset is given,
#   otherwise the bare api_key
def build_api_key_str(api_key:, logset:)
  return api_key if logset.nil? || logset == ''

  "#{api_key}/#{logset}"
end

#build_extra_content(timestamp:, hostname:, service:, tags_payload:) ⇒ Object



163
164
165
# File 'lib/fluent/plugin/out_datadog_log.rb', line 163

# Builds the syslog-style header that precedes the message in every
# intake payload: priority/version, timestamp, hostname, service, two
# placeholder fields, then the "[dd ...]" tag sections.
#
# @param timestamp [String] RFC 3339 timestamp
# @param hostname [String] host identifier
# @param service [String] service name
# @param tags_payload [String] output of #build_tags_payload
# @return [String] the assembled header
def build_extra_content(timestamp:, hostname:, service:, tags_payload:)
  ['<46>0', timestamp, hostname, service, '-', '-', tags_payload].join(' ')
end

#build_payload(api_key_str:, msg:, extra_content:) ⇒ Object

build_payload returns a processed payload from a raw message

Parameters:

  • api_key_str (String)
  • extra_content (String)
  • msg (String)


179
180
181
# File 'lib/fluent/plugin/out_datadog_log.rb', line 179

# build_payload returns a processed payload from a raw message.
#
# Fix: the payload must end with a real newline terminator — the intake
# protocol is newline-framed. The previous body used "\\n", which emitted
# a literal backslash followed by 'n' instead of terminating the frame.
#
# @param api_key_str [String] output of #build_api_key_str
# @param extra_content [String] output of #build_extra_content
# @param msg [String] the (already truncated) log message
# @return [String] the newline-terminated wire payload
def build_payload(api_key_str:, msg:, extra_content:)
  "#{api_key_str} #{extra_content} #{msg}\n"
end

#build_tags_payload(config_tags:, source:, source_category:) ⇒ Object

Given a list of tags, build_tags_payload generates the bytes array that will be inserted into messages



145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
# File 'lib/fluent/plugin/out_datadog_log.rb', line 145

# Given a list of tags, build_tags_payload generates the string of
# "[dd ...]" sections that will be inserted into messages.
#
# @param config_tags [Array, String, nil] tags to attach (arrays are
#   joined with commas)
# @param source [String, nil] ddsource value
# @param source_category [String, nil] ddsourcecategory value
# @return [String] concatenated sections; empty when nothing is set
def build_tags_payload(config_tags:, source:, source_category:)
  sections = []

  sections << "[dd ddsource=\"#{source}\"]" unless source.nil? || source == ''

  unless source_category.nil? || source_category == ''
    sections << "[dd ddsourcecategory=\"#{source_category}\"]"
  end

  unless config_tags.nil? || config_tags == ''
    joined = config_tags.is_a?(::Array) ? config_tags.join(',') : config_tags
    sections << "[dd ddtags=\"#{joined}\"]"
  end

  sections.join
end

#configure(conf) ⇒ Object



183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
# File 'lib/fluent/plugin/out_datadog_log.rb', line 183

# Reads and validates the plugin configuration.
#
# Resolves the API key (explicit config first, then the DD_LOG_API_KEY
# environment variable), optionally registers monitoring counters,
# detects the platform and builds the frozen default tag set.
#
# @param conf [Fluent::Config::Element] the parsed plugin configuration
# @raise [Fluent::ConfigError] when no API key can be resolved
def configure(conf)
  compat_parameters_convert(conf, :buffer, :inject)
  super

  # Fall back to the environment when no api_key was configured.
  # Guard with nil?/empty? — the previous `@api_key.size == 0` raised
  # NoMethodError whenever @api_key was nil instead of reporting a
  # configuration error.
  if @api_key.nil? || @api_key.empty?
    @api_key = ENV['DD_LOG_API_KEY']
    if @api_key.nil? || @api_key.empty?
      error_message = 'Unable to obtain api_key from DD_LOG_API_KEY'
      fail Fluent::ConfigError, error_message
    end
  end

  # If monitoring is enabled, register metrics in the default registry
  # and store metric objects for future use.
  if @enable_monitoring
    registry = Monitoring::MonitoringRegistryFactory.create @monitoring_type
    @successful_requests_count = registry.counter(
      :datadog_successful_requests_count,
      'A number of successful requests to the Datadog Log Intake API')
    @failed_requests_count = registry.counter(
      :datadog_failed_requests_count,
      'A number of failed requests to the Datadog Log Intake API,'\
        ' broken down by the error code')
    @ingested_entries_count = registry.counter(
      :datadog_ingested_entries_count,
      'A number of log entries ingested by Datadog Log Intake')
    # The two descriptions below said "Stackdriver" — a copy-paste
    # leftover from the Google plugin this file was derived from.
    @dropped_entries_count = registry.counter(
      :datadog_dropped_entries_count,
      'A number of log entries dropped by the Datadog output plugin')
    @retried_entries_count = registry.counter(
      :datadog_retried_entries_count,
      'The number of log entries that failed to be ingested by the'\
        ' Datadog output plugin due to a transient error and were'\
        ' retried')
  end

  @platform = detect_platform

  # Set required variables: @project_id, @vm_id, @vm_name and @zone.


  @default_tags = build_default_tags

  # The resource and labels are now set up; ensure they can't be modified
  # without first duping them.
  @default_tags.freeze

  # Log an informational message containing the Logs viewer URL
  @log.info 'Logs viewer address: https://example.com/logs/'
end

#format(tag, time, record) ⇒ Object



246
247
248
249
# File 'lib/fluent/plugin/out_datadog_log.rb', line 246

# Formats one event for buffering: injects the configured values into
# the record, then packs [tag, time, record] with msgpack.
#
# @param tag [String] the event tag
# @param time [Integer, Fluent::EventTime] the event time
# @param record [Hash] the event record
# @return [String] msgpack-encoded entry
def format(tag, time, record)
  enriched = inject_values_to_record(tag, time, record)
  [tag, time, enriched].to_msgpack
end

#formatted_to_msgpack_binary?Boolean

Returns:

  • (Boolean)


251
252
253
# File 'lib/fluent/plugin/out_datadog_log.rb', line 251

# Tells Fluentd that #format returns msgpack-encoded binary data.
#
# @return [Boolean] always true
def formatted_to_msgpack_binary?
  true
end

#multi_workers_ready?Boolean

Returns:

  • (Boolean)


255
256
257
# File 'lib/fluent/plugin/out_datadog_log.rb', line 255

# Declares this output safe to run under multiple Fluentd workers.
#
# @return [Boolean] always true
def multi_workers_ready?
  true
end

#shutdownObject



241
242
243
244
# File 'lib/fluent/plugin/out_datadog_log.rb', line 241

# Shuts the plugin down and releases the intake connection, if one
# was ever opened.
def shutdown
  super
  unless @conn.nil?
    @conn.close
  end
end

#startObject



234
235
236
237
238
239
# File 'lib/fluent/plugin/out_datadog_log.rb', line 234

# Starts the plugin: initializes the API client and resets per-run
# state flags.
def start
  super
  init_api_client
  # Gates the one-time "Successfully sent to Datadog." info log in #write.
  @successful_call = false
  # NOTE(review): @timenanos_warning is not referenced in the code visible
  # here — presumably a one-time-warning flag used elsewhere; confirm.
  @timenanos_warning = false
end

#truncate_message(msg) ⇒ Object



134
135
136
137
138
139
140
# File 'lib/fluent/plugin/out_datadog_log.rb', line 134

# Caps a message at the Datadog intake limit.
#
# Messages longer than DD_MAX_MESSAGE_LEN are cut to MAX_MESSAGE_LEN and
# suffixed with TRUNCATED_MSG, so the result is exactly
# DD_MAX_MESSAGE_LEN long; shorter messages pass through untouched.
#
# @param msg [String] the raw log message
# @return [String] the original or truncated message
def truncate_message(msg)
  return msg unless msg.size > DD_MAX_MESSAGE_LEN

  msg.slice(0, MAX_MESSAGE_LEN) + TRUNCATED_MSG
end

#write(chunk) ⇒ Object



259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
# File 'lib/fluent/plugin/out_datadog_log.rb', line 259

# Writes one buffered chunk to the Datadog Log Intake.
#
# For each valid record: optionally promotes an embedded JSON payload to
# a structured record (@detect_json), picks the message out of the
# log/message/msg fields, derives Kubernetes tags, builds the framed
# payload and sends it over @conn. Failures bump the error counters and
# re-raise so Fluentd retries the chunk.
#
# @param chunk [Fluent::Plugin::Buffer::Chunk] entries produced by #format
def write(chunk)
  each_valid_record(chunk) do |_tag, time, record|
    if @detect_json
      # Save the timestamp and severity if available, then clear it out to
      # allow for determining whether we should parse the log or message
      # field.
      timestamp = record.delete('time')
      severity = record.delete('severity')

      # If the log is json, we want to export it as a structured log
      # unless there is additional metadata that would be lost.
      record_json = nil
      if record.length == 1
        %w(log message msg).each do |field|
          if record.key?(field)
            record_json = parse_json_or_nil(record[field])
          end
        end
      end
      record = record_json unless record_json.nil?
      # Restore timestamp and severity if necessary. Note that we don't
      # want to override these keys in the JSON we've just parsed.
      record['time'] ||= timestamp if timestamp
      record['severity'] ||= severity if severity
    end

    # TODO: Correlate Datadog APM spans with log messages
    # fq_trace_id = record.delete(@trace_key)
    # entry.trace = fq_trace_id if fq_trace_id

    begin
      # The message comes from the last present field among
      # log/message/msg (later names win when several are present).
      msg = nil
      %w(log message msg).each do |field|
        msg = record[field] if record.key?(field)
      end

      tags = []

      # Map selected Kubernetes metadata fields onto Datadog tag names.
      kube = record['kubernetes'] || {}

      mappings = {
        'pod_name' => 'pod_name',
        'container_name' => 'container_name',
        'namespace_name' => 'kube_namespace'
      }

      mappings.each do |json_key, tag_key|
        tags << "#{tag_key}=#{kube[json_key]}" if kube.key? json_key
      end

      # Every Kubernetes label becomes a kube_-prefixed tag.
      if kube.key? 'labels'
        labels = kube['labels']
        labels.each do |k, v|
          tags << "kube_#{k}=#{v}"
        end
      end

      # TODO: Include K8S tags like
      # - kube_daemon_set=$daemonset_name
      # - kube_deployment=$deployment_name
      # - kube_replica_set=$replicaset_name
      # -

      tags.concat(@default_tags)

      # RFC 3339 timestamp with microsecond precision, rendered in UTC.
      datetime = Time.at(Fluent::EventTime.new(time).to_r).utc.to_datetime
      timestamp_str = datetime.rfc3339(6)

      # NOTE(review): if none of log/message/msg was present, msg is nil
      # here and truncate_message(nil) will raise — confirm whether such
      # records should be skipped instead.
      payload = build_payload(
        api_key_str: build_api_key_str(api_key: @api_key, logset: @logset),
        msg: truncate_message(msg),
        extra_content: build_extra_content(
          timestamp: timestamp_str,
          hostname: @vm_id,
          service: @service,
          tags_payload: build_tags_payload(
            config_tags: tags,
            source: @source,
            source_category: @source_category
          )
        )
      )

      entries_count = 1
      @conn.write(payload)
      @log.debug 'Sent payload to Datadog.', payload: payload
      increment_successful_requests_count
      increment_ingested_entries_count(entries_count)

      # Let the user explicitly know when the first call succeeded, to aid
      # with verification and troubleshooting.
      unless @successful_call
        @successful_call = true
        @log.info 'Successfully sent to Datadog.'
      end

    rescue => error
      increment_failed_requests_count
      # NOTE(review): entries_count is nil when the failure happened before
      # the `entries_count = 1` assignment above — verify the counter
      # helper tolerates nil.
      increment_retried_entries_count(entries_count)
      # RPC cancelled, so retry via re-raising the error.
      @log.debug "Retrying #{entries_count} log message(s) later.",
                 error: error.to_s
      raise error
    end
  end
end