Class: ExceptionalSynchrony::ParallelSync
- Inherits:
-
Object
- Object
- ExceptionalSynchrony::ParallelSync
- Defined in:
- lib/exceptional_synchrony/parallel_sync.rb
Class Method Summary collapse
Instance Method Summary collapse
- #add(proc = nil, &block) ⇒ Object
-
#initialize(em, downstream = nil) ⇒ ParallelSync
constructor
em is the EventMachine proxy Downstream is an optional queue.
-
#run_all! ⇒ Object
Runs all the jobs that have been added.
Constructor Details
#initialize(em, downstream = nil) ⇒ ParallelSync
em is the EventMachine proxy Downstream is an optional queue. If provided it will run the job for us. It must create the Fiber as well.
15 16 17 18 19 20 |
# File 'lib/exceptional_synchrony/parallel_sync.rb', line 15 def initialize(em, downstream = nil) @em = em @downstream = downstream @jobs = [] @finished = Set.new end |
Class Method Details
.parallel(em, *args) {|parallel_sync| ... } ⇒ Object
7 8 9 10 11 |
# File 'lib/exceptional_synchrony/parallel_sync.rb', line 7 def self.parallel(em, *args) parallel_sync = new(em, *args) yield parallel_sync parallel_sync.run_all! end |
Instance Method Details
#add(proc = nil, &block) ⇒ Object
22 23 24 25 |
# File 'lib/exceptional_synchrony/parallel_sync.rb', line 22 def add(proc = nil, &block) job = proc || block @jobs << job end |
#run_all! ⇒ Object
Runs all the jobs that have been added. Returns the hash of responses where the key is their ordinal number, in order.
29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 |
# File 'lib/exceptional_synchrony/parallel_sync.rb', line 29 def run_all! original_fiber = Fiber.current @responses = (0...@jobs.size).build_hash { |key| [key, nil] } # initialize in sorted order so we don't have to sort later @jobs.each_with_index do |job, index| run_and_finish = lambda do |*args| @responses[index] = CallbackExceptions.return_exception(*args, &job) @finished.add(index) check_progress(original_fiber) end if @downstream if job.respond_to?(:encapsulate) cancel_proc = -> do @responses[index] = :cancelled @finished.add(index) end @downstream.add(job.encapsulate(cancel: cancel_proc, &run_and_finish)) else @downstream.add(&run_and_finish) end else Fiber.new(&run_and_finish).resume end end unless finished? @yielded = true Fiber.yield end raise_any_exceptions(@responses) @responses end |