Module: Fluent::Plugin::Buffer::Chunk::Decompressable
- Includes:
- Compressable
- Defined in:
- lib/fluent/plugin/buffer/chunk.rb
Instance Method Summary
- #append(data, **kwargs) ⇒ Object
- #open(**kwargs, &block) ⇒ Object
- #read(**kwargs) ⇒ Object
- #write_to(io, **kwargs) ⇒ Object
Methods included from Compressable
Instance Method Details
#append(data, **kwargs) ⇒ Object
187 188 189 190 191 192 193 194 195 196 197 198 199 |
# File 'lib/fluent/plugin/buffer/chunk.rb', line 187
#
# Appends +data+ to the chunk, gzip-compressing it first when asked to.
#
# With `compress: :gzip`, every element of +data+ is written, in iteration
# order, into one in-memory gzip stream; the resulting bytes are then passed
# to `concat` together with `data.size`. Any other call is forwarded
# unchanged (same arguments) to the parent implementation.
#
# @param data [#each, #size] elements to append
# @param kwargs [Hash] options; only `:compress` is inspected here
# @return [Object] whatever `concat` (gzip path) or the parent method returns
def append(data, **kwargs)
  return super unless kwargs[:compress] == :gzip

  buffer = StringIO.new
  # `wrap` closes the writer when the block ends, which flushes the gzip
  # trailer into `buffer` before its string is read back below.
  Zlib::GzipWriter.wrap(buffer) do |gzip|
    data.each { |element| gzip.write(element) }
  end
  concat(buffer.string, data.size)
end
#open(**kwargs, &block) ⇒ Object
201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 |
# File 'lib/fluent/plugin/buffer/chunk.rb', line 201
#
# Yields an IO over the chunk's content, decompressing it unless the caller
# asks for the compressed form.
#
# With `compressed: :gzip` the call is forwarded unchanged to the parent
# implementation, so the block receives whatever IO the parent yields
# (presumably the still-compressed bytes -- confirm against the parent class).
#
# Otherwise the parent's IO is run through `decompress` into a scratch IO,
# which is rewound to the start and yielded instead. The scratch IO is a
# StringIO when the parent yields a StringIO, and a binary-mode Tempfile for
# any other IO.
#
# The scratch Tempfile is closed and unlinked as soon as the block returns
# or raises (including a failure inside `decompress`), so it must not be
# used after the block. Previously it was left open until garbage
# collection.
#
# @param kwargs [Hash] options; `:compressed` is inspected here and all
#   options are passed on to the parent implementation
# @yieldparam io [IO, StringIO, Tempfile] readable stream positioned at 0
# @return [Object] whatever the parent implementation returns
def open(**kwargs, &block)
  return super if kwargs[:compressed] == :gzip

  super(**kwargs) do |chunk_io|
    scratch = nil
    begin
      scratch =
        if chunk_io.is_a?(StringIO)
          StringIO.new
        else
          # Binary mode so decompressed bytes are written without any
          # newline or encoding translation.
          Tempfile.new('decompressed-data').tap(&:binmode)
        end
      decompress(input_io: chunk_io, output_io: scratch)
      scratch.seek(0, IO::SEEK_SET)
      yield scratch
    ensure
      # Only the Tempfile holds OS resources (descriptor + file on disk).
      scratch.close! if scratch.is_a?(Tempfile)
    end
  end
end
#read(**kwargs) ⇒ Object
219 220 221 222 223 224 225 |
# File 'lib/fluent/plugin/buffer/chunk.rb', line 219
#
# Returns the chunk's content, decompressed unless the caller opts out.
#
# The parent implementation is always called with the same keyword
# arguments. With `compressed: :gzip` its result is returned as is;
# otherwise the result is passed through `decompress` first.
#
# @param kwargs [Hash] options; `:compressed` is inspected here and all
#   options are passed on to the parent implementation
# @return [Object] the parent's result, or `decompress` applied to it
def read(**kwargs)
  raw = super
  kwargs[:compressed] == :gzip ? raw : decompress(raw)
end
#write_to(io, **kwargs) ⇒ Object
227 228 229 230 231 232 233 234 235 |
# File 'lib/fluent/plugin/buffer/chunk.rb', line 227
#
# Writes the chunk's content into +io+.
#
# The chunk is always opened with `compressed: :gzip`. When the caller also
# passes `compressed: :gzip`, the opened stream is copied to +io+ verbatim
# with `IO.copy_stream`; otherwise it is run through `decompress` with +io+
# as the output.
#
# @param io [#write] destination stream
# @param kwargs [Hash] options; only `:compressed` is inspected here
# @return [Object] whatever `open` returns for the block's result
def write_to(io, **kwargs)
  keep_compressed = kwargs[:compressed] == :gzip

  open(compressed: :gzip) do |chunk_io|
    next IO.copy_stream(chunk_io, io) if keep_compressed

    decompress(input_io: chunk_io, output_io: io)
  end
end