Class: Contrails::Parallel

Inherits:
Object
  • Object
show all
Includes:
Chainable
Defined in:
lib/contrails/parallel.rb

Instance Method Summary

Methods included from Chainable

#*, #>>, included, #run, #to_proc

Constructor Details

#initialize(*procs) ⇒ Parallel

Returns a new instance of Parallel.



7
8
9
10
11
12
13
14
15
16
17
18
19
# File 'lib/contrails/parallel.rb', line 7

# Builds a parallel group from the given procs.
#
# Each proc is duplicated and given a callback that writes its result
# into a shared results array (at the proc's own position) and then
# signals the semaphore. An internal callback is also registered that,
# when invoked, schedules every duplicated proc on the EventMachine
# reactor with the arguments it received.
#
# NOTE(review): Semaphore and internal_callback are defined outside this
# block; presumably the semaphore fires its own callbacks once it has
# been signalled @procs.length times -- confirm against their sources.
#
# @param procs [Array] callables responding to #dup, #callback and #call
def initialize(*procs)
  @procs = procs
  collected = []
  @semaphore = Semaphore.new(@procs.length, collected)

  # Work on copies so callbacks are not attached to the caller's objects.
  workers = @procs.each_with_index.map do |original, slot|
    worker = original && original.dup
    worker.callback do |*outcome|
      collected[slot] = outcome
      @semaphore.signal
    end
    worker
  end

  internal_callback do |*args|
    workers.each do |worker|
      # Defer each invocation to the reactor rather than calling inline.
      deferred = lambda { worker.call(*args) }
      EM.next_tick(deferred)
    end
  end
end

Instance Method Details

#bind(other) ⇒ Object



28
29
30
31
# File 'lib/contrails/parallel.rb', line 28

# Registers +other+ as a callback on the internal semaphore.
#
# @param other [#to_proc] callable passed to the semaphore as a block
# @return [self] the receiver, so calls can be chained
def bind(other)
  tap { @semaphore.callback(&other) }
end

#call(*a) ⇒ Object



37
38
39
# File 'lib/contrails/parallel.rb', line 37

# Triggers this object by forwarding all arguments to #succeed.
#
# NOTE(review): #succeed is not defined in this block; presumably it is
# supplied by the Chainable mixin -- confirm.
#
# @param a [Array] arguments handed to #succeed unchanged
# @return [Object] whatever #succeed returns
def call(*a)
  succeed(*a)
end

#callback(&b) ⇒ Object



24
25
26
# File 'lib/contrails/parallel.rb', line 24

# Registers the given block as a callback on the internal semaphore
# rather than on this object itself.
#
# @yield block handed to the semaphore's #callback
# @return [Object] whatever the semaphore's #callback returns
def callback(&b)
  semaphore = @semaphore
  semaphore.callback(&b)
end

#distribute(other) ⇒ Object



33
34
35
# File 'lib/contrails/parallel.rb', line 33

# Creates a new Parallel made of this instance's procs followed by
# +other+. The receiver's own proc list is left untouched.
#
# @param other [Object] additional callable appended after the existing procs
# @return [Contrails::Parallel] a new parallel group
def distribute(other)
  combined = @procs + [other]
  Contrails::Parallel.new(*combined)
end