Class: Fluent::Plugin::S3Output

Inherits:
Output
  • Object
Defined in:
lib/fluent/plugin/out_s3.rb,
lib/fluent/plugin/s3_compressor_lzo.rb,
lib/fluent/plugin/s3_compressor_lzma2.rb,
lib/fluent/plugin/s3_compressor_parquet.rb,
lib/fluent/plugin/s3_compressor_gzip_command.rb

Defined Under Namespace

Classes: Compressor, GzipCommandCompressor, GzipCompressor, JsonCompressor, LZMA2Compressor, LZOCompressor, ParquetCompressor, TextCompressor

Constant Summary

DEFAULT_FORMAT_TYPE = "out_file"
MAX_HEX_RANDOM_LENGTH = 16

Instance Attribute Summary

Instance Method Summary

Constructor Details

#initialize ⇒ S3Output

Returns a new instance of S3Output.



# File 'lib/fluent/plugin/out_s3.rb', line 16

def initialize
  super
  @compressor = nil
  @uuid_flush_enabled = false
end

Instance Attribute Details

#bucket ⇒ Object (readonly)

Returns the value of attribute bucket.



# File 'lib/fluent/plugin/out_s3.rb', line 178

def bucket
  @bucket
end

Instance Method Details

#configure(conf) ⇒ Object



# File 'lib/fluent/plugin/out_s3.rb', line 187

def configure(conf)
  compat_parameters_convert(conf, :buffer, :formatter, :inject)

  super

  Aws.use_bundled_cert! if @use_bundled_cert

  if reject_s3_endpoint?
    raise Fluent::ConfigError, "s3_endpoint parameter is not supported for S3, use s3_region instead. This parameter is for S3 compatible services"
  end

  begin
    buffer_type = @buffer_config[:@type]
    @compressor = COMPRESSOR_REGISTRY.lookup(@store_as).new(buffer_type: buffer_type, log: log)
  rescue => e
    log.warn "'#{@store_as}' not supported. Use 'text' instead: error = #{e.message}"
    @compressor = TextCompressor.new
  end
  @compressor.configure(conf)

  @formatter = formatter_create

  if @hex_random_length > MAX_HEX_RANDOM_LENGTH
    raise Fluent::ConfigError, "hex_random_length parameter must be less than or equal to #{MAX_HEX_RANDOM_LENGTH}"
  end

  unless @index_format =~ /^%(0\d*)?[dxX]$/
    raise Fluent::ConfigError, "index_format parameter should follow `%[flags][width]type`. `0` is the only supported flag, and is mandatory if width is specified. `d`, `x` and `X` are supported types"
  end

  if @reduced_redundancy
    log.warn "reduced_redundancy parameter is deprecated. Use storage_class parameter instead"
    @storage_class = "REDUCED_REDUNDANCY"
  end

  @s3_object_key_format = process_s3_object_key_format
  if !@check_object
    if conf.has_key?('s3_object_key_format')
      log.warn "Set 'check_object false' and s3_object_key_format is specified. Check s3_object_key_format is unique in each write. If not, existing file will be overwritten."
    else
      log.warn "Set 'check_object false' and s3_object_key_format is not specified. Use '%{path}/%{time_slice}_%{hms_slice}.%{file_extension}' for s3_object_key_format"
      @s3_object_key_format = "%{path}/%{time_slice}_%{hms_slice}.%{file_extension}"
    end
  end

  check_s3_path_safety(conf)

  # For backward compatibility
  # TODO: Remove time_slice_format when end of support compat_parameters
  @configured_time_slice_format = conf['time_slice_format']
  @values_for_s3_object_chunk = {}
  @time_slice_with_tz = Fluent::Timezone.formatter(@timekey_zone, @configured_time_slice_format || timekey_to_timeformat(@buffer_config['timekey']))
end
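
For reference, the index_format check above accepts a conversion of the form %[0width]d, %[0width]x, or %[0width]X. Below is a minimal standalone sketch of the same regex; the sample format strings are illustrative only, not taken from the plugin or its tests.

# Standalone sketch of the index_format validation performed in #configure.
INDEX_FORMAT_PATTERN = /^%(0\d*)?[dxX]$/

%w(%d %x %X %08d %04x).each do |fmt|
  puts "#{fmt} accepted" if fmt =~ INDEX_FORMAT_PATTERN
end

%w(%8d %s %-4d).each do |fmt|
  puts "#{fmt} would raise Fluent::ConfigError" unless fmt =~ INDEX_FORMAT_PATTERN
end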

#format(tag, time, record) ⇒ Object



# File 'lib/fluent/plugin/out_s3.rb', line 274

def format(tag, time, record)
  r = inject_values_to_record(tag, time, record)
  @formatter.format(tag, time, r)
end
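
With the default out_file formatter (see DEFAULT_FORMAT_TYPE above), each formatted entry is roughly a tab-separated line of time, tag, and the JSON-encoded record, after any <inject> processing. A rough plain-Ruby sketch; the timestamp, tag, and record are made-up values, and the exact layout depends on the configured <format> section.

# Rough sketch of an out_file-style line produced per event.
require 'json'
require 'time'

time   = Time.parse('2024-01-01T12:00:00Z')
tag    = 's3.test'
record = { 'message' => 'hello' }   # result of inject_values_to_record

line = [time.iso8601, tag, JSON.generate(record)].join("\t")
puts line   # => "2024-01-01T12:00:00Z\ts3.test\t{\"message\":\"hello\"}"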

#multi_workers_ready? ⇒ Boolean

Returns:

  • (Boolean)


# File 'lib/fluent/plugin/out_s3.rb', line 241

def multi_workers_ready?
  true
end

#reject_s3_endpoint? ⇒ Boolean

Returns:

  • (Boolean)


# File 'lib/fluent/plugin/out_s3.rb', line 182

def reject_s3_endpoint?
  @s3_endpoint && !@s3_endpoint.end_with?('vpce.amazonaws.com') &&
    @s3_endpoint.end_with?('amazonaws.com') && !['fips', 'gov'].any? { |e| @s3_endpoint.include?(e) }
end
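
To make the predicate concrete, here is a standalone copy of the same logic with illustrative hostnames; the endpoints below are made-up examples, not taken from the plugin or its tests.

# Standalone sketch of the logic in #reject_s3_endpoint?.
def rejected_endpoint?(endpoint)
  endpoint && !endpoint.end_with?('vpce.amazonaws.com') &&
    endpoint.end_with?('amazonaws.com') && !['fips', 'gov'].any? { |e| endpoint.include?(e) }
end

rejected_endpoint?('s3.us-west-2.amazonaws.com')                        # => true  (plain AWS endpoint; use s3_region instead)
rejected_endpoint?('bucket.vpce-0123.s3.us-east-1.vpce.amazonaws.com')  # => false (VPC endpoint is allowed)
rejected_endpoint?('s3-fips.us-gov-west-1.amazonaws.com')               # => false (FIPS/GovCloud endpoints are allowed)
rejected_endpoint?('minio.internal.example.com')                        # => false (S3-compatible service)
rejected_endpoint?(nil)                                                 # => nil   (no endpoint configured, falsy)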

#start ⇒ Object



# File 'lib/fluent/plugin/out_s3.rb', line 245

def start
  options = setup_credentials
  options[:region] = @s3_region if @s3_region
  options[:endpoint] = @s3_endpoint if @s3_endpoint
  options[:use_accelerate_endpoint] = @enable_transfer_acceleration
  options[:use_dualstack_endpoint] = @enable_dual_stack
  options[:http_proxy] = @proxy_uri if @proxy_uri
  options[:force_path_style] = @force_path_style
  options[:compute_checksums] = @compute_checksums unless @compute_checksums.nil?
  options[:signature_version] = @signature_version unless @signature_version.nil?
  options[:ssl_verify_peer] = @ssl_verify_peer
  options[:ssl_ca_bundle] = @ssl_ca_bundle if @ssl_ca_bundle
  options[:ssl_ca_directory] = @ssl_ca_directory if @ssl_ca_directory
  log.on_trace do
    options[:http_wire_trace] = true
    options[:logger] = log
  end

  s3_client = Aws::S3::Client.new(options)
  @s3 = Aws::S3::Resource.new(client: s3_client)
  @bucket = @s3.bucket(@s3_bucket)

  check_apikeys if @check_apikey_on_start
  ensure_bucket if @check_bucket
  ensure_bucket_lifecycle

  super
end
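
As a concrete illustration, for a hypothetical configuration that sets only s3_region, proxy_uri, and s3_bucket, the client construction in #start boils down to roughly the following; the region, proxy, bucket, and object key are made-up values.

require 'aws-sdk-s3'

# Rough sketch of the AWS client and bucket handle built in #start.
options = {
  region: 'us-east-1',
  http_proxy: 'http://proxy.example.com:8080',
  force_path_style: false,
  use_accelerate_endpoint: false,
  use_dualstack_endpoint: false,
  ssl_verify_peer: true,
}

s3_client = Aws::S3::Client.new(options)
bucket = Aws::S3::Resource.new(client: s3_client).bucket('my-log-bucket')
bucket.object('logs/example.gz').exists?   # same kind of existence check #write relies on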

#write(chunk) ⇒ Object



# File 'lib/fluent/plugin/out_s3.rb', line 279

def write(chunk)
  i = 0
  metadata = chunk.metadata
  previous_path = nil
  time_slice = if metadata.timekey.nil?
                 ''.freeze
               else
                 @time_slice_with_tz.call(metadata.timekey)
               end

  if @check_object
    begin
      @values_for_s3_object_chunk[chunk.unique_id] ||= {
        "%{hex_random}" => hex_random(chunk),
      }
      values_for_s3_object_key_pre = {
        "%{path}" => @path,
        "%{file_extension}" => @compressor.ext,
      }
      values_for_s3_object_key_post = {
        "%{time_slice}" => time_slice,
        "%{index}" => sprintf(@index_format,i),
      }.merge!(@values_for_s3_object_chunk[chunk.unique_id])
      values_for_s3_object_key_post["%{uuid_flush}".freeze] = uuid_random if @uuid_flush_enabled

      s3path = @s3_object_key_format.gsub(%r(%{[^}]+})) do |matched_key|
        values_for_s3_object_key_pre.fetch(matched_key, matched_key)
      end
      s3path = extract_placeholders(s3path, chunk)
      s3path = s3path.gsub(%r(%{[^}]+}), values_for_s3_object_key_post)
      if (i > 0) && (s3path == previous_path)
        if @overwrite
          log.warn "#{s3path} already exists, but will overwrite"
          break
        else
          raise "duplicated path is generated. use %{index} in s3_object_key_format: path = #{s3path}"
        end
      end

      i += 1
      previous_path = s3path
    end while @bucket.object(s3path).exists?
  else
    if @localtime
      hms_slicer = Time.now.strftime("%H%M%S")
    else
      hms_slicer = Time.now.utc.strftime("%H%M%S")
    end

    @values_for_s3_object_chunk[chunk.unique_id] ||= {
      "%{hex_random}" => hex_random(chunk),
    }
    values_for_s3_object_key_pre = {
      "%{path}" => @path,
      "%{file_extension}" => @compressor.ext,
    }
    values_for_s3_object_key_post = {
      "%{date_slice}" => time_slice,  # For backward compatibility
      "%{time_slice}" => time_slice,
      "%{hms_slice}" => hms_slicer,
    }.merge!(@values_for_s3_object_chunk[chunk.unique_id])
    values_for_s3_object_key_post["%{uuid_flush}".freeze] = uuid_random if @uuid_flush_enabled

    s3path = @s3_object_key_format.gsub(%r(%{[^}]+})) do |matched_key|
      values_for_s3_object_key_pre.fetch(matched_key, matched_key)
    end
    s3path = extract_placeholders(s3path, chunk)
    s3path = s3path.gsub(%r(%{[^}]+}), values_for_s3_object_key_post)
  end

  tmp = Tempfile.new("s3-")
  tmp.binmode
  begin
    @compressor.compress(chunk, tmp)
    tmp.rewind
    log.debug "out_s3: write chunk #{dump_unique_id_hex(chunk.unique_id)} with metadata #{chunk.metadata} to s3://#{@s3_bucket}/#{s3path}"

    put_options = {
      body: tmp,
      content_type: @compressor.content_type,
      storage_class: @storage_class,
    }
    put_options[:server_side_encryption] = @use_server_side_encryption if @use_server_side_encryption
    put_options[:ssekms_key_id] = @ssekms_key_id if @ssekms_key_id
    put_options[:sse_customer_algorithm] = @sse_customer_algorithm if @sse_customer_algorithm
    put_options[:sse_customer_key] = @sse_customer_key if @sse_customer_key
    put_options[:sse_customer_key_md5] = @sse_customer_key_md5 if @sse_customer_key_md5
    put_options[:acl] = @acl if @acl
    put_options[:grant_full_control] = @grant_full_control if @grant_full_control
    put_options[:grant_read] = @grant_read if @grant_read
    put_options[:grant_read_acp] = @grant_read_acp if @grant_read_acp
    put_options[:grant_write_acp] = @grant_write_acp if @grant_write_acp
    put_options[:tagging] = @tagging if @tagging

    if @s3_metadata
      put_options[:metadata] = {}
      @s3_metadata.each do |k, v|
        put_options[:metadata][k] = extract_placeholders(v, chunk).gsub(%r(%{[^}]+}), {"%{index}" => sprintf(@index_format, i - 1)})
      end
    end
    @bucket.object(s3path).put(put_options)

    @values_for_s3_object_chunk.delete(chunk.unique_id)

    if @warn_for_delay
      if Time.at(chunk.metadata.timekey) < Time.now - @warn_for_delay
        log.warn "out_s3: delayed events were put to s3://#{@s3_bucket}/#{s3path}"
      end
    end
  ensure
    tmp.close(true) rescue nil
  end
end
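
The key generation above is a two-pass substitution: %{path} and %{file_extension} are filled first, buffer placeholders (such as %Y%m%d or ${tag}) are resolved by extract_placeholders, and %{time_slice}, %{index}, %{hms_slice}, %{hex_random}, and %{uuid_flush} are filled last. A simplified sketch with made-up values; extract_placeholders is omitted here.

# Simplified sketch of the s3_object_key_format expansion performed in #write.
s3_object_key_format = "%{path}%{time_slice}_%{index}.%{file_extension}"

pre  = { "%{path}" => "logs/", "%{file_extension}" => "gz" }
post = { "%{time_slice}" => "20240101-12", "%{index}" => sprintf("%d", 0) }

s3path = s3_object_key_format.gsub(%r(%{[^}]+})) { |key| pre.fetch(key, key) }
s3path = s3path.gsub(%r(%{[^}]+}), post)
puts s3path   # => "logs/20240101-12_0.gz"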