Class: Contrails::Parallel
- Inherits:
-
Object
- Object
- Contrails::Parallel
show all
- Includes:
- Chainable
- Defined in:
- lib/contrails/parallel.rb
Instance Method Summary
collapse
Methods included from Chainable
#*, #>>, included, #run, #to_proc
Constructor Details
#initialize(*procs) ⇒ Parallel
Returns a new instance of Parallel.
7
8
9
10
11
12
13
14
15
16
17
18
19
|
# File 'lib/contrails/parallel.rb', line 7
# Sets up a fan-out over the given callables.
#
# Each entry of +procs+ is duplicated, and the duplicate is given a callback
# that stores its result arguments in the slot of a shared array matching the
# entry's position, then signals the semaphore. An internal callback is then
# registered which, when invoked, schedules every duplicate through
# +EM.next_tick+ with the arguments it received.
#
# NOTE(review): the semaphore is constructed with the number of procs and the
# shared results array; presumably it fires once every branch has signalled —
# confirm against Semaphore.
#
# @param procs [Array] the callables to run side by side; each must respond
#   to +dup+, +callback+ and +call+
def initialize(*procs)
  @procs = procs
  collected = []
  @semaphore = Semaphore.new(procs.length, collected)

  branches = procs.each_with_index.map do |original, index|
    branch = original && original.dup
    branch.callback do |*outcome|
      # Keep each result at the position of the proc that produced it.
      collected[index] = outcome
      @semaphore.signal
    end
    branch
  end

  internal_callback do |*args|
    branches.each do |branch|
      EM.next_tick(lambda { branch.call(*args) })
    end
  end
end
|
Instance Method Details
#bind(other) ⇒ Object
28
29
30
31
|
# File 'lib/contrails/parallel.rb', line 28
# Registers +other+ as a callback on the semaphore and returns the receiver,
# so further calls can be chained on it.
#
# @param other [#to_proc] the callable handed to the semaphore as a block
# @return [self]
def bind(other)
  tap { @semaphore.callback(&other) }
end
|
#call(*a) ⇒ Object
37
38
39
|
# File 'lib/contrails/parallel.rb', line 37
# Forwards every argument to +succeed+ and returns whatever it returns.
#
# @param args [Array] arguments passed through unchanged
# @return [Object] the result of +succeed+
def call(*args)
  succeed(*args)
end
|
#callback(&b) ⇒ Object
24
25
26
|
# File 'lib/contrails/parallel.rb', line 24
# Registers the given block as a callback on the semaphore rather than on the
# receiver itself.
#
# @yield the block handed to the semaphore's +callback+
# @return [Object] whatever the semaphore's +callback+ returns
def callback(&block)
  @semaphore.callback(&block)
end
|
#distribute(other) ⇒ Object
33
34
35
|
# File 'lib/contrails/parallel.rb', line 33
# Builds a new Contrails::Parallel from this instance's procs with +other+
# appended as the last one. The receiver's own proc list is not modified.
#
# @param other [Object] the additional callable to run alongside the others
# @return [Contrails::Parallel] a fresh instance
def distribute(other)
  members = [*@procs, other]
  Contrails::Parallel.new(*members)
end
|