Class: RbEAI::ThreadPool
- Inherits:
-
Object
- Object
- RbEAI::ThreadPool
- Defined in:
- lib/rbeai/PipeTask.rb
Instance Attribute Summary collapse
-
#size ⇒ Object
Returns the value of attribute size.
Instance Method Summary collapse
-
#initialize(size, task, method, inputQueue, resultQueue, bufferQueue, controlQueue) ⇒ ThreadPool
constructor
A new instance of ThreadPool.
- #start(name) ⇒ Object
- #waitJoinAll ⇒ Object
Constructor Details
#initialize(size, task, method, inputQueue, resultQueue, bufferQueue, controlQueue) ⇒ ThreadPool
Returns a new instance of ThreadPool.
41 42 43 44 45 46 47 48 49 50 |
# File 'lib/rbeai/PipeTask.rb', line 41
#
# Stores the pool configuration and creates the ThreadsWait instance that
# tracks the worker threads spawned by #start.
#
# @param size         number of worker threads (#start runs +size.times+)
# @param task         object that receives +doBuffered(bufferQueue)+ in
#                     #waitJoinAll when a buffer queue is configured
# @param method       callable invoked by each worker as
#                     +method.call(queue, obj)+
# @param inputQueue   queue the workers dequeue work items from
# @param resultQueue  queue that receives results (or, in buffered mode,
#                     the dequeued item itself)
# @param bufferQueue  optional intermediate queue; +nil+ disables buffering
# @param controlQueue queue that receives forwarded +:FINISH_THREADS+ markers
# @return [ThreadPool] a new instance of ThreadPool
def initialize(size, task, method, inputQueue, resultQueue, bufferQueue, controlQueue)
  # What the pool runs and how many workers it uses.
  @size, @task, @method = size, task, method
  # Queues the workers read from and write to.
  @inputQueue, @resultQueue = inputQueue, resultQueue
  @bufferQueue, @controlQueue = bufferQueue, controlQueue
  # Collects the worker threads so #waitJoinAll can wait for all of them.
  @threadgroup = ThreadsWait.new
end
Instance Attribute Details
#size ⇒ Object
Returns the value of attribute size.
39 40 41 |
# File 'lib/rbeai/PipeTask.rb', line 39
#
# Reader for the +size+ attribute.
#
# @return [Object] the value currently held in +@size+ (the value passed
#   to the constructor as +size+)
def size
  @size
end
Instance Method Details
#start(name) ⇒ Object
52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 |
# File 'lib/rbeai/PipeTask.rb', line 52
#
# Spawns +@size+ worker threads and registers each one with +@threadgroup+
# (via +join_nowait+) so that #waitJoinAll can wait for them later.
#
# Every worker repeatedly dequeues an item from +@inputQueue+ and:
# * stops when the item is +:FINISH_WORK+;
# * forwards +:FINISH_THREADS+ to +@controlQueue+ and keeps running;
# * otherwise processes the item:
#   - with a buffer queue: +method.call(@bufferQueue, item)+, then the
#     item itself is enqueued on +@resultQueue+;
#   - without one: +method.call(@resultQueue, item)+.
#
# @param name label for the pool; currently referenced only by the
#   commented-out debug output below
# @return [Object] whatever +@size.times+ returns
def start(name)
  @size.times do
    worker = Thread.new(@method) do |method|
      # print "Running #{name} th \n"
      # Pull items until the stop marker arrives; the marker itself is
      # consumed and never handed to the callable.
      until (obj = @inputQueue.deq) == :FINISH_WORK
        # print "#{name} -> #{obj}\n"
        if obj == :FINISH_THREADS
          # Control marker: pass it along, do not treat it as work.
          @controlQueue.enq(:FINISH_THREADS)
        elsif @bufferQueue.nil?
          method.call(@resultQueue, obj)
        else
          method.call(@bufferQueue, obj)
          @resultQueue.enq(obj)
        end
      end
      # print "Finished th \n"
    end
    @threadgroup.join_nowait(worker)
  end
end
#waitJoinAll ⇒ Object
76 77 78 79 |
# File 'lib/rbeai/PipeTask.rb', line 76
#
# Waits on every thread registered with +@threadgroup+ (via +all_waits+)
# and then, if a buffer queue was configured, hands that queue to
# +@task.doBuffered+.
#
# @return [Object, nil] the result of +@task.doBuffered(@bufferQueue)+, or
#   +nil+ when no buffer queue is set
def waitJoinAll
  @threadgroup.all_waits
  # Without a buffer queue there is nothing left to do after the join.
  return if @bufferQueue.nil?

  @task.doBuffered(@bufferQueue)
end