Class: Threach::MultiEnum

Inherits:
Object
  • Object
show all
Includes:
Enumerable
Defined in:
lib/jruby_threach.rb

Overview

A class that encapsulates several enumerables (that respond to the same enumerable with the same arity) and allows you to call them as if they were a single enumerable (using multiple threads to draw from them, if desired)

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods included from Enumerable

#mthreach, #threach

Constructor Details

#initialize(enumerators, numthreads = nil, iterator = :each, size = 5) ⇒ Threach::MultiEnum

Create a new MultiEnum

Parameters:

  • enumerators (Enumerable)

    A list of enumerators that you wish to act as a single enum

  • numthreads (Integer, nil) (defaults to: nil)

    The number of threads to dedicate to pulling items

  • iterator (Symbol) (defaults to: :each)

    Which iterator to call against each enum

  • size (Integer) (defaults to: 5)

    The size of the underlying queue off the enumerators and pushing them onto the shared queue. nil or zero implies one for each enumerator



69
70
71
72
73
74
75
# File 'lib/jruby_threach.rb', line 69

def initialize enumerators, numthreads=nil, iterator = :each, size = 5
  @enum = enumerators
  @iter = iterator
  @size = size
  @numthreads = (numthreads.nil? or numthreads == 0) ? enumerators.size : numthreads
  @queue = Threach::Queue.new(@size)
end

Instance Attribute Details

#queueObject

The queue that acts as the common cache for objects pulled from each of the enumerables



59
60
61
# File 'lib/jruby_threach.rb', line 59

def queue
  @queue
end

Instance Method Details

#each(&blk) ⇒ Object

Pull records out of the given enumerators using the number of threads specified at initialization. Order of items is, obviously, not guaranteed.

Also obviously, the passed block need to be of the same arity as the enumerator symbol passed into the intializer.

An uncaptured exception thrown by any of the enumerators will bring the whole thing crashing down.



87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
# File 'lib/jruby_threach.rb', line 87

def each &blk
  @producers = []
  tmn = -1
  @enum.each_slice(@numthreads).each do |eslice|
    tmn += 1
    @producers << Thread.new(eslice, tmn) do |eslice, tmn|
      Thread.current[:threach_multi_num] = "p#{tmn}"
      begin
        eslice.size.times do |i|
          eslice[i].send(@iter) do |*x|
            # puts "...pushing #{x}"
            @queue.put "#{Thread.current[:threach_multi_num]}: #{x}"
          end
        end
        @queue.put :threach_multi_eof
      rescue Exception => e
        @queue.put :threach_multi_eof
        raise StopIteration.new "Error in #{eslice.inspect}: #{e.inspect}"
      end
    end
  end

  done = 0
  
  while done < @numthreads
    d = @queue.take
    # puts "...pulling #{d}"
    if d == :threach_multi_eof
      done += 1 
      next
    end
    yield d
  end
  
  @producers.each {|p| p.join}
end