Class: Cabriolet::Streaming::BatchProcessor

Inherits:
Object
Defined in:
lib/cabriolet/streaming.rb

Overview

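Stream processor for batch operations: it walks one or more archives file by file, passes each file to a caller-supplied block, and keeps running counts of processed files, failed files and bytes.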

Constructor Details

#initialize(chunk_size: StreamParser::DEFAULT_CHUNK_SIZE) ⇒ BatchProcessor

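Returns a new instance of BatchProcessor. The chunk_size value is passed to every StreamParser the processor creates, and all statistics counters (:processed, :failed, :bytes) start at zero.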



# File 'lib/cabriolet/streaming.rb', line 181

def initialize(chunk_size: StreamParser::DEFAULT_CHUNK_SIZE)
  @chunk_size = chunk_size
  @stats = { processed: 0, failed: 0, bytes: 0 }
end

Instance Attribute Details

#stats ⇒ Object (readonly)

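Returns the processing statistics: a Hash with the :processed, :failed and :bytes counters. This is the processor's own Hash rather than a copy, so it keeps changing as further archives are processed.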



# File 'lib/cabriolet/streaming.rb', line 218

def stats
  @stats
end
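Because stats is a readonly attribute that returns the processor's own Hash, "readonly" only prevents reassigning the attribute; the Hash itself keeps changing as processing continues. A minimal sketch of taking a point-in-time snapshot with dup follows. StatsHolder and its record method are hypothetical stand-ins that mirror only the @stats initialization and accessor shown above, not the real class.

```ruby
# Hypothetical stand-in: mirrors only BatchProcessor's @stats setup and
# its readonly `stats` attribute, not the real class.
class StatsHolder
  attr_reader :stats

  def initialize
    @stats = { processed: 0, failed: 0, bytes: 0 }
  end

  # Hypothetical helper imitating the counter updates that
  # process_archive performs after a file is handled successfully.
  def record(bytes)
    @stats[:processed] += 1
    @stats[:bytes] += bytes
  end
end

holder = StatsHolder.new
live = holder.stats          # the very same Hash object as @stats
snapshot = holder.stats.dup  # shallow copy; Integer values are immutable, so this is enough

holder.record(512)

p live[:processed]           # => 1
p snapshot[:processed]       # => 0
p live.equal?(holder.stats)  # => true
```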

Instance Method Details

#process_archive(path) {|file, archive_path| ... } ⇒ Object

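Processes a single archive in streaming mode. Each file is passed to the block together with the archive path. On success, stats[:processed] is incremented and, when the file responds to size, its size is added to stats[:bytes]. An exception raised while handling a file is rescued, counted in stats[:failed] and reported with warn, and processing moves on to the next file; an error opening or iterating the archive itself is reported with warn but not counted.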

Parameters:

  • path (String)

    Archive path

Yields:

  • (file, archive_path)

    Yields each file together with the path of the archive it came from



# File 'lib/cabriolet/streaming.rb', line 203

def process_archive(path)
  parser = StreamParser.new(path, chunk_size: @chunk_size)

  parser.each_file do |file|
    yield file, path
    @stats[:processed] += 1
    @stats[:bytes] += file.size if file.respond_to?(:size)
  rescue StandardError => e
    @stats[:failed] += 1
    warn "Error processing #{file.name} from #{path}: #{e.message}"
  end
rescue StandardError => e
  warn "Error processing archive #{path}: #{e.message}"
end
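The per-file rescue means one bad file does not abort the rest of the archive. It also has a less obvious consequence: calling process_archive without a block makes every yield raise LocalJumpError, which is a StandardError, so every file is counted as failed. The sketch below reproduces the loop against an in-memory parser to show both cases. FakeFile, FakeParser and process_with are hypothetical names; only the each_file call is taken from the source, and the stub makes no claim about the rest of StreamParser's API. The rescue inside a do/end block requires Ruby 2.5 or later.

```ruby
# Hypothetical file record and in-memory parser standing in for StreamParser.
FakeFile = Struct.new(:name, :size)

class FakeParser
  def initialize(files)
    @files = files
  end

  def each_file(&block)
    @files.each(&block)
  end
end

# Re-creation of the loop in process_archive, taking a ready-made parser and
# stats Hash instead of building a StreamParser from `path`.
def process_with(parser, path, stats)
  parser.each_file do |file|
    yield file, path
    stats[:processed] += 1
    stats[:bytes] += file.size if file.respond_to?(:size)
  rescue StandardError => e # per-file failure: count it, keep going
    stats[:failed] += 1
    warn "Error processing #{file.name} from #{path}: #{e.message}"
  end
rescue StandardError => e   # archive-level failure: reported, not counted
  warn "Error processing archive #{path}: #{e.message}"
end

files = [FakeFile.new("a.txt", 10), FakeFile.new("bad.bin", 20), FakeFile.new("c.txt", 30)]

# One file fails inside the block; the other two are still processed.
stats = { processed: 0, failed: 0, bytes: 0 }
process_with(FakeParser.new(files), "demo.cab", stats) do |file, _archive_path|
  raise ArgumentError, "unsupported" if file.name.end_with?(".bin")
end
p stats.values_at(:processed, :failed, :bytes) # => [2, 1, 40]

# No block: each yield raises LocalJumpError, so every file counts as failed.
no_block = { processed: 0, failed: 0, bytes: 0 }
process_with(FakeParser.new(files), "demo.cab", no_block)
p no_block[:failed] # => 3
```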

#process_archives(paths) {|file, archive_path| ... } ⇒ Hash

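Processes multiple archives in streaming mode by calling #process_archive for each path in order with the same block, then returns the statistics. The returned Hash is the processor's own @stats, so counts accumulate across calls on the same instance.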

Parameters:

  • paths (Array<String>)

    Array of archive paths

Yields:

  • (file, archive_path)

    Yields each file with its archive path

Returns:

  • (Hash)

    Processing statistics: the :processed, :failed and :bytes counters (the same Hash returned by #stats)



# File 'lib/cabriolet/streaming.rb', line 191

def process_archives(paths, &block)
  paths.each do |path|
    process_archive(path, &block)
  end

  @stats
end
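The accumulation behaviour of process_archives can be checked without real archives. In this sketch, MiniBatch is a hypothetical in-memory stand-in: archive contents come from a Hash mapping each path to [name, size] pairs instead of from StreamParser, and a missing key (KeyError) plays the part of an unreadable archive. The control flow follows the two methods documented above.

```ruby
# Hypothetical in-memory stand-in for BatchProcessor. Archives are looked up
# in a Hash (path => [[name, size], ...]) rather than opened with StreamParser;
# the control flow mirrors process_archives and process_archive.
class MiniBatch
  Entry = Struct.new(:name, :size)

  attr_reader :stats

  def initialize(archives)
    @archives = archives
    @stats = { processed: 0, failed: 0, bytes: 0 }
  end

  def process_archives(paths, &block)
    paths.each { |path| process_archive(path, &block) }
    @stats # the live Hash, not a copy
  end

  def process_archive(path)
    # A missing path raises KeyError, standing in for an unreadable archive.
    @archives.fetch(path).each do |name, size|
      file = Entry.new(name, size)
      yield file, path
      @stats[:processed] += 1
      @stats[:bytes] += file.size
    rescue StandardError => e
      # Per-file failure: count it and move on to the next file.
      @stats[:failed] += 1
      warn "Error processing #{file.name} from #{path}: #{e.message}"
    end
  rescue StandardError => e
    # Archive-level failure: reported but not counted in :failed.
    warn "Error processing archive #{path}: #{e.message}"
  end
end

batch = MiniBatch.new({
  "one.cab" => [["a.txt", 100], ["b.txt", 50]],
  "two.cab" => [["c.txt", 25]]
})

seen = []
result = batch.process_archives(["one.cab", "missing.cab", "two.cab"]) do |file, archive_path|
  seen << "#{archive_path}:#{file.name}"
end
p result.values_at(:processed, :failed, :bytes) # => [3, 0, 175]
p result.equal?(batch.stats)                    # => true

# A second call on the same instance keeps adding to the same counters.
batch.process_archives(["two.cab"]) { |_file, _archive_path| }
p batch.stats[:processed] # => 4
```

The missing archive is only reported through warn and leaves :failed at 0, matching the archive-level rescue in process_archive. Callers that need per-batch numbers can use a fresh instance per batch or take a dup of the result before the next call.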