Class: Puma::ThreadPool
- Inherits: Object
- Defined in: lib/puma/thread_pool.rb
Overview
A simple thread pool management object.
Defined Under Namespace
Classes: AutoTrim, ForceShutdown, Reaper
Instance Attribute Summary
- #clean_thread_locals ⇒ Object
  Returns the value of attribute clean_thread_locals.
- #spawned ⇒ Object (readonly)
  Returns the value of attribute spawned.
- #trim_requested ⇒ Object (readonly)
  Returns the value of attribute trim_requested.
Class Method Summary
- .clean_thread_locals ⇒ Object
Instance Method Summary
- #<<(work) ⇒ Object
  Add work to the todo list for a Thread to pick up and process.
- #auto_reap!(timeout = 5) ⇒ Object
- #auto_trim!(timeout = 30) ⇒ Object
- #backlog ⇒ Object
  How many objects have yet to be processed by the pool?
- #initialize(min, max, *extra, &block) ⇒ ThreadPool (constructor)
  Maintain a minimum of min and a maximum of max threads in the pool.
- #reap ⇒ Object
  If there are dead threads in the pool, remove them and decrement the spawned counter so that new, healthy threads can be created.
- #shutdown(timeout = -1) ⇒ Object
  Tell all threads in the pool to exit and wait for them to finish.
- #trim(force = false) ⇒ Object
  If too many threads are in the pool, tell one to finish up and exit.
- #wait_until_not_full ⇒ Object
Constructor Details
#initialize(min, max, *extra, &block) ⇒ ThreadPool
Maintain a minimum of min and a maximum of max threads in the pool.
The block passed is the work that will be performed in each thread.
# File 'lib/puma/thread_pool.rb', line 17
def initialize(min, max, *extra, &block)
  @not_empty = ConditionVariable.new
  @not_full = ConditionVariable.new
  @mutex = Mutex.new

  @todo = []

  @spawned = 0
  @waiting = 0

  @min = Integer(min)
  @max = Integer(max)
  @block = block
  @extra = extra

  @shutdown = false

  @trim_requested = 0

  @workers = []

  @auto_trim = nil
  @reaper = nil

  @mutex.synchronize do
    @min.times { spawn_thread }
  end

  @clean_thread_locals = false
end
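The constructor passes min and max through Integer(), so numeric strings are accepted while non-numeric values raise rather than being silently coerced. A minimal sketch of that Kernel#Integer behavior on its own, outside Puma; the variable names and sample values are illustrative assumptions:

```ruby
# Kernel#Integer is strict: it converts numeric input and raises on anything else.
min = Integer("2")   # numeric string, e.g. a value read from a config file
max = Integer(16)    # already an Integer, returned unchanged

# Non-numeric input raises ArgumentError instead of quietly becoming 0,
# which is what String#to_i would do.
error = begin
  Integer("sixteen")
rescue ArgumentError => e
  e
end

puts "min=#{min} max=#{max} error=#{error.class}"
```

This is why a mistyped thread count fails at construction time instead of producing a pool with zero threads.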
Instance Attribute Details
#clean_thread_locals ⇒ Object
Returns the value of attribute clean_thread_locals.
# File 'lib/puma/thread_pool.rb', line 49
def clean_thread_locals
  @clean_thread_locals
end
#spawned ⇒ Object (readonly)
Returns the value of attribute spawned.
# File 'lib/puma/thread_pool.rb', line 48
def spawned
  @spawned
end
#trim_requested ⇒ Object (readonly)
Returns the value of attribute trim_requested.
# File 'lib/puma/thread_pool.rb', line 48
def trim_requested
  @trim_requested
end
Class Method Details
.clean_thread_locals ⇒ Object
# File 'lib/puma/thread_pool.rb', line 51
def self.clean_thread_locals
  Thread.current.keys.each do |key|
    Thread.current[key] = nil unless key == :__recursive_key__
  end
end
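The effect of this method can be seen in isolation. The sketch below copies the loop body into a hypothetical helper, clear_thread_locals, and runs it in a throwaway thread; the :request_id key is an invented example, and when the pool itself calls the real method is not shown in this section.

```ruby
# Hypothetical helper with the same body as ThreadPool.clean_thread_locals.
def clear_thread_locals
  Thread.current.keys.each do |key|
    # :__recursive_key__ is skipped, exactly as in the listing above.
    Thread.current[key] = nil unless key == :__recursive_key__
  end
end

leftover = Thread.new do
  Thread.current[:request_id] = 42   # state left behind by earlier work
  clear_thread_locals
  Thread.current[:request_id]        # read the key back after clearing
end.value

puts leftover.inspect
```

Clearing these values matters for reused threads, since anything stored with Thread.current[...] would otherwise still be visible to the next unit of work that runs on the same thread.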
Instance Method Details
#<<(work) ⇒ Object
Add work to the todo list for a Thread to pick up and process.
# File 'lib/puma/thread_pool.rb', line 136
def <<(work)
  @mutex.synchronize do
    if @shutdown
      raise "Unable to add work while shutting down"
    end

    @todo << work

    if @waiting < @todo.size and @spawned < @max
      spawn_thread
    end

    @not_empty.signal
  end
end
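To see how enqueueing interacts with on-demand spawning, here is a self-contained sketch built only on Ruby's standard Mutex and ConditionVariable. TinyPool is a hypothetical stand-in, not Puma's class: its #<< mirrors the listing above, but the worker loop in spawn_thread is an assumption, because Puma's private spawn_thread is not shown in this section.

```ruby
require "thread"

# Hypothetical miniature pool for illustration only.
class TinyPool
  attr_reader :spawned

  def initialize(min, max, &block)
    @mutex = Mutex.new
    @not_empty = ConditionVariable.new
    @todo = []
    @spawned = 0
    @waiting = 0
    @max = Integer(max)
    @block = block
    @shutdown = false
    @workers = []
    @mutex.synchronize { Integer(min).times { spawn_thread } }
  end

  # Same shape as ThreadPool#<<: queue the work, grow the pool if no
  # idle worker can take it and the maximum is not reached, then signal.
  def <<(work)
    @mutex.synchronize do
      raise "Unable to add work while shutting down" if @shutdown
      @todo << work
      spawn_thread if @waiting < @todo.size && @spawned < @max
      @not_empty.signal
    end
  end

  # Simplified shutdown: wake everyone and wait for the queue to drain.
  def shutdown
    threads = @mutex.synchronize do
      @shutdown = true
      @not_empty.broadcast
      @workers.dup
    end
    threads.each(&:join)
  end

  private

  # Assumed worker loop; must be called with @mutex held.
  def spawn_thread
    @spawned += 1
    @workers << Thread.new do
      loop do
        work = @mutex.synchronize do
          while @todo.empty? && !@shutdown
            @waiting += 1
            @not_empty.wait(@mutex)
            @waiting -= 1
          end
          @todo.shift
        end
        break if work.nil?   # queue drained during shutdown
        @block.call(work)
      end
    end
  end
end

results = Queue.new
pool = TinyPool.new(1, 3) { |n| results << n * 2 }
5.times { |i| pool << i }
pool.shutdown
doubled = Array.new(results.size) { results.pop }.sort
puts doubled.inspect
```

How many threads end up spawned depends on timing, but the `@spawned < @max` check keeps the count at or below the configured maximum.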
#auto_reap!(timeout = 5) ⇒ Object
# File 'lib/puma/thread_pool.rb', line 241
def auto_reap!(timeout=5)
  @reaper = Reaper.new(self, timeout)
  @reaper.start!
end
#auto_trim!(timeout = 30) ⇒ Object
# File 'lib/puma/thread_pool.rb', line 212
def auto_trim!(timeout=30)
  @auto_trim = AutoTrim.new(self, timeout)
  @auto_trim.start!
end
#backlog ⇒ Object
How many objects have yet to be processed by the pool?
# File 'lib/puma/thread_pool.rb', line 59
def backlog
  @mutex.synchronize { @todo.size }
end
#reap ⇒ Object
If there are dead threads in the pool, remove them and decrement the spawned counter so that new, healthy threads can be created.
# File 'lib/puma/thread_pool.rb', line 175
def reap
  @mutex.synchronize do
    dead_workers = @workers.reject(&:alive?)

    dead_workers.each do |worker|
      worker.kill
      @spawned -= 1
    end

    @workers -= dead_workers
  end
end
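The bookkeeping in reap can be tried on plain threads. This sketch uses two stand-in workers (the names parked and finished are illustrative) and applies the same reject(&:alive?) filter and counter adjustment, without the mutex, since only one thread touches the data here.

```ruby
# Two stand-in workers: one parks forever, one finishes right away.
parked   = Thread.new { sleep }
finished = Thread.new { :done }
finished.join   # make sure the second thread has terminated

workers = [parked, finished]
spawned = workers.size

# Same filter as in reap: keep only the threads that are no longer alive.
dead_workers = workers.reject(&:alive?)
spawned -= dead_workers.size
workers -= dead_workers

remaining = workers.size
puts "spawned=#{spawned} remaining=#{remaining}"

# Clean up the parked thread so the script can exit.
parked.kill
parked.join
```

Lowering the counter is what lets a later #<< spawn a replacement, because that method only creates threads while spawned is below the maximum.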
#shutdown(timeout = -1) ⇒ Object
Tell all threads in the pool to exit and wait for them to finish.
# File 'lib/puma/thread_pool.rb', line 248
def shutdown(timeout=-1)
  threads = @mutex.synchronize do
    @shutdown = true
    @not_empty.broadcast
    @not_full.broadcast

    @auto_trim.stop if @auto_trim
    @reaper.stop if @reaper
    # dup workers so that we join them all safely
    @workers.dup
  end

  case timeout
  when -1
    threads.each(&:join)
  when 0
    threads.each do |t|
      t.raise ForceShutdown
    end

    threads.each do |t|
      t.join Const::SHUTDOWN_GRACE_TIME
    end
  else
    timeout.times do
      threads.delete_if do |t|
        t.join 1
      end

      if threads.empty?
        break
      else
        sleep 1
      end
    end

    threads.each do |t|
      t.raise ForceShutdown
    end

    threads.each do |t|
      t.join Const::SHUTDOWN_GRACE_TIME
    end
  end

  @spawned = 0
  @workers = []
end
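The timeout argument selects one of three strategies: -1 joins every thread with no limit, 0 raises ForceShutdown in every thread immediately, and a positive value polls with bounded joins before escalating. The sketch below illustrates the escalation step using only core Thread methods. The ForceShutdown class defined here is a local stand-in for Puma::ThreadPool::ForceShutdown, and the thread bodies and timings are assumptions chosen for the demo.

```ruby
# Local stand-in for Puma::ThreadPool::ForceShutdown.
class ForceShutdown < RuntimeError; end

fast = Thread.new { sleep 0.05; :finished }
slow = Thread.new do
  begin
    sleep               # simulates work that never completes on its own
  rescue ForceShutdown
    :interrupted        # the worker notices the forced shutdown
  end
end

threads = [fast, slow]

# Thread#join(limit) returns the thread if it finished within the limit
# and nil otherwise, so delete_if drops only the threads that are done.
threads.delete_if { |t| t.join(0.5) }

# Whatever is left gets the exception raised inside it, then a bounded join.
threads.each { |t| t.raise ForceShutdown }
threads.each { |t| t.join(1) }

puts "fast=#{fast.value.inspect} slow=#{slow.value.inspect}"
```

Relying on the nil return of a timed join is what lets the polling branch shrink the list of outstanding threads round by round.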
#trim(force = false) ⇒ Object
If too many threads are in the pool, tell one to finish up and exit. If force is true, a trim is requested even if all threads are being utilized.
# File 'lib/puma/thread_pool.rb', line 164
def trim(force=false)
  @mutex.synchronize do
    if (force or @waiting > 0) and @spawned - @trim_requested > @min
      @trim_requested += 1
      @not_empty.signal
    end
  end
end
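The guard in trim is easier to reason about as a pure function. The helper below, trim_allowed?, is a hypothetical extraction of that condition with the instance variables turned into keyword arguments; the sample numbers are made up.

```ruby
# Hypothetical extraction of the condition in ThreadPool#trim.
def trim_allowed?(force:, waiting:, spawned:, trim_requested:, min:)
  (force || waiting > 0) && spawned - trim_requested > min
end

# All threads busy and no force: nothing to trim.
busy = trim_allowed?(force: false, waiting: 0, spawned: 5, trim_requested: 0, min: 2)

# Same state with force: a trim is requested anyway.
forced = trim_allowed?(force: true, waiting: 0, spawned: 5, trim_requested: 0, min: 2)

# Idle threads exist, but pending trims would already bring the pool down to min.
at_floor = trim_allowed?(force: false, waiting: 1, spawned: 4, trim_requested: 2, min: 2)

puts [busy, forced, at_floor].inspect
```

Subtracting trim_requested from spawned counts trims that have been requested but not yet carried out, which keeps repeated calls from pushing the pool below its minimum.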
#wait_until_not_full ⇒ Object
# File 'lib/puma/thread_pool.rb', line 152
def wait_until_not_full
  @mutex.synchronize do
    until @todo.size - @waiting < @max - @spawned or @shutdown
      @not_full.wait @mutex
    end
  end
end
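The loop condition compares queued work that no idle thread can absorb (todo size minus waiting threads) against the number of threads the pool could still spawn (max minus spawned). A small sketch of that arithmetic, using a hypothetical helper room_for_more? and made-up numbers:

```ruby
# Hypothetical extraction of the capacity check in wait_until_not_full
# (the @shutdown escape hatch is left out).
def room_for_more?(backlog:, waiting:, max:, spawned:)
  backlog - waiting < max - spawned
end

# Pool at max, every thread busy, nothing queued: the caller would block.
saturated = room_for_more?(backlog: 0, waiting: 0, max: 4, spawned: 4)

# Pool at max but one thread is idle: the caller may proceed.
idle_thread = room_for_more?(backlog: 0, waiting: 1, max: 4, spawned: 4)

# Two more threads could be spawned and one item is queued: still room.
can_grow = room_for_more?(backlog: 1, waiting: 0, max: 4, spawned: 2)

puts [saturated, idle_thread, can_grow].inspect
```

A caller blocked on this check resumes when something signals the @not_full condition variable; shutdown broadcasts it, as the listing for that method shows.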