Class: Fluent::Plugin::TimestreamOutput

Inherits:
Output
  • Object
show all
Defined in:
lib/fluent/plugin/out_timestream.rb

Overview

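Fluentd output plugin for Amazon Timestream. (The source comment also carries a `rubocop:disable Metrics/ClassLength` lint directive, which is not part of the description.)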

Defined Under Namespace

Classes: EmptyValueError, NoDimensionsError

Constant Summary collapse

# Time-unit names that validate_time_unit accepts for @time_unit.
VALID_TIME_UNIT = %w[SECONDS MILLISECONDS MICROSECONDS NANOSECONDS].freeze

# Placeholder measure that single_measure_payload falls back to when a record
# produced no measures.
DUMMY_MEASURE = { name: '-', value: '-', type: 'VARCHAR' }.freeze

Instance Method Summary collapse

Instance Method Details

#build_measure_payload(measures) ⇒ Object



198
199
200
201
202
203
204
# File 'lib/fluent/plugin/out_timestream.rb', line 198

# Picks the measure-related part of a Timestream record.
#
# The multi-measure layout is used only when the plugin is in multi-measure
# mode AND at least one measure was collected. Every other case (single
# measure mode, or no measures at all) is delegated to single_measure_payload.
#
# @param measures [Array] measures collected for one record
# @return [Object] whatever the selected payload builder returns
def build_measure_payload(measures)
  return single_measure_payload(measures) if !multi_measure? || measures.empty?

  multi_measure_payload(measures)
end

#configure(conf) ⇒ Object



62
63
64
65
66
67
68
69
70
71
72
73
# File 'lib/fluent/plugin/out_timestream.rb', line 62

# Configures the plugin: builds the Timestream write client and resolves the
# target database/table.
#
# Client options start from credential_options; region and endpoint are added
# only when set, while ssl_verify_peer is always passed through. When
# @database / @table are nil they are read from the AWS_TIMESTREAM_DATABASE /
# AWS_TIMESTREAM_TABLE environment variables. Finally the time unit is checked.
#
# @param conf [Object] plugin configuration, forwarded to the parent class
# @raise [Fluent::ConfigError] from validate_time_unit when @time_unit is invalid
def configure(conf)
  super

  client_options = credential_options
  { region: @region, endpoint: @endpoint }.each do |name, setting|
    client_options[name] = setting if setting
  end
  client_options[:ssl_verify_peer] = @ssl_verify_peer
  @client = Aws::TimestreamWrite::Client.new(client_options)

  # Environment variables act as a fallback only; explicit settings win.
  @database = ENV.fetch('AWS_TIMESTREAM_DATABASE', nil) if @database.nil?
  @table = ENV.fetch('AWS_TIMESTREAM_TABLE', nil) if @table.nil?

  validate_time_unit
end

#create_timestream_dimension(key, value) ⇒ Object



109
110
111
112
113
114
115
116
117
118
119
120
121
# File 'lib/fluent/plugin/out_timestream.rb', line 109

# Builds one Timestream dimension from a record field.
#
# The value is stringified first. Timestream does not accept empty strings,
# so an empty value yields nil and the caller simply leaves the dimension out.
#
# @param key [Object] dimension name
# @param value [Object] dimension value; converted with #to_s
# @return [Hash, nil] the dimension hash, or nil when the value is empty
def create_timestream_dimension(key, value)
  text = value.to_s
  return if text.empty?

  { dimension_value_type: 'VARCHAR', name: key, value: text }
end

#create_timestream_dimensions_and_measures(record) ⇒ Object



137
138
139
140
141
142
143
144
145
146
147
148
# File 'lib/fluent/plugin/out_timestream.rb', line 137

# Splits a record into Timestream dimensions and measures.
#
# A field whose key has an entry in measure_types becomes a measure of that
# type; every other field becomes a dimension. Builders that return a falsy
# value (e.g. a dimension with an empty value) contribute nothing.
#
# @param record [Hash] field name => value
# @return [Array(Array, Array)] two-element array: [dimensions, measures]
def create_timestream_dimensions_and_measures(record)
  dimensions = []
  measures = []

  record.each do |field, raw|
    type = measure_types[field]
    entry = type ? create_timestream_measure(field, raw, type) : create_timestream_dimension(field, raw)
    next unless entry

    (type ? measures : dimensions) << entry
  end

  [dimensions, measures]
end

#create_timestream_measure(key, value, type) ⇒ Object

Raises:

  • (EmptyValueError)


123
124
125
126
127
128
129
130
131
132
133
134
135
# File 'lib/fluent/plugin/out_timestream.rb', line 123

# Builds one Timestream measure from a record field.
#
# The value is stringified first. Timestream does not accept empty strings;
# unlike dimensions, an empty measure is not skipped but raises, so that the
# caller can drop the entire record.
#
# @param key [Object] measure name
# @param value [Object] measure value; converted with #to_s
# @param type [Object] measure value type, stored as-is
# @return [Hash] hash with :name, :value and :type
# @raise [EmptyValueError] when the stringified value is empty (message is the key)
def create_timestream_measure(key, value, type)
  text = value.to_s
  raise EmptyValueError, key if text.empty?

  { name: key, value: text, type: type }
end

#create_timestream_record(dimensions, time, measures) ⇒ Object

Raises:

  • (NoDimensionsError)


99
100
101
102
103
104
105
106
107
# File 'lib/fluent/plugin/out_timestream.rb', line 99

# Assembles one Timestream record.
#
# The record carries the dimensions, the stringified time, the configured
# @time_unit, and the measure fields produced by build_measure_payload.
#
# @param dimensions [Array] dimensions of the record; must not be empty
# @param time [Object] timestamp; converted with #to_s
# @param measures [Array] measures handed to build_measure_payload
# @return [Hash] the record
# @raise [NoDimensionsError] when dimensions is empty
def create_timestream_record(dimensions, time, measures)
  raise NoDimensionsError if dimensions.empty?

  base = { dimensions: dimensions, time: time.to_s, time_unit: @time_unit }
  base.merge(build_measure_payload(measures))
end

#create_timestream_records(chunk) ⇒ Object

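Builds the list of Timestream records for a buffer chunk; records that raise EmptyValueError or NoDimensionsError are logged and skipped. (The source comment here is the lint directive `rubocop:disable Metrics/MethodLength`.)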



165
166
167
168
169
170
171
172
173
174
175
176
177
178
# File 'lib/fluent/plugin/out_timestream.rb', line 165

# Converts every event of a buffer chunk into a Timestream record.
#
# When @time_key is configured, the timestamp is taken (and removed) from the
# record instead of using the event time. Records that cannot be turned into
# a valid Timestream record are logged and skipped rather than failing the
# whole chunk:
# * the configured time key is missing or stringifies to an empty string
#   (an empty time would otherwise be emitted as `time: ""`),
# * a measure value is empty (EmptyValueError),
# * the record has no dimensions (NoDimensionsError).
#
# @param chunk [#each] yields (time, record) pairs
# @return [Array] the records built from the events that were not skipped
def create_timestream_records(chunk)
  timestream_records = []
  chunk.each do |time, record|
    unless @time_key.nil?
      time = record.delete(@time_key)
      # Same policy as for empty measure values: drop the record instead of
      # sending a record with an empty time.
      if time.to_s.empty?
        log.warn("ignored record due to (missing or empty time_key: #{@time_key})")
        log.debug("ignored record details: #{record}")
        next
      end
    end

    dimensions, measures = create_timestream_dimensions_and_measures(record)
    timestream_records.push(create_timestream_record(dimensions, time, measures))
  rescue EmptyValueError, NoDimensionsError => e
    log.warn("ignored record due to (#{e})")
    log.debug("ignored record details: #{record}")
    next
  end

  timestream_records
end

#credential_options ⇒ Object



75
76
77
78
79
80
81
82
83
84
# File 'lib/fluent/plugin/out_timestream.rb', line 75

# Returns the static-credential options for the AWS client.
#
# Both @aws_key_id and @aws_sec_key must be set for credentials to be
# included; otherwise an empty hash is returned and no credentials are passed.
#
# @return [Hash] :access_key_id / :secret_access_key, or {} when either is unset
def credential_options
  return {} unless @aws_key_id && @aws_sec_key

  { access_key_id: @aws_key_id, secret_access_key: @aws_sec_key }
end

#format(_tag, time, record) ⇒ Object



95
96
97
# File 'lib/fluent/plugin/out_timestream.rb', line 95

# Serializes one event for the buffer as a MessagePack-encoded
# [time, record] pair. The tag is not stored.
#
# @param _tag [Object] event tag (unused)
# @param time [Object] event time
# @param record [Object] event record
# @return [Object] result of Array#to_msgpack on [time, record]
def format(_tag, time, record)
  event = [time, record]
  event.to_msgpack
end

#formatted_to_msgpack_binary ⇒ Object



91
92
93
# File 'lib/fluent/plugin/out_timestream.rb', line 91

# Always returns true.
#
# NOTE(review): presumably this is the Fluentd output-plugin hook that
# declares #format already emits MessagePack data (format calls to_msgpack) —
# confirm against the Fluent::Plugin::Output documentation.
#
# @return [true]
def formatted_to_msgpack_binary
  true
end

#measure_types ⇒ Object



150
151
152
153
154
155
156
157
158
# File 'lib/fluent/plugin/out_timestream.rb', line 150

# Returns the mapping of measure field name => measure value type, computed
# once and cached in @measure_types.
#
# * no @target_measure: empty mapping (no field is treated as a measure)
# * multi-measure mode: one entry per item of @target_measure.multi_measures
# * otherwise: a single entry for @target_measure itself
#
# @return [Hash] measure name => measure type
def measure_types
  @measure_types ||= begin
    target = @target_measure
    if target.nil?
      {}
    elsif multi_measure?
      target.multi_measures.each_with_object({}) do |item, types|
        types[item.name] = item.type
      end
    else
      { target.name => target.type }
    end
  end
end

#multi_measure? ⇒ Boolean

Returns:

  • (Boolean)


160
161
162
# File 'lib/fluent/plugin/out_timestream.rb', line 160

# Tells whether the plugin is configured for multi-measure records, i.e.
# a @target_measure is present and its type is 'MULTI'.
#
# @return [Boolean]
def multi_measure?
  target = @target_measure
  !target.nil? && target.type == 'MULTI'
end

#multi_measure_payload(measures) ⇒ Object



206
207
208
209
210
211
212
# File 'lib/fluent/plugin/out_timestream.rb', line 206

# Builds the measure fields of a multi-measure record: the record is named
# after @target_measure, typed 'MULTI', and carries all given measures.
#
# @param measures [Array] measures to attach, passed through unchanged
# @return [Hash] :measure_name, :measure_value_type and :measure_values
def multi_measure_payload(measures)
  record_name = @target_measure.name
  { measure_name: record_name, measure_value_type: 'MULTI', measure_values: measures }
end

#single_measure_payload(measures) ⇒ Object



214
215
216
217
218
219
220
221
# File 'lib/fluent/plugin/out_timestream.rb', line 214

# Builds the measure fields of a single-measure record.
#
# Only the first measure is used. When there are no measures at all,
# DUMMY_MEASURE stands in so the record still carries a measure.
#
# @param measures [Array<Hash>] measures with :name, :value and :type keys
# @return [Hash] :measure_name, :measure_value and :measure_value_type
def single_measure_payload(measures)
  source = if measures.empty?
             DUMMY_MEASURE
           else
             measures.first
           end
  name, value, type = source.values_at(:name, :value, :type)

  { measure_name: name, measure_value: value, measure_value_type: type }
end

#validate_time_unit ⇒ Object

Raises:

  • (Fluent::ConfigError)


86
87
88
89
# File 'lib/fluent/plugin/out_timestream.rb', line 86

# Ensures @time_unit is one of the names listed in VALID_TIME_UNIT.
#
# @return [nil] when the time unit is valid
# @raise [Fluent::ConfigError] when @time_unit is not in VALID_TIME_UNIT
def validate_time_unit
  raise Fluent::ConfigError, "Invalid time_unit: #{@time_unit}" unless VALID_TIME_UNIT.include?(@time_unit)
end

#write(chunk) ⇒ Object

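Converts the chunk into Timestream records, logs how many were read, and hands them to write_records. (The source comment here is the lint directive `rubocop:enable Metrics/MethodLength`.)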



181
182
183
184
185
# File 'lib/fluent/plugin/out_timestream.rb', line 181

# Writes one buffer chunk: converts it to Timestream records, logs the
# number of records obtained, then passes them to write_records.
#
# @param chunk [Object] buffer chunk, forwarded to create_timestream_records
# @return [Object] whatever write_records returns
def write(chunk)
  write_records(
    create_timestream_records(chunk).tap do |batch|
      log.info("read #{batch.length} records from chunk")
    end
  )
end

#write_records(records) ⇒ Object



187
188
189
190
191
192
193
194
195
196
# File 'lib/fluent/plugin/out_timestream.rb', line 187

# Sends records to Timestream.
#
# The WriteRecords API accepts a limited number of records per request
# (100 at the time of writing), so the records are sent in slices of at most
# batch_size instead of one oversized request. Rejected records are logged
# per batch and do not prevent the remaining batches from being written;
# any other error propagates to the caller.
#
# @param records [Array] Timestream records; nothing is sent when empty
# @param batch_size [Integer] maximum number of records per WriteRecords call
# @return [nil]
def write_records(records, batch_size: 100)
  return if records.empty?

  records.each_slice(batch_size) do |batch|
    @client.write_records(
      database_name: @database,
      table_name: @table,
      records: batch
    )
  rescue Aws::TimestreamWrite::Errors::RejectedRecordsException => e
    log.error(e.rejected_records)
  end

  nil
end