Module: Enumerable
- Included in:
- Threach::MultiEnum
- Defined in:
- lib/jruby_threach.rb
Overview
Enumerable is monkey-patched to provide two new methods: #threach and #mthreach.
Instance Method Summary collapse
-
#mthreach(pthreads = nil, threads = 0, iterator = :each, &blk) ⇒ Object
Build up a MultiEnum from the calling object and run threach against it.
-
#threach(threads = 0, iterator = :each, &blk) ⇒ Object
Run the passed block using the given iterator using the given number of threads.
Instance Method Details
#mthreach(pthreads = nil, threads = 0, iterator = :each, &blk) ⇒ Object
Build up a MultiEnum from the calling object and run threach against it
141 142 143 144 |
# File 'lib/jruby_threach.rb', line 141 def mthreach(pthreads=nil, threads = 0, iterator = :each, &blk) me = Threach::MultiEnum.new(self, pthreads, iterator, threads*3) me.send(:threach, threads, iterator, &blk) end |
#threach(threads = 0, iterator = :each, &blk) ⇒ Object
Run the passed block using the given iterator using the given number of threads. If one of the consumer threads bails for any reason (break, throw an un-rescued error), the whole thing will shut down in an orderly fashion.
154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 |
# File 'lib/jruby_threach.rb', line 154 def threach(threads = 0, iterator = :each, &blk) # With no extra threads, just spin up the passed iterator if threads == 0 self.send(iterator, &blk) else # Get a java BlockingQueue for the producer to dump stuff into bq = Threach::Queue.new(threads * 2) # capacity is twice the number of threads # And another to store errors errorq = Threach::Queue.new(threads + 1) # A boolean to let us know if things are going wonky bail = false outofdata = false # Build up a set of consumers consumers = [] threads.times do |i| consumers << Thread.new(i) do |i| Thread.current[:threach_num] = i begin while true obj = bq.pop # Should we be bailing? if bail print "Thread #{Thread.current[:threach_num]}: BAIL!\n" if Threach::DEBUG Thread.current[:threach_bail] = true raise Threach::ThreachNotMyError.new, "bailing", nil end # If the return value is nil, it timed out. See if there's # anything wrong, or if we've run out of work if obj.nil? if outofdata Thread.current[:threach_outofdata] = true raise Threach::ThreachEndOfRun.new, "out of work", nil end # otherwise, try to pop again next end # Otherwise, do the work blk.call(*obj) end rescue Threach::ThreachNotMyError => e print "Thread #{Thread.current[:threach_num]}: Not my error\n" if Threach::DEBUG Thread.current[:threach_bail] = true # do nothing; wasn't my error, so I just bailed rescue Threach::ThreachEndOfRun => e print "Thread #{Thread.current[:threach_num]}: End of run\n" if Threach::DEBUG Thread.current[:threach_bail] = true # do nothing; everything exited normally rescue Exception => e print "Thread #{Thread.current[:threach_num]}: Exception #{e.inspect}: #{e.}\n" if Threach::DEBUG # Some other error; let everyone else know bail = true Thread.current[:threach_bail] errorq.push e ensure # OK, I don't understand this, but I'm unable to catch org.jruby.exceptions.JumpException$BreakJump # But if I get here and nothing else is set, that means I broke and need to deal with # it accordingly unless Thread.current[:threach_bail] or Thread.current[:threach_outofdata] print "Thread #{Thread.current[:threach_num]}: broke out of loop\n" if Threach::DEBUG bail = true end end end end # Now, our producer # Start running the given iterator and try to push stuff begin self.send(iterator) do |*x| until successful_push = bq.push(x) # if we're in here, we got a timeout. Check for errors raise Threach::ThreachNotMyError.new, "bailing", nil if bail end print "Queued #{x}\n" if Threach::DEBUG end # We're all done. Let 'em know print "Setting outofdata to true\n" if Threach::DEBUG outofdata = true rescue NativeException => e print "Producer rescuing native exception #{e.inspect}" if Threach::DEBUG bail = true rescue Threach::ThreachNotMyError => e print "Producer: not my error\n" if Threach::DEBUG # do nothing. Not my error rescue Exception => e print "Producer: exception\n" if Threach::DEBUG bail = true errorq.push e end # Finally, #join the consumers consumers.each {|t| t.join} # Everything's done. If there's an error on the stack, raise it if e = errorq.peek print "Producer: raising #{e.inspect}\n" if Threach::DEBUG raise e, e., nil end end end |