Module: Fluent::Plugin::Buffer::Chunk::ZstdDecompressable

Includes:
Compressable
Defined in:
lib/fluent/plugin/buffer/chunk.rb

Instance Method Summary

Methods included from Compressable

#compress, #decompress

Instance Method Details

#append(data, **kwargs) ⇒ Object



251
252
253
254
255
256
257
258
259
260
261
262
263
# File 'lib/fluent/plugin/buffer/chunk.rb', line 251

# Appends +data+ to the chunk, compressing it with zstd when requested.
#
# With <tt>compress: :zstd</tt>, every element of +data+ is written through a
# Zstd::StreamWriter into an in-memory StringIO; once the stream is finished,
# the accumulated string is handed to +concat+ together with +data.size+.
# For any other value of +:compress+ the call is delegated to +super+
# unchanged.
#
# @param data [#each, #size] records to append
# @param kwargs [Hash] options; only +:compress+ is inspected here
# @return [Object] whatever +concat+ (or +super+) returns
def append(data, **kwargs)
  return super unless kwargs[:compress] == :zstd

  buffer = StringIO.new
  writer = Zstd::StreamWriter.new(buffer)
  data.each { |record| writer.write(record) }
  # The stream must be finished before the buffer's contents are read.
  writer.finish
  concat(buffer.string, data.size)
end

#open(**kwargs, &block) ⇒ Object



265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
# File 'lib/fluent/plugin/buffer/chunk.rb', line 265

# Opens the chunk and yields an IO to the caller's block.
#
# With <tt>compressed: :zstd</tt> the call is delegated to +super+ untouched,
# so the block receives whatever +super+ yields (presumably the still
# compressed data -- confirm against the including class).
#
# Otherwise the IO yielded by +super+ is decompressed via +decompress+ into a
# scratch IO, which is rewound and yielded instead. The scratch IO is a
# StringIO when the source is a StringIO, and a binary-mode Tempfile for any
# other source.
#
# The Tempfile is closed and unlinked as soon as the caller's block returns
# (or raises), instead of lingering until garbage collection; the yielded IO
# must therefore not be used after the block finishes.
#
# @param kwargs [Hash] options forwarded to +super+; +:compressed+ is inspected here
# @yieldparam io [StringIO, Tempfile, Object] IO positioned at offset 0
# @return [Object] whatever +super+ returns
def open(**kwargs, &block)
  return super if kwargs[:compressed] == :zstd

  super(**kwargs) do |chunk_io|
    output_io = chunk_io.is_a?(StringIO) ? StringIO.new : Tempfile.new('decompressed-data')
    begin
      output_io.binmode if output_io.is_a?(Tempfile)
      decompress(input_io: chunk_io, output_io: output_io, type: :zstd)
      output_io.seek(0, IO::SEEK_SET)
      yield output_io
    ensure
      # Release the file descriptor and remove the on-disk file right away.
      output_io.close! if output_io.is_a?(Tempfile)
    end
  end
end

#read(**kwargs) ⇒ Object



283
284
285
286
287
288
289
# File 'lib/fluent/plugin/buffer/chunk.rb', line 283

# Reads the chunk's content.
#
# With <tt>compressed: :zstd</tt> the result of +super+ is returned as is.
# For any other value, the result of +super+ is passed through +decompress+
# with <tt>type: :zstd</tt> and the decompressed result is returned.
#
# @param kwargs [Hash] options forwarded to +super+; +:compressed+ is inspected here
# @return [Object] the value from +super+, or the result of +decompress+
def read(**kwargs)
  return super if kwargs[:compressed] == :zstd

  raw = super
  decompress(raw, type: :zstd)
end

#write_to(io, **kwargs) ⇒ Object



291
292
293
294
295
296
297
298
299
# File 'lib/fluent/plugin/buffer/chunk.rb', line 291

# Writes the chunk's content to +io+.
#
# The chunk is always opened with <tt>compressed: :zstd</tt>. When the caller
# also passes <tt>compressed: :zstd</tt>, the opened IO is copied to +io+
# verbatim with IO.copy_stream; otherwise it is run through +decompress+
# (<tt>type: :zstd</tt>) with +io+ as the output.
#
# @param io [IO, #write] destination
# @param kwargs [Hash] options; only +:compressed+ is inspected here
# @return [Object] whatever +open+ returns
def write_to(io, **kwargs)
  keep_compressed = kwargs[:compressed] == :zstd

  open(compressed: :zstd) do |raw_io|
    next IO.copy_stream(raw_io, io) if keep_compressed

    decompress(input_io: raw_io, output_io: io, type: :zstd)
  end
end