Class: Simrpc::ThreadPool
- Inherits:
-
Object
- Object
- Simrpc::ThreadPool
- Defined in:
- lib/simrpc/thread_pool.rb
Overview
Launches a specified number of threads on instantiation, assigning work to them as it arrives
Instance Attribute Summary collapse
-
#terminate ⇒ Object
Returns the value of attribute terminate.
Instance Method Summary collapse
-
#<<(work) ⇒ Object
Add work to the pool.
-
#initialize(num_threads, args = {}) ⇒ ThreadPool
constructor
Create a thread pool with a specified number of threads.
-
#next_job ⇒ Object
Return the next job queued up, blocking until one is received if none are present.
-
#stop ⇒ Object
Terminate the thread pool.
Constructor Details
#initialize(num_threads, args = {}) ⇒ ThreadPool
Create a thread pool with a specified number of threads
76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 |
# File 'lib/simrpc/thread_pool.rb', line 76 def initialize(num_threads, args = {}) @num_threads = num_threads @timeout = args[:timeout] @job_runners = [] @work_queue = [] @work_queue_lock = Mutex.new @work_queue_cv = ConditionVariable.new @terminate = false 0.upto(@num_threads) { |i| runner = ThreadPoolJobRunner.new(self) @job_runners << runner runner.run } # optional timeout thread unless @timeout.nil? @timeout_thread = Thread.new { until @terminate sleep @timeout @job_runners.each { |jr| if !jr.time_started.nil? && (Time.now - jr.time_started > @timeout) jr.stop jr.run end } end } end end |
Instance Attribute Details
#terminate ⇒ Object
Returns the value of attribute terminate.
73 74 75 |
# File 'lib/simrpc/thread_pool.rb', line 73 def terminate @terminate end |
Instance Method Details
#<<(work) ⇒ Object
Add work to the pool
109 110 111 112 113 114 |
# File 'lib/simrpc/thread_pool.rb', line 109 def <<(work) @work_queue_lock.synchronize { @work_queue << work @work_queue_cv.signal } end |
#next_job ⇒ Object
Return the next job queued up, blocking until one is received if none are present
118 119 120 121 122 123 124 125 126 |
# File 'lib/simrpc/thread_pool.rb', line 118 def next_job work = nil @work_queue_lock.synchronize { # wait until we have work @work_queue_cv.wait(@work_queue_lock) if @work_queue.empty? work = @work_queue.shift } work end |
#stop ⇒ Object
Terminate the thread pool
129 130 131 132 133 134 135 |
# File 'lib/simrpc/thread_pool.rb', line 129 def stop @terminate = true @work_queue_lock.synchronize { @work_queue.clear } @job_runners.each { |jr| jr.stop } end |