Class: Fluent::GoogleCloudStorageOutput

Inherits:
TimeSlicedOutput
  • Object
show all
Includes:
Mixin::ConfigPlaceholders, Mixin::PlainTextFormatter
Defined in:
lib/fluent/plugin/out_google_cloud_storage.rb

Constant Summary collapse

CHUNK_ID_PLACE_HOLDER =
'${chunk_id}'

Instance Method Summary collapse

Constructor Details

#initialize ⇒ GoogleCloudStorageOutput

Returns a new instance of GoogleCloudStorageOutput.



42
43
44
45
46
47
48
49
50
# File 'lib/fluent/plugin/out_google_cloud_storage.rb', line 42

# Runs the superclass constructor, then loads the libraries this plugin uses.
# The requires are issued here, at instantiation time, rather than at the top
# of the file.
def initialize
  super
  # Standard library
  require 'zlib'
  require 'net/http'
  require 'time'
  # Third-party gems
  require 'google/api_client'
  require 'signet/oauth_2/client'
  require 'mime-types'
end

Instance Method Details

#call_google_api(params) ⇒ Object



57
58
59
60
61
62
63
# File 'lib/fluent/plugin/out_google_cloud_storage.rb', line 57

# Executes a request through the Google API client, fetching a fresh access
# token first when the current authorization reports that it has expired.
#
# @param params [Hash] request options, handed unchanged to the client's +execute+
# @return [Object] whatever the client's +execute+ call returns
def call_google_api(params)
  authorization = @google_api_client.authorization
  authorization.fetch_access_token! if authorization.expired?
  @google_api_client.execute(params)
end

#chunk_unique_id_to_str(unique_id) ⇒ Object



120
121
122
# File 'lib/fluent/plugin/out_google_cloud_storage.rb', line 120

# Renders a chunk's unique id as a lowercase hexadecimal string, two digits
# per byte.
#
# @param unique_id [String] raw id bytes
# @return [String] hex representation (empty when +unique_id+ is empty)
def chunk_unique_id_to_str(unique_id)
  hex_pairs = unique_id.bytes.map do |byte|
    # Left-pad so bytes below 0x10 still occupy two characters.
    byte.to_s(16).rjust(2, '0')
  end
  hex_pairs.join('')
end

#configure(conf) ⇒ Object



65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
# File 'lib/fluent/plugin/out_google_cloud_storage.rb', line 65

# Derives the time slice granularity from the strftime directives found in the
# configured path, runs the superclass configuration, prepares the API client
# and finally checks that the path carries the chunk id placeholder.
#
# @param conf [#[], #[]=] plugin configuration; its 'time_slice_format' entry
#   is overwritten when 'path' contains %S, %M or %H
# @raise [Fluent::ConfigError] if the configured path lacks the placeholder
def configure(conf)
  path_template = conf['path']
  if path_template
    # The finest directive present wins: seconds, then minutes, then hours.
    _directive, slice_format = [
      ['%S', '%Y%m%d%H%M%S'],
      ['%M', '%Y%m%d%H%M'],
      ['%H', '%Y%m%d%H']
    ].find { |directive, _format| path_template.index(directive) }
    conf['time_slice_format'] = slice_format if slice_format
  end

  super

  @client = prepare_client

  return if @path.index(CHUNK_ID_PLACE_HOLDER)

  raise Fluent::ConfigError, "path must contain ${chunk_id}, which is the placeholder for chunk_id, when append is set to false."
end

#path_format(chunk_key) ⇒ Object



114
115
116
117
118
# File 'lib/fluent/plugin/out_google_cloud_storage.rb', line 114

# Expands the strftime directives in the configured path using the time
# parsed from a chunk key, and logs the result at debug level.
#
# @param chunk_key [String] time slice key, parsed with @time_slice_format
# @return [String] the configured path with its time directives filled in
def path_format(chunk_key)
  slice_time = Time.strptime(chunk_key, @time_slice_format)
  formatted = slice_time.strftime(@path)
  log.debug "GCS Path: #{formatted}"
  formatted
end

#prepare_client ⇒ Object



85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
# File 'lib/fluent/plugin/out_google_cloud_storage.rb', line 85

# Builds the Google API client, authorizes it as a service account using the
# configured PKCS#12 key, and looks up the "storage" v1 API.
#
# Stores the client in @google_api_client and the discovered API in
# @storage_api.
#
# @return [Google::APIClient] the authorized client
# @raise [Fluent::ConfigError] when Signet reports an authorization error
def prepare_client
  @google_api_client = Google::APIClient.new(
    application_name: "fluent-plugin-google-cloud-storage",
    application_version: "0.3.1"
  )

  begin
    signing_key = Google::APIClient::KeyUtils.load_from_pkcs12(
      @service_pkcs12_path, @service_pkcs12_password
    )
    oauth_options = {
      token_credential_uri: "https://accounts.google.com/o/oauth2/token",
      audience: "https://accounts.google.com/o/oauth2/token",
      issuer: @service_email,
      scope: "https://www.googleapis.com/auth/devstorage.read_write",
      signing_key: signing_key
    }
    @google_api_client.authorization = Signet::OAuth2::Client.new(**oauth_options)
    # Fetch a token right away so bad credentials surface during setup.
    @google_api_client.authorization.fetch_access_token!
  rescue Signet::AuthorizationError
    raise Fluent::ConfigError, "Error occurred authenticating with Google"
  end

  @storage_api = @google_api_client.discovered_api("storage", "v1")
  @google_api_client
end

#send_data(path, data) ⇒ Object



124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
# File 'lib/fluent/plugin/out_google_cloud_storage.rb', line 124

# Uploads +data+ to the configured bucket as an object named +path+ using a
# multipart insert request.
#
# The content type is looked up from the path's extension. When the extension
# is not registered the lookup yields nothing, so the upload falls back to
# "application/octet-stream" instead of failing on a nil MIME type.
#
# The payload is gzip-compressed first when @compress is "gz" or "gzip".
#
# @param path [String] object name inside the bucket
# @param data [String] payload to upload
# @return [Object] the result of the API call
def send_data(path, data)
  mimetype = MIME::Types.type_for(path).first
  # type_for returns an empty list for unknown extensions, so +first+ may be nil.
  content_type = mimetype ? mimetype.content_type : "application/octet-stream"

  io = nil
  if ["gz", "gzip"].include?(@compress)
    # Binary, unfrozen buffer: gzip output is raw bytes and must be writable.
    io = StringIO.new(String.new)
    writer = Zlib::GzipWriter.new(io)
    writer.write(data)
    # finish flushes the gzip trailer but leaves the underlying IO open.
    writer.finish
    io.rewind
  else
    io = StringIO.new(data)
  end

  media = Google::APIClient::UploadIO.new(io, content_type, File.basename(path))

  call_google_api(api_method: @storage_api.objects.insert,
                  parameters: {
                    uploadType: "multipart",
                    project: @project_id,
                    bucket: @bucket_id,
                    name: path
                  },
                  body_object: { contentType: media.content_type },
                  media: media)
end

#shutdown ⇒ Object



110
111
112
# File 'lib/fluent/plugin/out_google_cloud_storage.rb', line 110

# Shuts the plugin down by delegating to the superclass; no extra teardown is
# performed here.
def shutdown
  super
end

#start ⇒ Object



106
107
108
# File 'lib/fluent/plugin/out_google_cloud_storage.rb', line 106

# Starts the plugin by delegating to the superclass; no extra setup is
# performed here.
def start
  super
end

#write(chunk) ⇒ Object



151
152
153
154
155
156
157
# File 'lib/fluent/plugin/out_google_cloud_storage.rb', line 151

# Uploads one buffered chunk. The object name is the time-formatted path for
# the chunk's key with the chunk id placeholder replaced by the chunk's unique
# id in hex.
#
# @param chunk [#key, #unique_id, #read] buffered chunk to upload
# @return [String] the object name the chunk was sent to
def write(chunk)
  chunk_id = chunk_unique_id_to_str(chunk.unique_id)
  object_path = path_format(chunk.key).gsub(CHUNK_ID_PLACE_HOLDER, chunk_id)

  send_data(object_path, chunk.read)

  object_path
end