Fluentd filter plugin to deduplicate records for InfluxDB

A filter plugin that implements the deduplication techniques described in the InfluxDB documentation.

Installation

Using RubyGems:

fluent-gem install fluent-plugin-influxdb-deduplication

Configuration

Deduplicate by incrementing the timestamp

Each data point is assigned a unique timestamp. The filter plugin reads the Fluentd event time with second precision and stores it in a field with nanosecond precision. Within a sequence of records sharing the same event time, each record after the first has its timestamp incremented by 1 nanosecond relative to the previous one.

<filter pattern>
  @type influxdb_deduplication

  <time>
    # field to store the deduplicated timestamp
    key my_key_field
  </time>
</filter>

For example, the following input records:

Fluentd Event Time Record
1613910640 { "k1" => 0, "k2" => "value0" }
1613910640 { "k1" => 1, "k2" => "value1" }
1613910640 { "k1" => 2, "k2" => "value2" }
1613910641 { "k1" => 3, "k3" => "value3" }

Would become on output:

Fluentd Event Time Record
1613910640 { "k1" => 0, "k2" => "value0", "my_key_field" => 1613910640000000000 }
1613910640 { "k1" => 1, "k2" => "value1", "my_key_field" => 1613910640000000001 }
1613910640 { "k1" => 2, "k2" => "value2", "my_key_field" => 1613910640000000002 }
1613910641 { "k1" => 3, "k3" => "value3", "my_key_field" => 1613910641000000000 }
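
The incrementing behaviour described above can be sketched in a few lines of Ruby. This is only an illustration under stated assumptions, not the plugin's actual code: the class name `TimestampDeduplicator` and its `filter` method are hypothetical, event times are plain integers in seconds, and records are assumed to arrive in order.

```ruby
# Hypothetical sketch, not the plugin's implementation: assigns a unique
# nanosecond timestamp to each record from its event time in seconds.
class TimestampDeduplicator
  NANOS_PER_SECOND = 1_000_000_000

  def initialize(key)
    @key = key           # name of the field receiving the timestamp
    @last_second = nil   # last event time seen, in seconds
    @offset = 0          # nanoseconds added within the current second
  end

  # Returns a copy of the record with the deduplicated timestamp added.
  def filter(event_time, record)
    if event_time == @last_second
      @offset += 1       # same second as the previous record: bump by 1 ns
    else
      @last_second = event_time
      @offset = 0        # new second: start again from the exact second
    end
    record.merge(@key => event_time * NANOS_PER_SECOND + @offset)
  end
end

dedup = TimestampDeduplicator.new("my_key_field")
[1613910640, 1613910640, 1613910641].each do |event_time|
  puts dedup.filter(event_time, { "k1" => 0 })["my_key_field"]
end
# prints 1613910640000000000, 1613910640000000001, 1613910641000000000
```

The sketch keeps only the last seen second and a counter, so it distinguishes records only while event times never go backwards.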

The time key field can then be passed as is to fluent-plugin-influxdb-v2. Example configuration for nginx logs:

<filter nginx.access>
  @type influxdb_deduplication

  <time>
    # field to store the deduplicated timestamp
    key my_key_field
  </time>
</filter>

<match nginx.access>
    @type influxdb2

    # setup the access to your InfluxDB v2 instance
    url             https://localhost:8086
    token           my-token
    bucket          my-bucket
    org             my-org

    # the influxdb2 time_key must be set to the same value as the influxdb_deduplication time.key
    time_key my_key_field

    # the timestamp precision must be set to ns
    time_precision ns

    tag_keys ["request_method", "status"]
    field_keys ["remote_addr", "request_uri"]
</match>

The data can then be queried as a table and viewed in Grafana, for example with the following Flux query:

from(bucket: "my-bucket")
  |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
  |> pivot(
    rowKey: ["_time"],
    columnKey: ["_field"],
    valueColumn: "_value"
  )
  |> keep(columns: ["_time", "request_method", "status", "remote_addr", "request_uri"])

Deduplicate by adding a sequence tag

Each record is assigned a sequence number, so the output record can be uniquely identified by the pair (fluentd_event_time, sequence_number). The event time is left untouched, so no time precision is lost.

<filter pattern>
  @type influxdb_deduplication

  <tag>
    # field to store the sequence number
    key my_key_field
  </tag>
</filter>

For example, the following input records:

Fluentd Event Time Record
1613910640 { "k1" => 0, "k2" => "value0" }
1613910640 { "k1" => 1, "k2" => "value1" }
1613910640 { "k1" => 2, "k2" => "value2" }
1613910641 { "k1" => 3, "k3" => "value3" }

Would become on output:

Fluentd Event Time Record
1613910640 { "k1" => 0, "k2" => "value0", "my_key_field" => 0 }
1613910640 { "k1" => 1, "k2" => "value1", "my_key_field" => 1 }
1613910640 { "k1" => 2, "k2" => "value2", "my_key_field" => 2 }
1613910641 { "k1" => 3, "k3" => "value3", "my_key_field" => 0 }
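
The sequence numbering shown above can be sketched in Ruby as follows. This is an illustrative approximation rather than the plugin's code: the class name `SequenceTagger` and its `filter` method are hypothetical, and records are assumed to arrive in order.

```ruby
# Hypothetical sketch, not the plugin's implementation: numbers the records
# that share the same event time, restarting at 0 for each new event time.
class SequenceTagger
  def initialize(key)
    @key = key         # name of the field receiving the sequence number
    @last_time = nil   # last event time seen
    @sequence = 0      # position of the record within that event time
  end

  # Returns a copy of the record with the sequence number added.
  # The event time itself is not modified.
  def filter(event_time, record)
    if event_time == @last_time
      @sequence += 1
    else
      @last_time = event_time
      @sequence = 0
    end
    record.merge(@key => @sequence)
  end
end

tagger = SequenceTagger.new("my_key_field")
[1613910640, 1613910640, 1613910640, 1613910641].each do |event_time|
  puts tagger.filter(event_time, { "k1" => 0 })["my_key_field"]
end
# prints 0, 1, 2, 0
```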

The sequence tag should be passed in the tag_keys parameter of fluent-plugin-influxdb-v2. Example configuration for nginx logs:

<filter nginx.access>
  @type influxdb_deduplication

  <tag>
    # field to store the sequence number
    key my_key_field
  </tag>
</filter>

<match nginx.access>
    @type influxdb2

    # setup the access to your InfluxDB v2 instance
    url             https://localhost:8086
    token           my-token
    bucket          my-bucket
    org             my-org

    # the influxdb2 time_key is not specified so the fluentd event time is used
    # time_key

    # there is no requirement on the time_precision value this time
    # time_precision ns

    # "my_key_field" must be passed to influxdb's tag_keys
    tag_keys ["request_method", "status", "my_key_field"]
    field_keys ["remote_addr", "request_uri"]
</match>

The data can then be queried as a table and viewed in Grafana, for example with the following Flux query:

from(bucket: "my-bucket")
  |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
  |> pivot(
    rowKey: ["_time", "my_key_field"],
    columnKey: ["_field"],
    valueColumn: "_value"
  )
  |> keep(columns: ["_time", "request_method", "status", "remote_addr", "request_uri"])

Detecting out of order records

This filter plugin expects the Fluentd event timestamps of incoming records to increase and never decrease. Optionally, an order key can be added to indicate whether each record arrived in order. For example, with this configuration:

<filter pattern>
  @type influxdb_deduplication

  order_key order_field

  <time>
    # field to store the deduplicated timestamp
    key my_key_field
  </time>
</filter>

Without an order key, out-of-order records are dropped so that previously written data points are not overwritten. With an order key, out-of-order records are still pushed, but with order_field = false. Out-of-order records are not deduplicated, but they can be identified in InfluxDB.
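
The two behaviours can be sketched in Ruby as follows. This is a rough illustration, not the plugin's code: the class name `OrderChecker` and its `filter` method are hypothetical. It also assumes that in-order records receive the value true in the order field and that a record with the same event time as the previous one counts as in order; neither detail is stated above.

```ruby
# Hypothetical sketch, not the plugin's implementation: detects records
# whose event time is older than the latest event time seen so far.
class OrderChecker
  def initialize(order_key = nil)
    @order_key = order_key   # nil means out-of-order records are dropped
    @last_time = nil         # latest in-order event time seen
  end

  # Returns the record to emit, or nil when the record is dropped.
  def filter(event_time, record)
    # Assumption: an equal event time still counts as in order.
    in_order = @last_time.nil? || event_time >= @last_time
    @last_time = event_time if in_order

    # Without an order key, keep in-order records and drop the others.
    return (in_order ? record : nil) if @order_key.nil?

    # With an order key, always emit and flag the record.
    # Assumption: in-order records are flagged with true.
    record.merge(@order_key => in_order)
  end
end

checker = OrderChecker.new("order_field")
[1613910640, 1613910641, 1613910639].each do |event_time|
  puts checker.filter(event_time, { "k1" => 0 })["order_field"]
end
# prints true, true, false
```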