Class: InfluxDBOutput
- Inherits:
-
Fluent::Plugin::Output
- Object
- Fluent::Plugin::Output
- InfluxDBOutput
- Defined in:
- lib/fluent/plugin/out_influxdb2.rb
Overview
A buffered output plugin for Fluentd and InfluxDB 2
Constant Summary collapse
- DEFAULT_BUFFER_TYPE =
'memory'.freeze
Instance Method Summary collapse
- #configure(conf) ⇒ Object
- #multi_workers_ready? ⇒ Boolean
- #shutdown ⇒ Object
- #start ⇒ Object
- #write(chunk) ⇒ Object
Instance Method Details
#configure(conf) ⇒ Object
86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 |
# File 'lib/fluent/plugin/out_influxdb2.rb', line 86 def configure(conf) compat_parameters_convert(conf, :inject) super case @time_precision when 'ns' then @precision_formatter = ->(ns_time) { ns_time } when 'us' then @precision_formatter = ->(ns_time) { (ns_time / 1e3).round } when 'ms' then @precision_formatter = ->(ns_time) { (ns_time / 1e6).round } when 's' then @precision_formatter = ->(ns_time) { (ns_time / 1e9).round } else raise Fluent::ConfigError, "The time precision #{@time_precision} is not supported. You should use: " \ 'second (s), millisecond (ms), microsecond (us), or nanosecond (ns).' end @precision = InfluxDB2::WritePrecision.new.get_from_value(@time_precision) raise Fluent::ConfigError, 'The InfluxDB URL should be defined.' if @url.empty? end |
#multi_workers_ready? ⇒ Boolean
120 121 122 |
# File 'lib/fluent/plugin/out_influxdb2.rb', line 120 def multi_workers_ready? true end |
#shutdown ⇒ Object
115 116 117 118 |
# File 'lib/fluent/plugin/out_influxdb2.rb', line 115 def shutdown super @client.close! end |
#start ⇒ Object
106 107 108 109 110 111 112 113 |
# File 'lib/fluent/plugin/out_influxdb2.rb', line 106 def start super log.info "Connecting to InfluxDB: url: #{@url}, bucket: #{@bucket}, org: #{@org}, precision = #{@precision}, " \ "use_ssl = #{@use_ssl}, verify_mode = #{@verify_mode}" @client = InfluxDB2::Client.new(@url, @token, bucket: @bucket, org: @org, precision: @precision, use_ssl: @use_ssl, verify_mode: @verify_mode) @write_api = @client.create_write_api end |
#write(chunk) ⇒ Object
124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 |
# File 'lib/fluent/plugin/out_influxdb2.rb', line 124 def write(chunk) points = [] tag = chunk..tag bucket, measurement = (chunk) chunk.msgpack_each do |time, record| if @line_protocol_key points << record[@line_protocol_key] if record.include?(@line_protocol_key) else time_formatted = _format_time(time) point = InfluxDB2::Point .new(name: measurement) record.each_pair do |k, v| if k.eql?(@time_key) time_formatted = _format_time(v) else _parse_field(k, v, point) end point.add_tag('fluentd', tag) if @tag_fluentd end point.time(time_formatted, @precision) points << point end end if points.empty? log.debug "Nothing to write for chunk: #{chunk.}" else @write_api.write(data: points, bucket: bucket) log.debug "Written points: #{points}" end end |