Class: Fluent::Plugin::AliyunossOutput
- Inherits:
-
Output
- Object
- Output
- Fluent::Plugin::AliyunossOutput
- Defined in:
- lib/fluent/plugin/out_aliyunoss.rb
Constant Summary collapse
- DEFAULT_FORMAT_TYPE =
"out_file"
- DEFAULT_TIMEKEY =
60 * 60 * 24
Instance Method Summary collapse
- #compress(chunk, tmp) ⇒ Object
- #configure(conf) ⇒ Object
- #format(tag, time, record) ⇒ Object
- #multi_workers_ready? ⇒ Boolean
- #process_object_key_format(chunk, key_format) ⇒ Object
- #start ⇒ Object
- #write(chunk) ⇒ Object
Instance Method Details
#compress(chunk, tmp) ⇒ Object
64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 |
# File 'lib/fluent/plugin/out_aliyunoss.rb', line 64

# Writes the contents of +chunk+ into the temp file +tmp+ in the format
# selected by @store_as: ORC when it equals "orc", gzip for anything else.
#
# External commands are started with an argument vector rather than a
# shell string, so paths containing spaces or shell metacharacters are
# passed through untouched.
#
# @param chunk [#path, #write_to] buffer chunk; +path+ must name a readable
#   file (NOTE(review): this presumes a file-backed buffer -- confirm).
# @param tmp [Tempfile] destination file. In ORC mode it is unlinked and
#   its path is re-created by orc-tools.
# @raise [RuntimeError] when the orc-tools conversion fails.
def compress(chunk, tmp)
  if @store_as == "orc"
    # We just need a tmp file path, orc-tools convert won't work if the
    # file exists.
    output_path = tmp.path
    tmp.delete

    # Create a symlink with a .json suffix, to fool orc-tools.
    chunk_path = File.realpath(chunk.path)
    fake_path = "#{chunk_path}.json"
    # File.symlink? also catches a dangling link left by an earlier crash,
    # which File.exist? would report as missing.
    File.unlink(fake_path) if File.symlink?(fake_path) || File.exist?(fake_path)
    File.symlink(chunk_path, fake_path)
    begin
      res = system("java", "-Dlog4j.configuration=file:/log4j.properties",
                   "-jar", "/orc-tools.jar", "convert",
                   "-o", output_path, fake_path)
      raise "failed to execute java -jar /orc-tools.jar command. status = #{$?}" unless res
    ensure
      # Remove the helper symlink whether or not the conversion succeeded.
      File.unlink(fake_path) if File.symlink?(fake_path)
    end
  else
    # Equivalent of `gzip -c <chunk> > <tmp>` without going through a shell.
    res = system("gzip", "-c", chunk.path, out: tmp.path)
    unless res
      log.warn "failed to execute gzip command. Fallback to GzipWriter. status = #{$?}"
      # Discard whatever the failed command may have written.
      tmp.truncate(0)
      # The block form closes the writer (and with it +tmp+) exactly once,
      # even when chunk.write_to raises.
      Zlib::GzipWriter.wrap(tmp) { |gw| chunk.write_to(gw) }
    end
  end
end
#configure(conf) ⇒ Object
57 58 59 60 61 62 |
# File 'lib/fluent/plugin/out_aliyunoss.rb', line 57

# Applies the plugin configuration.
#
# After the parent class has processed +conf+, the formatter, buffer and
# inject sections are passed through compat_parameters_convert with "time"
# as the default chunk key, and a JSON formatter is created and stored in
# @json_formatter.
#
# @param conf the configuration element handed over by the framework
def configure(conf)
  super

  compat_parameters_convert(
    conf,
    :formatter, :buffer, :inject,
    default_chunk_key: "time"
  )

  @json_formatter = formatter_create(
    type: 'json',
    usage: 'formatter_in_example_json'
  )
end
#format(tag, time, record) ⇒ Object
131 132 133 134 |
# File 'lib/fluent/plugin/out_aliyunoss.rb', line 131

# Serializes one event for buffering.
#
# The record is first passed through inject_values_to_record and the result
# is rendered by the JSON formatter stored in @json_formatter.
#
# @param tag the event tag
# @param time the event time
# @param record the event record
# @return whatever @json_formatter.format returns for the enriched record
def format(tag, time, record)
  @json_formatter.format(tag, time, inject_values_to_record(tag, time, record))
end
#multi_workers_ready? ⇒ Boolean
115 116 117 |
# File 'lib/fluent/plugin/out_aliyunoss.rb', line 115

# Reports whether this plugin may run under multiple workers.
#
# @return [Boolean] always true
def multi_workers_ready?
  true
end
#process_object_key_format(chunk, key_format) ⇒ Object
99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 |
# File 'lib/fluent/plugin/out_aliyunoss.rb', line 99

# Expands the placeholders of an object key template.
#
# Handled here:
# * %{host} -- the local hostname (Socket.gethostname)
# * %{uuid} -- eight random hex characters (SecureRandom.hex(4))
# * ${tag_parts[N]} -- the N-th dot-separated part of the chunk's tag
#
# Everything else is left to extract_placeholders, which receives the
# partially expanded string together with the chunk's metadata.
#
# @param chunk [#metadata] buffer chunk whose metadata responds to +tag+
# @param key_format [String] object key template
# @return the value returned by extract_placeholders
def process_object_key_format(chunk, key_format)
  key_map = {
    host: Socket.gethostname,
    uuid: SecureRandom.hex(4),
  }

  # The block form of gsub inserts the value verbatim; a plain replacement
  # string would interpret backslash sequences such as "\0".
  result = key_map.reduce(key_format) do |acc, (name, value)|
    acc.gsub("%{#{name}}") { value }
  end

  metadata = chunk.metadata
  # support replace tag_parts; to_s keeps a nil tag (tag not used as a
  # chunk key) from raising NoMethodError.
  metadata.tag.to_s.split('.').each_with_index do |part, idx|
    result = result.gsub("${tag_parts[#{idx}]}") { part }
  end

  extract_placeholders(result, metadata)
end
#start ⇒ Object
119 120 121 122 123 124 125 126 127 128 129 |
# File 'lib/fluent/plugin/out_aliyunoss.rb', line 119

# Starts the plugin: builds the OSS client from @oss_endpoint, @oss_key_id
# and @oss_key_secret, checks that the bucket named by @oss_bucket exists,
# and stores the bucket handle in @bucket.
#
# @raise [RuntimeError] when the configured bucket does not exist
def start
  super

  @client = Aliyun::OSS::Client.new(
    endpoint: @oss_endpoint,
    access_key_id: @oss_key_id,
    access_key_secret: @oss_key_secret
  )

  unless @client.bucket_exists?(@oss_bucket)
    raise "Specific bucket not exists: #{@oss_bucket}"
  end

  @bucket = @client.get_bucket(@oss_bucket)
end
#write(chunk) ⇒ Object
136 137 138 139 140 141 142 143 144 145 146 |
# File 'lib/fluent/plugin/out_aliyunoss.rb', line 136

# Uploads one buffer chunk to the bucket.
#
# The chunk is converted into a temp file by #compress, the object key is
# built from "#{@oss_path}.#{@store_as}" by #process_object_key_format, and
# the file is sent with @bucket.resumable_upload.
#
# @param chunk the buffer chunk to upload
# @raise [RuntimeError] when resumable_upload returns a falsy value
def write(chunk)
  # Created outside begin/ensure so that a failure here is not masked by a
  # NoMethodError on nil in the cleanup below.
  f = Tempfile.new('oss-')
  output_path = f.path
  begin
    compress(chunk, f)
    path = process_object_key_format(chunk, "#{@oss_path}.#{@store_as}")
    raise "Upload #{output_path} failed" unless @bucket.resumable_upload(path, output_path)
  ensure
    f.close(true)
    # In ORC mode #compress unlinks the Tempfile and the converter creates a
    # new file at the same path; the Tempfile no longer removes it, so clean
    # it up here.
    File.unlink(output_path) if File.exist?(output_path)
  end
end