Class: Cabriolet::Parallel::Extractor

Inherits:
Object
  • Object
show all
Defined in:
lib/cabriolet/parallel.rb

Overview

Parallel extractor for archives

Constant Summary collapse

DEFAULT_WORKERS =
4

Instance Method Summary collapse

Constructor Details

#initialize(archive, output_dir, workers: DEFAULT_WORKERS, **options) ⇒ Extractor

Returns a new instance of Extractor.



10
11
12
13
14
15
16
17
18
19
20
# File 'lib/cabriolet/parallel.rb', line 10

# Set up an extractor for +archive+ that writes into +output_dir+.
#
# @param archive [Object] archive whose +files+ will later be queued
# @param output_dir [String] directory that extraction writes into
# @param workers [Integer] requested number of worker threads; values
#   below 1 are raised to 1
# @param options [Hash] extra keyword options, stored as-is; the keys
#   +:preserve_paths+ (default +true+) and +:overwrite+ (default +false+)
#   are also read into their own instance variables
def initialize(archive, output_dir, workers: DEFAULT_WORKERS, **options)
  @archive = archive
  @output_dir = output_dir

  # Guarantee there is always at least one worker.
  @workers = [workers, 1].max

  @options = options
  @preserve_paths = options.fetch(:preserve_paths) { true }
  @overwrite = options.fetch(:overwrite) { false }

  # Work queue plus the counters (and their lock) shared by the workers.
  @queue = Queue.new
  @stats = %i[extracted skipped failed bytes].each_with_object({}) do |counter, tally|
    tally[counter] = 0
  end
  @stats_mutex = Mutex.new
end

Instance Method Details

#extract_all ⇒ Hash

Extract all files using parallel workers

Examples:

extractor = Cabriolet::Parallel::Extractor.new(cab, 'output/', workers: 8)
stats = extractor.extract_all

Returns:

  • (Hash)

    Extraction statistics



29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
# File 'lib/cabriolet/parallel.rb', line 29

# Extract every file of the archive using the configured number of
# worker threads.
#
# The output directory is created first. Each entry of +@archive.files+
# is pushed onto the shared queue, followed by one +:done+ marker per
# worker. One thread per worker then runs +worker_loop+ (defined outside
# this listing), and the method blocks until all threads have been joined.
#
# @return [Hash] the +@stats+ hash
def extract_all
  FileUtils.mkdir_p(@output_dir)

  # Fill the queue with the work items ...
  @archive.files.each { |entry| @queue.push(entry) }

  # ... then append one stop marker for each worker.
  @workers.times { @queue.push(:done) }

  # Spawn the workers, handing each its index.
  pool = @workers.times.map do |worker_id|
    Thread.new { worker_loop(worker_id) }
  end

  # Block until every worker has finished.
  pool.each { |worker| worker.join }

  @stats
end

#extract_with_progress {|current, total, file| ... } ⇒ Hash

Extract files with progress callback

Examples:

extractor.extract_with_progress do |current, total, file|
  puts "#{current}/#{total}: #{file.name}"
end

Yields:

  • (current, total, file)

    Progress callback

Returns:

  • (Hash)

    Extraction statistics



58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
# File 'lib/cabriolet/parallel.rb', line 58

# Extract every file of the archive with worker threads, reporting
# progress to the given block after each file.
#
# Without a block this simply delegates to +extract_all+. With a block,
# the output directory is created, each entry of +@archive.files+ is
# queued followed by one +:done+ marker per worker, and each worker
# thread pops entries and passes them to +extract_file+ (defined outside
# this listing). The callback is invoked while a mutex is held, so
# callback invocations never overlap and +current+ increases by one on
# each call.
#
# @yieldparam current [Integer] number of files processed so far
# @yieldparam total [Integer] +@archive.files.count+ taken at the start
# @yieldparam file [Object] the queue entry that was just processed
# @return [Hash] the +@stats+ hash
def extract_with_progress(&block)
  return extract_all if block.nil?

  file_count = @archive.files.count
  completed = 0
  progress_lock = Mutex.new

  FileUtils.mkdir_p(@output_dir)

  # Enqueue the work items, then one stop marker per worker.
  @archive.files.each { |entry| @queue.push(entry) }
  @workers.times { @queue.push(:done) }

  # Each worker drains the queue until it pops a stop marker.
  pool = @workers.times.map do
    Thread.new do
      loop do
        entry = @queue.pop
        break if entry == :done

        extract_file(entry)

        # Bump the counter and notify the caller under the same lock.
        progress_lock.synchronize do
          completed += 1
          block.call(completed, file_count, entry)
        end
      end
    end
  end

  pool.each { |worker| worker.join }
  @stats
end