Class: Tapioca::Executor
- Inherits: Object (ancestor chain: Tapioca::Executor < Object)
- Extended by:
- T::Sig
- Defined in:
- lib/tapioca/executor.rb
Constant Summary collapse
- MINIMUM_ITEMS_PER_WORKER =
T.let(2, Integer)
Instance Method Summary collapse
-
#initialize(queue, number_of_workers: nil) ⇒ Executor
constructor
A new instance of Executor.
- #run_in_parallel(&block) ⇒ Object
Constructor Details
#initialize(queue, number_of_workers: nil) ⇒ Executor
Returns a new instance of Executor.
# File 'lib/tapioca/executor.rb', line 13

# Builds an executor over +queue+, deciding up front how many worker
# processes to fork and how many items each worker will receive.
#
# @param queue [Array] the items that will later be processed by #run_in_parallel
# @param number_of_workers [Integer, nil] explicit worker count; when nil it is
#   derived from the processor count and the queue size
def initialize(queue, number_of_workers: nil)
  @queue = queue

  # Forking workers is expensive and not worth it for a low number of gems. Here we assign the number of workers to
  # be the minimum between the number of available processors (max) or the number of workers to make sure that each
  # one has at least MINIMUM_ITEMS_PER_WORKER (2) items to process
  @number_of_workers = T.let(
    number_of_workers || [Etc.nprocessors, (queue.length.to_f / MINIMUM_ITEMS_PER_WORKER).ceil].min,
    Integer
  )

  # The number of items that will be processed per worker, so that we can split the queue into groups and assign
  # them to each one of the workers
  # NOTE(review): if +queue+ is empty the derived worker count is 0 and this division yields NaN,
  # making +.ceil+ raise FloatDomainError — presumably callers never pass an empty queue; confirm upstream.
  @items_per_worker = T.let((queue.length.to_f / @number_of_workers).ceil, Integer)
end
Instance Method Details
#run_in_parallel(&block) ⇒ Object
# File 'lib/tapioca/executor.rb', line 34

# Runs +block+ once per queued item, forking one child process per worker and
# collecting each child's results through a dedicated pipe.
#
# @param block [Proc] invoked with each item; its return value must be
#   Marshal-dumpable so it can cross the process boundary
# @return [Array] the results of +block+ for every item, flattened across workers
def run_in_parallel(&block)
  # If we only have one worker selected, it's not worth forking, just run sequentially
  return @queue.map { |item| block.call(item) } if @number_of_workers == 1

  read_pipes = []
  write_pipes = []

  # If we have more than one worker, fork the pool by shifting the expected number of items per worker from the
  # queue
  workers = (0...@number_of_workers).map do
    items = @queue.shift(@items_per_worker)

    # Each worker has their own pair of pipes, so that we can read the result from each worker separately
    read, write = IO.pipe
    read_pipes << read
    write_pipes << write

    # NOTE(review): Kernel#fork is unavailable on some platforms (e.g. Windows, JRuby) —
    # presumably tapioca only supports fork-capable Rubies; confirm.
    fork do
      # The child only writes; close its copy of the read end immediately.
      read.close
      result = items.map { |item| block.call(item) }

      # Pack the result as a Base64 string of the Marshal dump of the array of values returned by the block that we
      # ran in parallel
      packed = [Marshal.dump(result)].pack("m")
      write.puts(packed)
      write.close
    end
  end

  # Close all the write pipes, then read and close from all the read pipes.
  # Closing the parent's write ends is what lets each pipe.read see EOF once its child exits.
  write_pipes.each(&:close)
  result = read_pipes.map do |pipe|
    content = pipe.read
    pipe.close
    content
  end

  # Wait until all the workers finish. Notice that waiting for the PIDs can only happen after we read and close the
  # pipe or else we may end up in a condition where writing to the pipe hangs indefinitely
  workers.each { |pid| Process.waitpid(pid) }

  # Decode the value back into the Ruby objects by doing the inverse of what each worker does.
  # Marshal.load is only acceptable here because the data comes from our own child processes over a
  # private pipe — it must never be fed untrusted input.
  result.flat_map { |item| T.unsafe(Marshal.load(item.unpack1("m"))) }
end