Module: Enumerable

Defined in:
lib/dispatch/enumerable.rb

Instance Method Summary collapse

Instance Method Details

#p_each(stride = 1, priority = nil, &block) ⇒ Object

Parallel each



43
44
45
46
# File 'lib/dispatch/enumerable.rb', line 43

# Parallel +each+: invokes +block+ once for every element of the receiver.
#
# The receiver is first snapshotted into an array so elements can be fetched
# by index; the iteration itself is delegated to Integer#p_times (defined
# elsewhere -- presumably it runs the indices concurrently; confirm there).
#
# @param stride   forwarded unchanged to Integer#p_times
# @param priority forwarded unchanged to Integer#p_times
# @param block    called with one element per invocation
def p_each(stride=1, priority=nil, &block)
  items = to_a
  total = items.count
  total.p_times(stride, priority) do |idx|
    block.call(items[idx])
  end
end

#p_each_with_index(stride = 1, priority = nil, &block) ⇒ Object

Parallel each, also passing each element's index as the second block argument



49
50
51
52
# File 'lib/dispatch/enumerable.rb', line 49

# Parallel +each_with_index+: invokes +block+ with every element of the
# receiver together with that element's index.
#
# The receiver is snapshotted into an array and the index range is handed to
# Integer#p_times (defined elsewhere -- presumably concurrent; confirm there).
#
# @param stride   forwarded unchanged to Integer#p_times
# @param priority forwarded unchanged to Integer#p_times
# @param block    called as block.call(element, index)
def p_each_with_index(stride=1, priority=nil, &block)
  items = to_a
  total = items.count
  total.p_times(stride, priority) do |idx|
    block.call(items[idx], idx)
  end
end

#p_find(stride = 1, priority = nil, &block) ⇒ Object

Parallel detect; will return -one- match for &block, but it may not be the ‘first’. Only useful if the test block is very expensive to run. Note: each object can only run one p_find at a time.



96
97
98
99
100
101
102
103
104
105
106
107
# File 'lib/dispatch/enumerable.rb', line 96

# Parallel +detect+: returns one element for which +block+ is truthy, or nil
# when no element matches. The match is not necessarily the first one.
#
# State lives in instance variables on the receiver (@find_q, @find_result),
# and the whole search runs inside @find_q.sync so that overlapping p_find
# calls on the same object take turns.
#
# @param stride   forwarded to p_each
# @param priority forwarded to p_each
# @param block    predicate called with each element
# @return the matching element, or nil
def p_find(stride=1, priority=nil, &block)
  @find_q ||= Dispatch::Queue.for(self)
  @find_q.sync do
    @find_result = nil
    result_q = Dispatch::Queue.for(@find_result)
    p_each(stride, priority) do |candidate|
      # Skip the (expensive) predicate once some match has been recorded.
      if @find_result.nil? && block.call(candidate)
        result_q.async { @find_result = candidate }
      end
    end
    # Empty sync: wait for writes already queued on result_q to finish.
    result_q.sync {}
    return @find_result
  end
end

#p_find_all(stride = 1, priority = nil, &block) ⇒ Object

Parallel select; will return array of objects for which &block returns true.



86
87
88
89
90
# File 'lib/dispatch/enumerable.rb', line 86

# Parallel +select+: collects every element for which +block+ is truthy.
#
# Matches are appended through a Dispatch::Proxy wrapping a fresh array
# (Proxy is defined elsewhere -- presumably it serializes access to the
# wrapped array; confirm there). The wrapped array is returned via __value__.
#
# @param stride   forwarded to p_each
# @param priority forwarded to p_each
# @param block    predicate called with each element
# @return the value of the proxy, i.e. the collected matches
def p_find_all(stride=1, priority=nil, &block)
  matches = Dispatch::Proxy.new([])
  p_each(stride, priority) do |candidate|
    matches << candidate if block.call(candidate)
  end
  matches.__value__
end

#p_map(stride = 1, priority = nil, &block) ⇒ Object

Parallel collect. Results match the order of the original array.



56
57
58
59
60
61
62
63
# File 'lib/dispatch/enumerable.rb', line 56

# Parallel +collect+: returns an array holding block.call(element) for every
# element, stored at the element's own index so results keep the original
# order.
#
# The result is held in a local Dispatch::Proxy (the same approach p_find_all
# uses) rather than in instance variables on the receiver. Previously
# @p_map_result was reassigned outside the serialized section, so overlapping
# p_map calls on one object could write into each other's result, and the
# instance-variable write failed on frozen receivers.
#
# @param stride   forwarded to p_each_with_index
# @param priority forwarded to p_each_with_index
# @param block    transformation called with each element
# @return the value of the proxy, i.e. the mapped array
def p_map(stride=1, priority=nil, &block)
  mapped = Dispatch::Proxy.new([])
  # Writing by index (not <<) is what keeps results in source order even if
  # the elements are processed out of order.
  p_each_with_index(stride, priority) { |obj, i| mapped[i] = block.call(obj) }
  mapped.__value__
end

#p_mapreduce(initial, op = :+, stride = 1, priority = nil, &block) ⇒ Object

Parallel collect plus inject. Accumulates from initial via op (default = ‘+’). Note: each object can only run one p_mapreduce at a time.

Raises:

  • (ArgumentError)


68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
# File 'lib/dispatch/enumerable.rb', line 68

# Parallel +collect+ plus +inject+: maps every element through +block+ and
# folds the mapped values into an accumulator that starts at +initial+ and
# is updated with accumulator.send(op, value).
#
# State lives in instance variables on the receiver (@mapreduce_q,
# @mapreduce_result); the whole run is wrapped in @mapreduce_q.sync so that
# overlapping p_mapreduce calls on the same object take turns.
#
# @param initial  starting accumulator; must respond to +op+
# @param op       method name used to combine accumulator and value (default :+)
# @param stride   forwarded to p_each
# @param priority forwarded to p_each
# @param block    transformation called with each element
# @return the final accumulator
# @raise [ArgumentError] if no block is given or +initial+ does not respond to +op+
def p_mapreduce(initial, op=:+, stride=1, priority=nil, &block)
  # Check first, since exceptions from a Dispatch block can act funky.
  # A missing block would otherwise only fail inside the dispatched work.
  raise ArgumentError, "p_mapreduce requires a block" if block.nil?
  unless initial.respond_to?(op)
    raise ArgumentError, "initial value (#{initial.class}) does not respond to #{op.inspect}"
  end

  @mapreduce_q ||= Dispatch::Queue.for(self)
  @mapreduce_q.sync do # in case called more than once at a time
    @mapreduce_result = initial
    q = Dispatch::Queue.for(@mapreduce_result)
    self.p_each(stride, priority) do |obj|
      # Run the (possibly expensive) mapping outside q; only the cheap
      # accumulator update is funnelled through q.
      val = block.call(obj)
      q.async { @mapreduce_result = @mapreduce_result.send(op, val) }
    end
    # Empty sync: wait for updates already queued on q to finish.
    q.sync {}
    return @mapreduce_result
  end
end