Class: RbbtThreadQueue
- Inherits: Object
- Inheritance chain: Object → RbbtThreadQueue
- Defined in:
- lib/rbbt/util/concurrency/threads.rb
Defined Under Namespace
Classes: RbbtThreadQueueWorker
Instance Attribute Summary collapse
-
#block ⇒ Object
Returns the value of attribute block.
-
#done ⇒ Object
Returns the value of attribute done.
-
#mutex ⇒ Object
Returns the value of attribute mutex.
-
#num_threads ⇒ Object
Returns the value of attribute num_threads.
-
#queue ⇒ Object
Returns the value of attribute queue.
-
#threads ⇒ Object
Returns the value of attribute threads.
Class Method Summary collapse
- .each(list, num = 3, &block) ⇒ Object
Instance Method Summary collapse
- #clean ⇒ Object
- #init(use_mutex = false, &block) ⇒ Object
-
#initialize(num_threads) ⇒ RbbtThreadQueue
constructor
A new instance of RbbtThreadQueue.
- #join ⇒ Object
- #process(e) ⇒ Object
Constructor Details
#initialize(num_threads) ⇒ RbbtThreadQueue
Returns a new instance of RbbtThreadQueue.
37 38 39 40 41 42 |
# File 'lib/rbbt/util/concurrency/threads.rb', line 37

# Builds a queue with no workers started yet: an empty worker list, a fresh
# Queue for pending elements and a fresh Mutex.
#
# @param num_threads [Object] stored as-is in @num_threads; #init calls
#   `times` on it, so presumably an Integer worker count -- confirm with callers
def initialize(num_threads)
  @num_threads = num_threads
  @threads = []
  @queue = Queue.new
  @mutex = Mutex.new
end
Instance Attribute Details
#block ⇒ Object
Returns the value of attribute block.
2 3 4 |
# File 'lib/rbbt/util/concurrency/threads.rb', line 2

# Reader for the +block+ attribute.
#
# @return [Object] the current value of @block (nil while unset)
def block
  @block
end
#done ⇒ Object
Returns the value of attribute done.
2 3 4 |
# File 'lib/rbbt/util/concurrency/threads.rb', line 2

# Reader for the +done+ attribute.
#
# @return [Object] the current value of @done (nil while unset)
def done
  @done
end
#mutex ⇒ Object
Returns the value of attribute mutex.
2 3 4 |
# File 'lib/rbbt/util/concurrency/threads.rb', line 2

# Reader for the +mutex+ attribute.
#
# @return [Object] the current value of @mutex; the constructor assigns a
#   Mutex here
def mutex
  @mutex
end
#num_threads ⇒ Object
Returns the value of attribute num_threads.
2 3 4 |
# File 'lib/rbbt/util/concurrency/threads.rb', line 2

# Reader for the +num_threads+ attribute.
#
# @return [Object] the current value of @num_threads, as given to the
#   constructor
def num_threads
  @num_threads
end
#queue ⇒ Object
Returns the value of attribute queue.
2 3 4 |
# File 'lib/rbbt/util/concurrency/threads.rb', line 2

# Reader for the +queue+ attribute.
#
# @return [Object] the current value of @queue; the constructor assigns a
#   Queue here
def queue
  @queue
end
#threads ⇒ Object
Returns the value of attribute threads.
2 3 4 |
# File 'lib/rbbt/util/concurrency/threads.rb', line 2

# Reader for the +threads+ attribute.
#
# @return [Object] the current value of @threads; the constructor assigns an
#   empty Array here and #init appends workers to it
def threads
  @threads
end
Class Method Details
.each(list, num = 3, &block) ⇒ Object
69 70 71 72 73 74 |
# File 'lib/rbbt/util/concurrency/threads.rb', line 69

# Convenience wrapper: creates a queue, starts its workers with the given
# block, feeds it every element of +list+ and then waits via #join.
#
# @param list [#each] elements to hand to the workers
# @param num [Object] passed to RbbtThreadQueue.new as the worker count
#   (defaults to 3)
# @param block [Proc] forwarded to #init as the per-element callback
# @return [Object] whatever #join returns
def self.each(list, num = 3, &block)
  q = RbbtThreadQueue.new(num)
  q.init(&block)
  list.each do |elem|
    q.process(elem)
  end
  q.join
end
Instance Method Details
#clean ⇒ Object
61 62 63 |
# File 'lib/rbbt/util/concurrency/threads.rb', line 61

# Calls #clean on every registered worker and then empties the worker list.
#
# @return [Array] the (now empty) worker list
def clean
  threads.each { |t| t.clean }.clear
end
#init(use_mutex = false, &block) ⇒ Object
44 45 46 47 48 49 |
# File 'lib/rbbt/util/concurrency/threads.rb', line 44

# Discards any existing workers (via #clean) and starts +num_threads+ fresh
# RbbtThreadQueueWorker instances that share this object's queue.
#
# @param use_mutex [Boolean] when truthy each worker is handed this object's
#   mutex; otherwise workers receive nil in its place
# @param block [Proc] forwarded to every worker
# @return [Object] the result of `num_threads.times`
def init(use_mutex = false, &block)
  clean
  # The same mutex (or nil) is given to every worker, so pick it once.
  worker_mutex = use_mutex ? mutex : nil
  num_threads.times do
    @threads << RbbtThreadQueueWorker.new(queue, worker_mutex, &block)
  end
end
#join ⇒ Object
51 52 53 54 55 56 57 58 59 |
# File 'lib/rbbt/util/concurrency/threads.rb', line 51

# Waits until the queue is drained and every live worker is idle (blocked
# waiting on the queue), then stops the workers by raising Aborted in each
# of them and briefly joining them.
#
# Dead workers are pruned on every pass of the wait loop. Without that, a
# worker that died would keep `num_waiting < @threads.length` true forever
# and the "No worker thread survived" guard could never fire.
#
# The final pass targets the workers that are still ALIVE: they are the ones
# blocked on the queue and therefore the ones that must receive Aborted.
# Filtering them out instead would leave every idle worker blocked forever.
#
# @raise [RuntimeError] when elements remain queued but no worker is alive
# @return [Array] the worker list (the threads that were asked to stop)
def join
  loop do
    @threads.delete_if { |t| !t.alive? }
    raise "No worker thread survived" if @threads.empty? && !queue.empty?
    # Finished once nothing is pending and every live worker is waiting.
    break if queue.empty? && queue.num_waiting >= @threads.length
    Thread.pass
  end
  @threads.each { |t| t.raise Aborted }
  @threads.each { |t| t.join(0.1) }
end
#process(e) ⇒ Object
65 66 67 |
# File 'lib/rbbt/util/concurrency/threads.rb', line 65

# Enqueues one element for the workers.
#
# @param e [Object] the element to append to the queue
# @return [Object] the result of `queue << e`
def process(e)
  queue << e
end