Class: ExceptionalSynchrony::ParallelSync

Inherits:
Object
  • Object
show all
Defined in:
lib/exceptional_synchrony/parallel_sync.rb

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(em, downstream = nil) ⇒ ParallelSync

em is the EventMachine proxy. downstream is an optional queue. If provided, it will run the jobs for us, and it must create the Fiber for each job as well.



15
16
17
18
19
20
# File 'lib/exceptional_synchrony/parallel_sync.rb', line 15

# Builds an empty ParallelSync; jobs are registered afterwards.
#
# @param em the EventMachine proxy (stored as-is)
# @param downstream optional queue; when given, jobs are handed to it
#   instead of being started directly in a new Fiber
def initialize(em, downstream = nil)
  @downstream = downstream
  @em         = em
  @jobs       = []        # callables, in the order they were added
  @finished   = Set.new   # ordinals of jobs that are done (or cancelled)
end

Class Method Details

.parallel(em, *args) {|parallel_sync| ... } ⇒ Object

Yields:

  • (parallel_sync)


7
8
9
10
11
# File 'lib/exceptional_synchrony/parallel_sync.rb', line 7

# Convenience wrapper: builds an instance, lets the caller register jobs on
# it inside the block, then runs them all.
#
# @param em forwarded to .new as the first argument
# @param args any further arguments, forwarded to .new unchanged
# @yield [parallel_sync] the freshly created instance
# @return whatever #run_all! returns
def self.parallel(em, *args)
  new(em, *args).tap { |parallel_sync| yield parallel_sync }.run_all!
end

Instance Method Details

#add(proc = nil, &block) ⇒ Object



22
23
24
25
# File 'lib/exceptional_synchrony/parallel_sync.rb', line 22

# Registers a job to be run later by #run_all!.
#
# @param proc optional callable; takes precedence over the block when both
#   are supplied
# @param block used as the job when no callable is passed
# @return [Array] the internal list of jobs (the result of <<)
def add(proc = nil, &block)
  @jobs << (proc || block)
end

#run_all! ⇒ Object

Runs all the jobs that have been added. Returns the hash of responses where the key is their ordinal number, in order.



29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
# File 'lib/exceptional_synchrony/parallel_sync.rb', line 29

# Runs every job that has been added and collects one response per job.
#
# Without a downstream queue each job is started immediately in its own
# Fiber. With a downstream queue the job is handed to the queue instead;
# jobs that respond to +encapsulate+ are wrapped through it and given a
# +cancel:+ callback that marks their slot as +:cancelled+.
#
# If not every job has finished by the time all of them are dispatched, the
# calling fiber yields (presumably check_progress resumes it once the last
# job completes -- that helper is not visible here).
#
# @return [Hash] responses keyed by the job's ordinal, in ordinal order;
#   each value is the result of CallbackExceptions.return_exception for that
#   job, or +:cancelled+
def run_all!
  caller_fiber = Fiber.current

  # Seed every slot up front, in ordinal order, so no sorting is needed later.
  @responses = (0...@jobs.size).build_hash { |ordinal| [ordinal, nil] }

  @jobs.each_with_index do |job, index|
    # Runs the job, records its response and reports progress.
    complete = lambda do |*args|
      @responses[index] = CallbackExceptions.return_exception(*args, &job)
      @finished.add(index)
      check_progress(caller_fiber)
    end

    # No queue: we own the Fiber and start the job right away.
    unless @downstream
      Fiber.new(&complete).resume
      next
    end

    # Plain job: the queue receives the completion lambda as its block.
    unless job.respond_to?(:encapsulate)
      @downstream.add(&complete)
      next
    end

    # NOTE(review): the cancel path does not call check_progress; confirm a
    # cancellation can never be the last outstanding event after the yield.
    on_cancel = lambda do
      @responses[index] = :cancelled
      @finished.add(index)
    end
    @downstream.add(job.encapsulate(cancel: on_cancel, &complete))
  end

  # Park the calling fiber while jobs are still outstanding.
  unless finished?
    @yielded = true
    Fiber.yield
  end

  raise_any_exceptions(@responses)
  @responses
end