Class: Fluent::Plugin::TimestreamOutput
- Inherits:
-
Output
- Object
- Output
- Fluent::Plugin::TimestreamOutput
show all
- Defined in:
- lib/fluent/plugin/out_timestream.rb
Overview
rubocop: disable Metrics/ClassLength Fluent plugin for Amazon Timestream
Defined Under Namespace
Classes: EmptyValueError, NoDimensionsError
Constant Summary
collapse
- VALID_TIME_UNIT =
%w[
SECONDS
MILLISECONDS
MICROSECONDS
NANOSECONDS
].freeze
- DUMMY_MEASURE =
{ name: '-', value: '-', type: 'VARCHAR' }.freeze
Instance Method Summary
collapse
Instance Method Details
#build_measure_payload(measures) ⇒ Object
198
199
200
201
202
203
204
|
# File 'lib/fluent/plugin/out_timestream.rb', line 198
def build_measure_payload(measures)
if multi_measure? && !measures.empty?
multi_measure_payload(measures)
else
single_measure_payload(measures)
end
end
|
62
63
64
65
66
67
68
69
70
71
72
73
|
# File 'lib/fluent/plugin/out_timestream.rb', line 62
def configure(conf)
super
options = credential_options
options[:region] = @region if @region
options[:endpoint] = @endpoint if @endpoint
options[:ssl_verify_peer] = @ssl_verify_peer
@client = Aws::TimestreamWrite::Client.new(options)
@database = ENV['AWS_TIMESTREAM_DATABASE'] if @database.nil?
@table = ENV['AWS_TIMESTREAM_TABLE'] if @table.nil?
validate_time_unit
end
|
#create_timestream_dimension(key, value) ⇒ Object
109
110
111
112
113
114
115
116
117
118
119
120
121
|
# File 'lib/fluent/plugin/out_timestream.rb', line 109
def create_timestream_dimension(key, value)
value = value.to_s
return nil if value.empty?
{
dimension_value_type: 'VARCHAR',
name: key,
value: value
}
end
|
#create_timestream_dimensions_and_measures(record) ⇒ Object
137
138
139
140
141
142
143
144
145
146
147
148
|
# File 'lib/fluent/plugin/out_timestream.rb', line 137
def create_timestream_dimensions_and_measures(record)
record.each_with_object([[], []]) do |(key, value), (dimensions, measures)|
measure_type = measure_types[key]
if measure_type
measure = create_timestream_measure(key, value, measure_type)
measures << measure if measure
else
dimension = create_timestream_dimension(key, value)
dimensions << dimension if dimension
end
end
end
|
#create_timestream_measure(key, value, type) ⇒ Object
123
124
125
126
127
128
129
130
131
132
133
134
135
|
# File 'lib/fluent/plugin/out_timestream.rb', line 123
def create_timestream_measure(key, value, type)
value = value.to_s
raise EmptyValueError, key if value.empty?
{
name: key,
value: value,
type: type
}
end
|
#create_timestream_record(dimensions, time, measures) ⇒ Object
99
100
101
102
103
104
105
106
107
|
# File 'lib/fluent/plugin/out_timestream.rb', line 99
def create_timestream_record(dimensions, time, measures)
raise NoDimensionsError if dimensions.empty?
{
dimensions: dimensions,
time: time.to_s,
time_unit: @time_unit,
**build_measure_payload(measures)
}
end
|
#create_timestream_records(chunk) ⇒ Object
rubocop:disable Metrics/MethodLength
165
166
167
168
169
170
171
172
173
174
175
176
177
178
|
# File 'lib/fluent/plugin/out_timestream.rb', line 165
def create_timestream_records(chunk)
timestream_records = []
chunk.each do |time, record|
time = record.delete(@time_key) unless @time_key.nil?
dimensions, measures = create_timestream_dimensions_and_measures(record)
timestream_records.push(create_timestream_record(dimensions, time, measures))
rescue EmptyValueError, NoDimensionsError => e
log.warn("ignored record due to (#{e})")
log.debug("ignored record details: #{record}")
next
end
timestream_records
end
|
#credential_options ⇒ Object
75
76
77
78
79
80
81
82
83
84
|
# File 'lib/fluent/plugin/out_timestream.rb', line 75
def credential_options
if @aws_key_id && @aws_sec_key
{
access_key_id: @aws_key_id,
secret_access_key: @aws_sec_key
}
else
{}
end
end
|
95
96
97
|
# File 'lib/fluent/plugin/out_timestream.rb', line 95
def format(_tag, time, record)
[time, record].to_msgpack
end
|
91
92
93
|
# File 'lib/fluent/plugin/out_timestream.rb', line 91
def formatted_to_msgpack_binary
true
end
|
#measure_types ⇒ Object
150
151
152
153
154
155
156
157
158
|
# File 'lib/fluent/plugin/out_timestream.rb', line 150
def measure_types
@measure_types ||= if @target_measure.nil?
{}
elsif multi_measure?
@target_measure.multi_measures.to_h { |m| [m.name, m.type] }
else
{ @target_measure.name => @target_measure.type }
end
end
|
#multi_measure? ⇒ Boolean
160
161
162
|
# File 'lib/fluent/plugin/out_timestream.rb', line 160
def multi_measure?
@target_measure&.type == 'MULTI'
end
|
#multi_measure_payload(measures) ⇒ Object
206
207
208
209
210
211
212
|
# File 'lib/fluent/plugin/out_timestream.rb', line 206
def multi_measure_payload(measures)
{
measure_name: @target_measure.name,
measure_value_type: 'MULTI',
measure_values: measures
}
end
|
#single_measure_payload(measures) ⇒ Object
214
215
216
217
218
219
220
221
|
# File 'lib/fluent/plugin/out_timestream.rb', line 214
def single_measure_payload(measures)
measure = measures.empty? ? DUMMY_MEASURE : measures.first
{
measure_name: measure[:name],
measure_value: measure[:value],
measure_value_type: measure[:type]
}
end
|
#validate_time_unit ⇒ Object
86
87
88
89
|
# File 'lib/fluent/plugin/out_timestream.rb', line 86
def validate_time_unit
return if VALID_TIME_UNIT.include?(@time_unit)
raise Fluent::ConfigError, "Invalid time_unit: #{@time_unit}"
end
|
#write(chunk) ⇒ Object
rubocop:enable Metrics/MethodLength
181
182
183
184
185
|
# File 'lib/fluent/plugin/out_timestream.rb', line 181
def write(chunk)
records = create_timestream_records(chunk)
log.info("read #{records.length} records from chunk")
write_records(records)
end
|
#write_records(records) ⇒ Object
187
188
189
190
191
192
193
194
195
196
|
# File 'lib/fluent/plugin/out_timestream.rb', line 187
def write_records(records)
return if records.empty?
@client.write_records(
database_name: @database,
table_name: @table,
records: records
)
rescue Aws::TimestreamWrite::Errors::RejectedRecordsException => e
log.error(e.rejected_records)
end
|