Class: Fluent::GoogleCloudStorageOutput
- Inherits:
-
TimeSlicedOutput
- Object
- TimeSlicedOutput
- Fluent::GoogleCloudStorageOutput
- Includes:
- Mixin::ConfigPlaceholders, Mixin::PlainTextFormatter
- Defined in:
- lib/fluent/plugin/out_google_cloud_storage.rb
Constant Summary collapse
- CHUNK_ID_PLACE_HOLDER =
'${chunk_id}'
Instance Method Summary collapse
- #call_google_api(params) ⇒ Object
- #chunk_unique_id_to_str(unique_id) ⇒ Object
- #configure(conf) ⇒ Object
-
#initialize ⇒ GoogleCloudStorageOutput
constructor
A new instance of GoogleCloudStorageOutput.
- #path_format(chunk_key) ⇒ Object
- #prepare_client ⇒ Object
- #send_data(path, data) ⇒ Object
- #shutdown ⇒ Object
- #start ⇒ Object
- #write(chunk) ⇒ Object
Constructor Details
#initialize ⇒ GoogleCloudStorageOutput
Returns a new instance of GoogleCloudStorageOutput.
42 43 44 45 46 47 48 49 50 |
# File 'lib/fluent/plugin/out_google_cloud_storage.rb', line 42 def initialize super require 'zlib' require 'net/http' require 'time' require 'google/api_client' require 'signet/oauth_2/client' require 'mime-types' end |
Instance Method Details
#call_google_api(params) ⇒ Object
57 58 59 60 61 62 63 |
# File 'lib/fluent/plugin/out_google_cloud_storage.rb', line 57 def call_google_api(params) # refresh_auth if @google_api_client..expired? @google_api_client..fetch_access_token! end return @google_api_client.execute(params) end |
#chunk_unique_id_to_str(unique_id) ⇒ Object
120 121 122 |
# File 'lib/fluent/plugin/out_google_cloud_storage.rb', line 120 def chunk_unique_id_to_str(unique_id) unique_id.unpack('C*').map{|x| x.to_s(16).rjust(2,'0')}.join('') end |
#configure(conf) ⇒ Object
65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 |
# File 'lib/fluent/plugin/out_google_cloud_storage.rb', line 65 def configure(conf) if conf['path'] if conf['path'].index('%S') conf['time_slice_format'] = '%Y%m%d%H%M%S' elsif conf['path'].index('%M') conf['time_slice_format'] = '%Y%m%d%H%M' elsif conf['path'].index('%H') conf['time_slice_format'] = '%Y%m%d%H' end end super @client = prepare_client() if @path.index(CHUNK_ID_PLACE_HOLDER).nil? raise Fluent::ConfigError, "path must contain ${chunk_id}, which is the placeholder for chunk_id, when append is set to false." end end |
#path_format(chunk_key) ⇒ Object
114 115 116 117 118 |
# File 'lib/fluent/plugin/out_google_cloud_storage.rb', line 114 def path_format(chunk_key) path = Time.strptime(chunk_key, @time_slice_format).strftime(@path) log.debug "GCS Path: #{path}" path end |
#prepare_client ⇒ Object
85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 |
# File 'lib/fluent/plugin/out_google_cloud_storage.rb', line 85 def prepare_client @google_api_client = Google::APIClient.new( :application_name => "fluent-plugin-google-cloud-storage", :application_version => "0.3.1") begin key = Google::APIClient::KeyUtils.load_from_pkcs12( @service_pkcs12_path, @service_pkcs12_password) @google_api_client. = Signet::OAuth2::Client.new( token_credential_uri: "https://accounts.google.com/o/oauth2/token", audience: "https://accounts.google.com/o/oauth2/token", issuer: @service_email, scope: "https://www.googleapis.com/auth/devstorage.read_write", signing_key: key) @google_api_client..fetch_access_token! rescue Signet::AuthorizationError raise Fluent::ConfigError, "Error occurred authenticating with Google" end @storage_api = @google_api_client.discovered_api("storage", "v1") return @google_api_client end |
#send_data(path, data) ⇒ Object
124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 |
# File 'lib/fluent/plugin/out_google_cloud_storage.rb', line 124 def send_data(path, data) mimetype = MIME::Types.type_for(path).first io = nil if ["gz", "gzip"].include?(@compress) io = StringIO.new("") writer = Zlib::GzipWriter.new(io) writer.write(data) writer.finish io.rewind else io = StringIO.new(data) end media = Google::APIClient::UploadIO.new(io, mimetype.content_type, File.basename(path)) call_google_api(api_method: @storage_api.objects.insert, parameters: { uploadType: "multipart", project: @project_id, bucket: @bucket_id, name: path }, body_object: { contentType: media.content_type }, media: media) end |
#shutdown ⇒ Object
110 111 112 |
# File 'lib/fluent/plugin/out_google_cloud_storage.rb', line 110 def shutdown super end |
#start ⇒ Object
106 107 108 |
# File 'lib/fluent/plugin/out_google_cloud_storage.rb', line 106 def start super end |
#write(chunk) ⇒ Object
151 152 153 154 155 156 157 |
# File 'lib/fluent/plugin/out_google_cloud_storage.rb', line 151 def write(chunk) hdfs_path = path_format(chunk.key).gsub(CHUNK_ID_PLACE_HOLDER, chunk_unique_id_to_str(chunk.unique_id)) send_data(hdfs_path, chunk.read) hdfs_path end |