Module: Enumerable

Included in:
Threach::MultiEnum
Defined in:
lib/jruby_threach.rb

Overview

Enumerable is monkey-patched to provide two new methods: #threach and #mthreach.

Instance Method Summary collapse

Instance Method Details

#mthreach(pthreads = nil, threads = 0, iterator = :each, &blk) ⇒ Object

Build up a MultiEnum from the calling object and run threach against it

Examples:

[1..10, 'a'..'z'].mthreach(2,2) {|i| process_item(i)}

Parameters:

  • pthreads (Integer, nil) (defaults to: nil)

    The number of producer threads to run within the created Threach::MultiEnum

  • threads (Integer) (defaults to: 0)

    The number of consumer threads to run in #threach

  • iterator (Symbol) (defaults to: :each)

    Which iterator to call (:each, :each_with_index, etc.)



141
142
143
144
# File 'lib/jruby_threach.rb', line 141

def mthreach(pthreads=nil, threads = 0, iterator = :each,  &blk)
  me = Threach::MultiEnum.new(self, pthreads, iterator, threads*3)
  me.send(:threach, threads, iterator, &blk)
end

#threach(threads = 0, iterator = :each, &blk) ⇒ Object

Run the passed block using the given iterator using the given number of threads. If one of the consumer threads bails for any reason (break, throw an un-rescued error), the whole thing will shut down in an orderly fashion.

Parameters:

  • threads (Integer) (defaults to: 0)

    How many threads to use. 0 means to skip the whole threading thing completely and just directly call the indicated iterator

  • iterator (Symbol) (defaults to: :each)

    Which iterator to use (:each, :each_with_index, :each_line, etc.).



154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
# File 'lib/jruby_threach.rb', line 154

def threach(threads = 0, iterator = :each, &blk)
  
  # With no extra threads, just spin up the passed iterator
  if threads == 0
    self.send(iterator, &blk)
  else
    # Get a java BlockingQueue for the producer to dump stuff into
    bq = Threach::Queue.new(threads * 2) # capacity is twice the number of threads
    
    # And another to store errors
    errorq = Threach::Queue.new(threads + 1)
    
    # A boolean to let us know if things are going wonky
    bail = false
    outofdata = false
    
    # Build up a set of consumers
    
    consumers = []
    threads.times do |i|
      consumers << Thread.new(i) do |i|
        Thread.current[:threach_num] = i
        begin
          while true
            obj = bq.pop

            # Should we be bailing?
            if bail
              print "Thread #{Thread.current[:threach_num]}: BAIL!\n" if Threach::DEBUG
              Thread.current[:threach_bail] = true
              raise Threach::ThreachNotMyError.new, "bailing", nil
            end
            
          
            # If the return value is nil, it timed out. See if there's
            # anything wrong, or if we've run out of work
            if obj.nil?
              if outofdata
                Thread.current[:threach_outofdata] = true
                raise Threach::ThreachEndOfRun.new, "out of work", nil
              end
              # otherwise, try to pop again
              next 
            end
            
            # Otherwise, do the work
            blk.call(*obj)
          end
        
        rescue Threach::ThreachNotMyError => e
          print "Thread #{Thread.current[:threach_num]}: Not my error\n" if Threach::DEBUG
          Thread.current[:threach_bail] = true            
          # do nothing; wasn't my error, so I just bailed
        
        rescue Threach::ThreachEndOfRun => e
          print "Thread #{Thread.current[:threach_num]}: End of run\n" if Threach::DEBUG
          Thread.current[:threach_bail] = true            
          # do nothing; everything exited normally 
          
        rescue Exception => e
          print "Thread #{Thread.current[:threach_num]}: Exception #{e.inspect}: #{e.message}\n" if Threach::DEBUG
          # Some other error; let everyone else know
          bail = true
          Thread.current[:threach_bail]
          errorq.push e
        ensure
          # OK, I don't understand this, but I'm unable to catch org.jruby.exceptions.JumpException$BreakJump
          # But if I get here and nothing else is set, that means I broke and need to deal with
          # it accordingly
          unless Thread.current[:threach_bail] or Thread.current[:threach_outofdata]
            print "Thread #{Thread.current[:threach_num]}: broke out of loop\n" if Threach::DEBUG
            bail = true
          end
        end
      end
    end
    
    
    # Now, our producer
    
    # Start running the given iterator and try to push stuff
    
    begin
      self.send(iterator) do |*x|
        until successful_push = bq.push(x)
          # if we're in here, we got a timeout. Check for errors
          raise Threach::ThreachNotMyError.new, "bailing", nil if bail
        end
        print "Queued #{x}\n" if Threach::DEBUG
      end

      # We're all done. Let 'em know
      print "Setting outofdata to true\n" if Threach::DEBUG
      outofdata = true
    
    rescue NativeException => e
      print "Producer rescuing native exception #{e.inspect}" if Threach::DEBUG
      bail = true
    
    rescue Threach::ThreachNotMyError => e
      print "Producer: not my error\n" if Threach::DEBUG
      # do nothing. Not my error
      
    rescue Exception => e
      print "Producer: exception\n" if Threach::DEBUG
      bail = true
      errorq.push e
    end
    
    # Finally, #join the consumers
    
    consumers.each {|t| t.join}
    
    # Everything's done. If there's an error on the stack, raise it
     if e = errorq.peek
       print "Producer: raising #{e.inspect}\n" if Threach::DEBUG
       raise e, e.message, nil
     end
    

  end
end