Class: InfluxDBOutput

Inherits:
Fluent::Plugin::Output
  • Object
show all
Defined in:
lib/fluent/plugin/out_influxdb2.rb

Overview

A buffered output plugin for Fluentd and InfluxDB 2

Constant Summary collapse

DEFAULT_BUFFER_TYPE =
'memory'.freeze

Instance Method Summary collapse

Instance Method Details

#configure(conf) ⇒ Object

Raises:

  • (Fluent::ConfigError)


86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
# File 'lib/fluent/plugin/out_influxdb2.rb', line 86

# Validates the plugin configuration and derives the time-precision helpers.
#
# Sets @precision_formatter to a lambda matching @time_precision and
# @precision to the value InfluxDB2::WritePrecision returns for it.
#
# @param conf the configuration object handed to the parent class
# @raise [Fluent::ConfigError] when @time_precision is not one of
#   'ns', 'us', 'ms', 's', or when @url is empty
def configure(conf)
  compat_parameters_convert(conf, :inject)
  super

  # Divisor applied by the formatter for each precision; nil means the
  # value is passed through untouched (no division, no rounding).
  divisors = { 'ns' => nil, 'us' => 1e3, 'ms' => 1e6, 's' => 1e9 }

  unless divisors.key?(@time_precision)
    raise Fluent::ConfigError, "The time precision #{@time_precision} is not supported. You should use: " \
                               'second (s), millisecond (ms), microsecond (us), or nanosecond (ns).'
  end

  divisor = divisors[@time_precision]
  @precision_formatter =
    if divisor
      ->(ns_time) { (ns_time / divisor).round }
    else
      ->(ns_time) { ns_time }
    end

  @precision = InfluxDB2::WritePrecision.new.get_from_value(@time_precision)

  # The URL is checked last, after the precision has been validated.
  raise Fluent::ConfigError, 'The InfluxDB URL should be defined.' if @url.empty?
end

#multi_workers_ready? ⇒ Boolean

Returns:

  • (Boolean)


120
121
122
# File 'lib/fluent/plugin/out_influxdb2.rb', line 120

# Reports whether this plugin may be used with multiple workers.
#
# @return [Boolean] always true
def multi_workers_ready?
  # Unconditional: nothing in this method inspects plugin state.
  true
end

#shutdown ⇒ Object



115
116
117
118
# File 'lib/fluent/plugin/out_influxdb2.rb', line 115

# Runs the parent shutdown logic and then closes the InfluxDB client.
#
# @client is assigned in #start; if start never got that far the variable
# is still nil, so the close is skipped instead of raising NoMethodError.
def shutdown
  super
  @client.close! if @client
end

#start ⇒ Object



106
107
108
109
110
111
112
113
# File 'lib/fluent/plugin/out_influxdb2.rb', line 106

# Runs the parent start logic, logs the connection settings, then builds
# the InfluxDB client (@client) and its write API handle (@write_api).
def start
  super

  connection_summary =
    "Connecting to InfluxDB: url: #{@url}, bucket: #{@bucket}, org: #{@org}, precision = #{@precision}, " \
    "use_ssl = #{@use_ssl}, verify_mode = #{@verify_mode}"
  log.info connection_summary

  @client = InfluxDB2::Client.new(
    @url,
    @token,
    bucket: @bucket,
    org: @org,
    precision: @precision,
    use_ssl: @use_ssl,
    verify_mode: @verify_mode
  )
  @write_api = @client.create_write_api
end

#write(chunk) ⇒ Object



124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
# File 'lib/fluent/plugin/out_influxdb2.rb', line 124

# Turns every record of a buffer chunk into an InfluxDB point and writes
# the whole batch with a single call to the write API.
#
# Two modes, selected by @line_protocol_key:
# * set   - the value stored under that key is forwarded as-is; records
#           that do not contain the key are skipped.
# * unset - an InfluxDB2::Point is built from the record's key/value pairs.
#
# @param chunk the buffer chunk; must respond to +metadata+ and +msgpack_each+
def write(chunk)
  points = []
  tag = chunk.metadata.tag
  bucket, measurement = expand_placeholders(chunk)
  chunk.msgpack_each do |time, record|
    if @line_protocol_key
      points << record[@line_protocol_key] if record.include?(@line_protocol_key)
    else
      # Start from the event time; a value under @time_key overrides it below.
      time_formatted = _format_time(time)
      point = InfluxDB2::Point.new(name: measurement)
      record.each_pair do |k, v|
        if k.eql?(@time_key)
          time_formatted = _format_time(v)
        else
          _parse_field(k, v, point)
        end
        # NOTE(review): re-applied after every pair, so an empty record gets
        # no 'fluentd' tag. Presumably this could be hoisted out of the loop;
        # confirm Point#add_tag semantics first.
        point.add_tag('fluentd', tag) if @tag_fluentd
      end
      point.time(time_formatted, @precision)
      points << point
    end
  end
  if points.empty?
    log.debug "Nothing to write for chunk: #{chunk.metadata}"
  else
    @write_api.write(data: points, bucket: bucket)
    log.debug "Written points: #{points}"
  end
end