Class: Fluent::Plugin::S3Output::ParquetCompressor
- Inherits: Compressor
  - Object
    - Compressor
      - Fluent::Plugin::S3Output::ParquetCompressor
- Defined in:
- lib/fluent/plugin/s3_compressor_parquet.rb
Instance Attribute Summary
Attributes inherited from Compressor
#buffer_type, #log
Instance Method Summary
- #compress(chunk, tmp) ⇒ Object
- #configure(conf) ⇒ Object
- #content_type ⇒ Object
- #ext ⇒ Object
Methods inherited from Compressor
- #initialize
Instance Method Details
#compress(chunk, tmp) ⇒ Object
Source (lines 47–66):
# File 'lib/fluent/plugin/s3_compressor_parquet.rb', line 47
# Convert a buffered chunk into a Parquet file by running the external
# `columnify` helper, writing the result to +tmp.path+.
#
# When the buffer type is "file" the chunk's own on-disk path is handed to
# columnify directly. Otherwise the chunk is first spilled into a scratch
# Tempfile, which is always closed and unlinked afterwards.
#
# @param chunk [Object] buffer chunk; #path is used for "file" buffers,
#   #write_to(io) for any other buffer type
# @param tmp [Object] destination object whose #path receives the output
# @return [nil]
# @raise [Fluent::UnrecoverableError] when columnify reports a non-successful status
def compress(chunk, tmp)
  file_backed = (@buffer_type == "file")
  scratch = nil

  source_path =
    if file_backed
      chunk.path
    else
      # Non-file buffers have no usable path, so copy the bytes to disk for columnify.
      scratch = Tempfile.new("chunk-parquet-tmp")
      scratch.binmode
      chunk.write_to(scratch)
      scratch.close
      scratch.path
    end

  out, err, result = columnify(source_path, tmp.path)
  return if result.success?

  raise Fluent::UnrecoverableError, "failed to execute columnify command. stdout=#{out} stderr=#{err} status=#{result.inspect}"
ensure
  unless file_backed
    # Best-effort cleanup: close and unlink the scratch file, ignoring any
    # error (including scratch being nil if Tempfile.new itself failed) so
    # the original exception, if any, is not masked.
    begin
      scratch.close(true)
    rescue StandardError
      nil
    end
  end
end

#configure(conf) ⇒ Object
Source (lines 23–37):
# File 'lib/fluent/plugin/s3_compressor_parquet.rb', line 23
# Validate the <compress> settings for Parquet output and cache the values
# later used when invoking columnify.
#
# Probes the columnify command via check_command("columnify", "-h")
# (presumably raising when it is unavailable — behavior of check_command is
# defined elsewhere).
#
# @param conf [Object] plugin configuration, forwarded to super
# @raise [Fluent::ConfigError] when the codec is :lzo, :brotli or :lz4
def configure(conf)
  super
  check_command("columnify", "-h")

  codec = @compress.parquet_compression_codec
  unsupported = %i[lzo brotli lz4]
  raise Fluent::ConfigError, "unsupported compression codec: #{codec}" if unsupported.include?(codec)

  # Stored upper-cased, e.g. :snappy -> "SNAPPY".
  @parquet_compression_codec = codec.to_s.upcase
  # :json is renamed to :jsonl (presumably columnify's name for line-delimited
  # JSON); every other record type is passed through unchanged.
  @record_type = @compress.record_type == :json ? :jsonl : @compress.record_type
end

#content_type ⇒ Object
Source (lines 43–45):
# File 'lib/fluent/plugin/s3_compressor_parquet.rb', line 43
# Content type for the compressed output: the generic binary type
# "application/octet-stream" (presumably used as the uploaded object's
# Content-Type). Returns a frozen, deduplicated string.
def content_type
  -"application/octet-stream"
end

#ext ⇒ Object
Source (lines 39–41):
# File 'lib/fluent/plugin/s3_compressor_parquet.rb', line 39
# File extension (without a leading dot) for the compressed output,
# presumably appended to generated object keys. Returns a frozen,
# deduplicated string.
def ext
  -"parquet"
end
|