Module: Fluent::Plugin::Buffer::Chunk::ZstdDecompressable
- Includes:
- Compressable
- Defined in:
- lib/fluent/plugin/buffer/chunk.rb
Instance Method Summary
- #append(data, **kwargs) ⇒ Object
- #open(**kwargs, &block) ⇒ Object
- #read(**kwargs) ⇒ Object
- #write_to(io, **kwargs) ⇒ Object
Methods included from Compressable
Instance Method Details
#append(data, **kwargs) ⇒ Object
# File 'lib/fluent/plugin/buffer/chunk.rb', line 251

# Appends +data+ to the chunk, zstd-compressing it first when requested.
#
# With <tt>compress: :zstd</tt>, every element of +data+ is written through a
# Zstd::StreamWriter into an in-memory StringIO; the finished compressed
# string is then handed to +concat+ together with <tt>data.size</tt>.
# Any other +compress+ value (or none) defers to the inherited +append+.
#
# NOTE(review): +concat+ is not defined in this module; presumably it is
# provided by the chunk class this module is mixed into -- confirm.
#
# @param data [#each, #size] the entries to append
# @param kwargs [Hash] options; only +:compress+ is inspected here
# @return [Object] whatever +concat+ (or the inherited +append+) returns
def append(data, **kwargs)
  return super unless kwargs[:compress] == :zstd

  buffer = StringIO.new
  writer = Zstd::StreamWriter.new(buffer)
  data.each { |entry| writer.write(entry) }
  # finish is called before buffer.string is read, as in the original ordering.
  writer.finish
  concat(buffer.string, data.size)
end
#open(**kwargs, &block) ⇒ Object
# File 'lib/fluent/plugin/buffer/chunk.rb', line 265

# Opens the chunk and yields an IO positioned at the start of its content.
#
# With <tt>compressed: :zstd</tt> the call is forwarded unchanged to the
# inherited +open+, so the block receives whatever IO that implementation
# yields. Otherwise the IO yielded by the inherited +open+ is run through
# +decompress+ (type +:zstd+) into a scratch IO, which is rewound and yielded
# instead. The scratch IO is a StringIO when the chunk IO is a StringIO, and
# a binary-mode Tempfile for anything else.
#
# The scratch Tempfile is closed and unlinked as soon as the caller's block
# returns or raises, rather than being left for the garbage collector; the
# yielded IO must therefore not be used after the block has finished.
#
# @param kwargs [Hash] options; +:compressed+ is inspected here and all
#   options are forwarded to the inherited +open+
# @yieldparam io [IO, StringIO, Tempfile] readable IO over the chunk content
# @return [Object] the value returned by the given block
def open(**kwargs, &block)
  return super if kwargs[:compressed] == :zstd

  super(**kwargs) do |chunk_io|
    output_io = if chunk_io.is_a?(StringIO)
                  StringIO.new
                else
                  Tempfile.new('decompressed-data')
                end
    begin
      output_io.binmode if output_io.is_a?(Tempfile)
      decompress(input_io: chunk_io, output_io: output_io, type: :zstd)
      output_io.seek(0, IO::SEEK_SET)
      yield output_io
    ensure
      # Release the descriptor and delete the temp file deterministically,
      # including when decompress or the caller's block raises.
      output_io.close! if output_io.is_a?(Tempfile)
    end
  end
end
#read(**kwargs) ⇒ Object
# File 'lib/fluent/plugin/buffer/chunk.rb', line 283

# Reads the chunk content.
#
# With <tt>compressed: :zstd</tt> the result of the inherited +read+ is
# returned as-is. Otherwise that result is passed through +decompress+ with
# <tt>type: :zstd</tt> and the outcome is returned.
#
# @param kwargs [Hash] options; +:compressed+ is inspected here and all
#   options are forwarded to the inherited +read+
# @return [Object] the inherited +read+ result, or what +decompress+ returns
def read(**kwargs)
  return super if kwargs[:compressed] == :zstd

  stored = super
  decompress(stored, type: :zstd)
end
#write_to(io, **kwargs) ⇒ Object
# File 'lib/fluent/plugin/buffer/chunk.rb', line 291

# Writes the chunk content to +io+.
#
# The chunk is always opened with <tt>compressed: :zstd</tt>. When the caller
# also passes <tt>compressed: :zstd</tt>, the opened IO is copied to +io+
# verbatim with IO.copy_stream; otherwise it is run through +decompress+
# (type +:zstd+) with +io+ as the output.
#
# @param io [IO] destination to write to
# @param kwargs [Hash] options; only +:compressed+ is inspected here
# @return [Object] whatever the +open+ call returns
def write_to(io, **kwargs)
  keep_compressed = kwargs[:compressed] == :zstd

  open(compressed: :zstd) do |stored_io|
    next IO.copy_stream(stored_io, io) if keep_compressed

    decompress(input_io: stored_io, output_io: io, type: :zstd)
  end
end