Class: Threach::MultiEnum
- Inherits:
-
Object
- Object
- Threach::MultiEnum
- Includes:
- Enumerable
- Defined in:
- lib/jruby_threach.rb
Overview
A class that encapsulates several enumerables (that respond to the same enumerable with the same arity) and allows you to call them as if they were a single enumerable (using multiple threads to draw from them, if desired)
Instance Attribute Summary collapse
-
#queue ⇒ Object
The queue that acts as the common cache for objects pulled from each of the enumerables.
Instance Method Summary collapse
-
#each(&blk) ⇒ Object
Pull records out of the given enumerators using the number of threads specified at initialization.
-
#initialize(enumerators, numthreads = nil, iterator = :each, size = 5) ⇒ Threach::MultiEnum
constructor
Create a new MultiEnum.
Methods included from Enumerable
Constructor Details
#initialize(enumerators, numthreads = nil, iterator = :each, size = 5) ⇒ Threach::MultiEnum
Create a new MultiEnum
69 70 71 72 73 74 75 |
# File 'lib/jruby_threach.rb', line 69 def initialize enumerators, numthreads=nil, iterator = :each, size = 5 @enum = enumerators @iter = iterator @size = size @numthreads = (numthreads.nil? or numthreads == 0) ? enumerators.size : numthreads @queue = Threach::Queue.new(@size) end |
Instance Attribute Details
#queue ⇒ Object
The queue that acts as the common cache for objects pulled from each of the enumerables
59 60 61 |
# File 'lib/jruby_threach.rb', line 59 def queue @queue end |
Instance Method Details
#each(&blk) ⇒ Object
Pull records out of the given enumerators using the number of threads specified at initialization. Order of items is, obviously, not guaranteed.
Also obviously, the passed block need to be of the same arity as the enumerator symbol passed into the intializer.
An uncaptured exception thrown by any of the enumerators will bring the whole thing crashing down.
87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 |
# File 'lib/jruby_threach.rb', line 87 def each &blk @producers = [] tmn = -1 @enum.each_slice(@numthreads).each do |eslice| tmn += 1 @producers << Thread.new(eslice, tmn) do |eslice, tmn| Thread.current[:threach_multi_num] = "p#{tmn}" begin eslice.size.times do |i| eslice[i].send(@iter) do |*x| # puts "...pushing #{x}" @queue.put "#{Thread.current[:threach_multi_num]}: #{x}" end end @queue.put :threach_multi_eof rescue Exception => e @queue.put :threach_multi_eof raise StopIteration.new "Error in #{eslice.inspect}: #{e.inspect}" end end end done = 0 while done < @numthreads d = @queue.take # puts "...pulling #{d}" if d == :threach_multi_eof done += 1 next end yield d end @producers.each {|p| p.join} end |