Class: Tapioca::Executor

Inherits: Object
Extended by: T::Sig
Defined in: lib/tapioca/executor.rb

Constant Summary

MINIMUM_ITEMS_PER_WORKER = T.let(2, Integer)

Instance Method Summary

Constructor Details

#initialize(queue, number_of_workers: nil) ⇒ Executor

Returns a new instance of Executor.



# File 'lib/tapioca/executor.rb', line 13

def initialize(queue, number_of_workers: nil)
  @queue = queue

  # Forking workers is expensive and not worth it for a low number of gems. Here we assign the number of workers to
  # be the minimum of the number of available processors and the number of workers needed so that each one has at
  # least MINIMUM_ITEMS_PER_WORKER items to process
  @number_of_workers = T.let(
    number_of_workers || [Etc.nprocessors, (queue.length.to_f / MINIMUM_ITEMS_PER_WORKER).ceil].min,
    Integer
  )

  # The number of items that will be processed per worker, so that we can split the queue into groups and assign
  # them to each one of the workers
  @items_per_worker = T.let((queue.length.to_f / @number_of_workers).ceil, Integer)
end
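
As a rough usage sketch (the queue contents below are hypothetical and not taken from Tapioca's own call sites), constructing an executor looks like this:

# Hypothetical queue of gem names to process.
queue = ["rake", "rspec", "rubocop", "sorbet-runtime"]

# With number_of_workers left as nil, the executor derives the worker count from
# Etc.nprocessors and MINIMUM_ITEMS_PER_WORKER (2), so each worker receives at
# least two items.
executor = Tapioca::Executor.new(queue)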

Instance Method Details

#run_in_parallel(&block) ⇒ Object



# File 'lib/tapioca/executor.rb', line 34

def run_in_parallel(&block)
  # If only one worker is selected, forking is not worth it; just run sequentially
  return @queue.map { |item| block.call(item) } if @number_of_workers == 1

  read_pipes = []
  write_pipes = []

  # If we have more than one worker, fork the pool by shifting the expected number of items per worker from the
  # queue
  workers = (0...@number_of_workers).map do
    items = @queue.shift(@items_per_worker)

    # Each worker has its own pair of pipes, so that we can read the result from each worker separately
    read, write = IO.pipe
    read_pipes << read
    write_pipes << write

    fork do
      read.close
      result = items.map { |item| block.call(item) }

      # Pack the result as a Base64 string of the Marshal dump of the array of values returned by the block that we
      # ran in parallel
      packed = [Marshal.dump(result)].pack("m")
      write.puts(packed)
      write.close
    end
  end

  # Close all the write pipes, then read and close from all the read pipes
  write_pipes.each(&:close)
  result = read_pipes.map do |pipe|
    content = pipe.read
    pipe.close
    content
  end

  # Wait until all the workers finish. Notice that waiting for the PIDs can only happen after we read and close the
  # pipe or else we may end up in a condition where writing to the pipe hangs indefinitely
  workers.each { |pid| Process.waitpid(pid) }

  # Decode the value back into the Ruby objects by doing the inverse of what each worker does
  result.flat_map { |item| T.unsafe(Marshal.load(item.unpack1("m"))) }
end
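
As a minimal sketch of calling this method (the queue and block are hypothetical), note that the block's return values must be Marshal-able, since each forked worker serializes its results back to the parent through a pipe:

executor = Tapioca::Executor.new(["rake", "rspec", "rubocop"])

# Each worker runs the block over its slice of the queue in a forked process;
# the flattened results come back in the original queue order.
lengths = executor.run_in_parallel { |gem_name| gem_name.length }
# => [4, 5, 7]

Because the work runs in forked child processes, any state the block mutates is not visible to the parent; only the values returned by the block cross the pipe.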