Module: Fluent::Plugin::Buffer::Chunk::Decompressable

Includes:
Compressable
Defined in:
lib/fluent/plugin/buffer/chunk.rb

Instance Method Summary collapse

Methods included from Compressable

#compress, #decompress

Instance Method Details

#append(data, **kwargs) ⇒ Object



187
188
189
190
191
192
193
194
195
196
197
198
199
# File 'lib/fluent/plugin/buffer/chunk.rb', line 187

def append(data, **kwargs)
  if kwargs[:compress] == :gzip
    io = StringIO.new
    Zlib::GzipWriter.wrap(io) do |gz|
      data.each do |d|
        gz.write d
      end
    end
    concat(io.string, data.size)
  else
    super
  end
end

#open(**kwargs, &block) ⇒ Object



201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
# File 'lib/fluent/plugin/buffer/chunk.rb', line 201

def open(**kwargs, &block)
  if kwargs[:compressed] == :gzip
    super
  else
    super(**kwargs) do |chunk_io|
      output_io = if chunk_io.is_a?(StringIO)
                    StringIO.new
                  else
                    Tempfile.new('decompressed-data')
                  end
      output_io.binmode if output_io.is_a?(Tempfile)
      decompress(input_io: chunk_io, output_io: output_io)
      output_io.seek(0, IO::SEEK_SET)
      yield output_io
    end
  end
end

#read(**kwargs) ⇒ Object



219
220
221
222
223
224
225
# File 'lib/fluent/plugin/buffer/chunk.rb', line 219

def read(**kwargs)
  if kwargs[:compressed] == :gzip
    super
  else
    decompress(super)
  end
end

#write_to(io, **kwargs) ⇒ Object



227
228
229
230
231
232
233
234
235
# File 'lib/fluent/plugin/buffer/chunk.rb', line 227

def write_to(io, **kwargs)
  open(compressed: :gzip) do |chunk_io|
    if kwargs[:compressed] == :gzip
      IO.copy_stream(chunk_io, io)
    else
      decompress(input_io: chunk_io, output_io: io)
    end
  end
end