Class: Fluent::Plugin::GCSOutput

Inherits:
Output
  • Object
show all
Defined in:
lib/fluent/plugin/out_gcs.rb

Constant Summary collapse

DEFAULT_FORMAT_TYPE =
"out_file"
MAX_HEX_RANDOM_LENGTH =
32

Instance Method Summary collapse

Constructor Details

#initialize ⇒ GCSOutput

Returns a new instance of GCSOutput.



15
16
17
18
19
# File 'lib/fluent/plugin/out_gcs.rb', line 15

# Builds the plugin instance.
#
# After the parent initializer has run, loads the google-cloud-storage
# client library and assigns this plugin's +log+ to +Google::Apis.logger+.
def initialize
  super
  # Required inside the constructor rather than at file load, so the library
  # is only loaded once an instance is actually created.
  require "google/cloud/storage"
  # NOTE(review): presumably this routes the Google API client's own log
  # output through Fluentd's logger — confirm against google-apis-core.
  Google::Apis.logger = log
end

Instance Method Details

#configure(conf) ⇒ Object



75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
# File 'lib/fluent/plugin/out_gcs.rb', line 75

# Validates the plugin configuration and derives the state used later by
# #start, #format and #write.
#
# @param conf [#[]] plugin configuration; read here for 'time_slice_format'
# @raise [Fluent::ConfigError] when hex_random_length exceeds
#   MAX_HEX_RANDOM_LENGTH
def configure(conf)
  compat_parameters_convert(conf, :buffer, :formatter, :inject)
  super

  too_long = @hex_random_length > MAX_HEX_RANDOM_LENGTH
  raise Fluent::ConfigError, "hex_random_length parameter should be set to #{MAX_HEX_RANDOM_LENGTH} characters or less." if too_long

  # Customer-supplied AES-256 key used to encrypt the uploaded file.
  @encryption_opts = { encryption_key: @encryption_key }

  # Flatten the metadata entries into a plain key => value hash; the hash is
  # left unset when no metadata was configured.
  if @object_metadata
    @object_metadata_hash = @object_metadata.each_with_object({}) do |entry, acc|
      acc[entry.key] = entry.value
    end
  end

  @formatter = formatter_create
  @object_creator = Fluent::GCS.discovered_object_creator(@store_as, transcoding: @transcoding)

  # Backward compatibility: an explicit time_slice_format wins over the
  # format derived from the buffer's timekey.
  # TODO: Remove time_slice_format when end of support compat_parameters
  @configured_time_slice_format = conf['time_slice_format']
  slice_format = @configured_time_slice_format || timekey_to_timeformat(@buffer_config['timekey'])
  @time_slice_with_tz = Fluent::Timezone.formatter(@timekey_zone, slice_format)

  # Inline JSON credentials take precedence over the keyfile setting.
  @credentials = @credentials_json || keyfile
end

#format(tag, time, record) ⇒ Object



120
121
122
123
# File 'lib/fluent/plugin/out_gcs.rb', line 120

# Serializes one event for buffering.
#
# The record is first passed through inject_values_to_record, then handed to
# the formatter created in #configure.
#
# @param tag [Object] event tag
# @param time [Object] event time
# @param record [Object] event record
# @return [Object] whatever the configured formatter returns
def format(tag, time, record)
  @formatter.format(tag, time, inject_values_to_record(tag, time, record))
end

#multi_workers_ready? ⇒ Boolean

Returns:

  • (Boolean)


125
126
127
# File 'lib/fluent/plugin/out_gcs.rb', line 125

# Unconditionally reports true.
#
# NOTE(review): presumably this tells Fluentd the plugin may run under
# multiple workers — confirm against the Fluentd plugin API.
#
# @return [Boolean] always true
def multi_workers_ready?
  true
end

#start ⇒ Object



107
108
109
110
111
112
113
114
115
116
117
118
# File 'lib/fluent/plugin/out_gcs.rb', line 107

# Creates the Google Cloud Storage client and bucket handle from the
# configured settings, runs ensure_bucket, then continues with the parent
# class's start.
#
# @return [Object] the result of the parent #start
def start
  client_options = {
    project: @project,
    keyfile: @credentials,
    retries: @client_retries,
    timeout: @client_timeout,
  }
  @gcs = Google::Cloud::Storage.new(**client_options)
  @gcs_bucket = @gcs.bucket(@bucket)

  # NOTE(review): ensure_bucket is defined outside this view; presumably it
  # verifies or creates the bucket — confirm.
  ensure_bucket
  super
end

#write(chunk) ⇒ Object



129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
# File 'lib/fluent/plugin/out_gcs.rb', line 129

# Uploads one buffer chunk to the configured bucket.
#
# The destination path comes from generate_path. The object creator is given
# the chunk and yields an object whose +path+ is passed to the bucket as the
# upload source, together with metadata, ACL, storage class, content
# type/encoding and the encryption options prepared in #configure.
#
# The debug log line never contains the customer-supplied encryption key:
# when a key is set it is replaced by "[REDACTED]" in the logged options.
# The real key is still passed to the upload call.
#
# @param chunk [#key] buffer chunk to upload
# @return [Object] whatever the object creator's +create+ returns
def write(chunk)
  path = generate_path(chunk)

  @object_creator.create(chunk) do |obj|
    opts = {
      metadata: @object_metadata_hash,
      acl: @acl,
      storage_class: @storage_class,
      content_type: @object_creator.content_type,
      content_encoding: @object_creator.content_encoding,
    }
    opts.merge!(@encryption_opts)

    log.debug do
      # Mask the secret on a copy so the upload below still gets the real key.
      loggable = opts[:encryption_key] ? opts.merge(encryption_key: "[REDACTED]") : opts
      "out_gcs: upload chunk:#{chunk.key} to gcs://#{@bucket}/#{path} options: #{loggable}"
    end
    @gcs_bucket.upload_file(obj.path, path, **opts)
  end
end