Class: Puma::ThreadPool
- Inherits: Object
  - Object
  - Puma::ThreadPool
- Defined in: lib/puma/thread_pool.rb
Overview
Internal docs for a simple thread pool management object.
Each Puma “worker” has a thread pool to process requests.
First a connection to a client is made in `Puma::Server`. It is wrapped in a `Puma::Client` instance and then passed to the `Puma::Reactor` to ensure the whole request is buffered into memory. Once the request is ready, it is passed into a thread pool via the `Puma::ThreadPool#<<` operator where it is stored in a `@todo` array.
Each thread in the pool has an internal loop where it pulls a request from the `@todo` array and processes it.
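The queue-and-worker-loop idea described above can be sketched with nothing but the Ruby standard library. This is a minimal illustration, not Puma's implementation: `TinyPool` and its methods are hypothetical names, and the sketch leaves out dynamic spawning, trimming, reaping, and forced shutdown.

```ruby
# Toy pool illustrating the "@todo array + worker loop" idea.
# TinyPool is a hypothetical name; it is not part of Puma.
class TinyPool
  def initialize(size, &block)
    @mutex = Mutex.new
    @not_empty = ConditionVariable.new
    @todo = []
    @shutdown = false
    @block = block
    @workers = Array.new(size) { Thread.new { worker_loop } }
  end

  # Queue work and wake one sleeping worker.
  def <<(work)
    @mutex.synchronize do
      raise "Unable to add work while shutting down" if @shutdown
      @todo << work
      @not_empty.signal
    end
  end

  # Let the workers drain the queue, then wait for them to exit.
  def shutdown
    @mutex.synchronize do
      @shutdown = true
      @not_empty.broadcast
    end
    @workers.each(&:join)
  end

  private

  def worker_loop
    loop do
      work = @mutex.synchronize do
        @not_empty.wait(@mutex) while @todo.empty? && !@shutdown
        @todo.shift
      end
      break if work.nil? # queue is empty and shutdown was requested
      @block.call(work)
    end
  end
end

results = Queue.new
pool = TinyPool.new(2) { |n| results << n * 2 }
[1, 2, 3].each { |n| pool << n }
pool.shutdown
doubled = Array.new(results.size) { results.pop }.sort
```

The work is performed outside the mutex, so a slow block only occupies its own thread while other workers keep pulling from the queue.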
Defined Under Namespace
Classes: Automaton, ForceShutdown
Constant Summary
- SHUTDOWN_GRACE_TIME = 5
  How long, in seconds, after raising ForceShutdown in a thread during forced shutdown mode, to wait for the thread to try to finish up its work before leaving the thread to die on the vine.
Instance Attribute Summary
- #busy_threads ⇒ Object (readonly)
- #pool_capacity ⇒ Object (readonly)
- #spawned ⇒ Object (readonly)
  Returns the value of attribute spawned.
- #trim_requested ⇒ Object (readonly)
  Returns the value of attribute trim_requested.
- #waiting ⇒ Object (readonly)
  Returns the value of attribute waiting.
Class Method Summary
- .clean_thread_locals ⇒ Object
Instance Method Summary
- #<<(work) ⇒ Object
  Add work to the todo list for a Thread to pick up and process.
- #auto_reap!(timeout = @reaping_time) ⇒ Object
- #auto_trim!(timeout = @auto_trim_time) ⇒ Object
- #backlog ⇒ Object
  How many objects have yet to be processed by the pool?
- #initialize(name, options = {}, &block) ⇒ ThreadPool (constructor)
  Maintain a minimum of min and maximum of max threads in the pool.
- #reap ⇒ Object
  If there are dead threads in the pool, remove them and decrement the spawned counter so that new, healthy threads can be created.
- #shutdown(timeout = -1) ⇒ Object
  Tell all threads in the pool to exit and wait for them to finish.
- #stats ⇒ Hash
  Generate the stats hash under a single lock so that callers do not have to lock multiple times.
- #trim(force = false) ⇒ Object
  If there are any free threads in the pool, tell one to go ahead and exit.
- #wait_for_less_busy_worker(delay_s) ⇒ Object
- #wait_until_not_full ⇒ Object
  This method is used by `Puma::Server` to let the server know when the thread pool can pull more requests from the socket and pass to the reactor.
- #with_force_shutdown ⇒ Object
  Allows ThreadPool::ForceShutdown to be raised within the provided block if the thread is forced to shut down during execution.
- #with_mutex(&block) ⇒ Object
Constructor Details
#initialize(name, options = {}, &block) ⇒ ThreadPool
Maintain a minimum of min and maximum of max threads in the pool.
The block passed is the work that will be performed in each thread.
# File 'lib/puma/thread_pool.rb', line 34
def initialize(name, options = {}, &block)
  @not_empty = ConditionVariable.new
  @not_full = ConditionVariable.new
  @mutex = Mutex.new

  @todo = []

  @spawned = 0
  @waiting = 0

  @name = name
  @min = Integer(options[:min_threads])
  @max = Integer(options[:max_threads])
  # Not an 'exposed' option, options[:pool_shutdown_grace_time] is used in CI
  # to shorten @shutdown_grace_time from SHUTDOWN_GRACE_TIME. Parallel CI
  # makes stubbing constants difficult.
  @shutdown_grace_time = Float(options[:pool_shutdown_grace_time] || SHUTDOWN_GRACE_TIME)
  @block = block
  @out_of_band = options[:out_of_band]
  @clean_thread_locals = options[:clean_thread_locals]
  @before_thread_start = options[:before_thread_start]
  @before_thread_exit = options[:before_thread_exit]
  @reaping_time = options[:reaping_time]
  @auto_trim_time = options[:auto_trim_time]

  @shutdown = false

  @trim_requested = 0
  @out_of_band_pending = false

  @workers = []

  @auto_trim = nil
  @reaper = nil

  @mutex.synchronize do
    @min.times do
      spawn_thread
      @not_full.wait(@mutex)
    end
  end

  @force_shutdown = false
  @shutdown_mutex = Mutex.new
end
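The constructor coerces its thread counts with `Integer(...)` and the grace time with `Float(...)`. The snippet below shows how those standard Kernel conversions behave. The option values are made up for illustration, and the literal `5` stands in for SHUTDOWN_GRACE_TIME.

```ruby
# Hypothetical option values; only the coercion calls mirror the constructor.
options = { min_threads: "2", max_threads: 4 }

min   = Integer(options[:min_threads])                  # the string "2" becomes 2
max   = Integer(options[:max_threads])
grace = Float(options[:pool_shutdown_grace_time] || 5)  # option absent, falls back to 5.0

# A missing thread count is rejected instead of being silently defaulted.
empty = {}
missing_error = begin
  Integer(empty[:min_threads])
  nil
rescue TypeError => e
  e.class
end
```

Because `Integer(nil)` raises, a caller that omits `:min_threads` or `:max_threads` fails at construction time rather than later inside the pool.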
Instance Attribute Details
#busy_threads ⇒ Object (readonly)
# File 'lib/puma/thread_pool.rb', line 112
def busy_threads
  with_mutex { @spawned - @waiting + @todo.size }
end
#pool_capacity ⇒ Object (readonly)
# File 'lib/puma/thread_pool.rb', line 106
def pool_capacity
  waiting + (@max - spawned)
end
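A worked example of the two formulas above may help. The counter values are hypothetical and chosen only to make the arithmetic visible; the expressions themselves are copied from #busy_threads and #pool_capacity.

```ruby
# Hypothetical counter values, chosen only to illustrate the arithmetic.
max     = 5  # upper bound on threads in the pool
spawned = 3  # threads currently alive
waiting = 1  # spawned threads that are idle, waiting for work
todo    = 2  # requests queued but not yet picked up

busy_threads  = spawned - waiting + todo   # 3 - 1 + 2
pool_capacity = waiting + (max - spawned)  # 1 + (5 - 3)
```

Note that busy_threads counts queued work as well as running work, so it can exceed the number of spawned threads.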
#spawned ⇒ Object (readonly)
Returns the value of attribute spawned.
# File 'lib/puma/thread_pool.rb', line 80
def spawned
  @spawned
end
#trim_requested ⇒ Object (readonly)
Returns the value of attribute trim_requested.
# File 'lib/puma/thread_pool.rb', line 80
def trim_requested
  @trim_requested
end
#waiting ⇒ Object (readonly)
Returns the value of attribute waiting.
# File 'lib/puma/thread_pool.rb', line 80
def waiting
  @waiting
end
Class Method Details
.clean_thread_locals ⇒ Object
# File 'lib/puma/thread_pool.rb', line 82
def self.clean_thread_locals
  Thread.current.keys.each do |key| # rubocop: disable Style/HashEachMethods
    Thread.current[key] = nil unless key == :__recursive_key__
  end
end
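The effect of the loop above can be observed in isolation. In this sketch the `:request_id` key and its value are hypothetical leftovers from an earlier unit of work; the clearing loop is the same as in the method body.

```ruby
before, after = Thread.new do
  current = Thread.current
  current[:request_id] = "abc123"   # hypothetical leftover from earlier work
  seen = current[:request_id]

  # Same clearing loop as in .clean_thread_locals.
  current.keys.each do |key|
    current[key] = nil unless key == :__recursive_key__
  end

  [seen, current[:request_id]]
end.value
```

Assigning nil through `Thread#[]=` removes the entry, so state set by one unit of work is not visible to the next one that runs on the same thread.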
Instance Method Details
#<<(work) ⇒ Object
Add work to the todo list for a Thread to pick up and process.
# File 'lib/puma/thread_pool.rb', line 234
def <<(work)
  with_mutex do
    if @shutdown
      raise "Unable to add work while shutting down"
    end

    @todo << work

    if @waiting < @todo.size and @spawned < @max
      spawn_thread
    end

    @not_empty.signal
  end
end
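The spawn condition in #<< can be read as "spawn only when idle threads cannot cover the queue and the pool is below its maximum". The helper below is a hypothetical stand-alone restatement of that condition, with made-up counter values.

```ruby
# Hypothetical helper restating the condition used in #<<.
def spawn_needed?(waiting:, todo_size:, spawned:, max:)
  waiting < todo_size && spawned < max
end

# No idle thread for the queued item, room to grow.
grow   = spawn_needed?(waiting: 0, todo_size: 1, spawned: 2, max: 5)
# An idle thread will pick the item up, so no spawn is needed.
idle   = spawn_needed?(waiting: 2, todo_size: 1, spawned: 2, max: 5)
# Already at max: the item waits in the queue.
capped = spawn_needed?(waiting: 0, todo_size: 1, spawned: 5, max: 5)
```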
#auto_reap!(timeout = @reaping_time) ⇒ Object
# File 'lib/puma/thread_pool.rb', line 376
def auto_reap!(timeout=@reaping_time)
  @reaper = Automaton.new(self, timeout, "#{@name} threadpool reaper", :reap)
  @reaper.start!
end
#auto_trim!(timeout = @auto_trim_time) ⇒ Object
# File 'lib/puma/thread_pool.rb', line 371
def auto_trim!(timeout=@auto_trim_time)
  @auto_trim = Automaton.new(self, timeout, "#{@name} threadpool trimmer", :trim)
  @auto_trim.start!
end
#backlog ⇒ Object
How many objects have yet to be processed by the pool?
# File 'lib/puma/thread_pool.rb', line 101
def backlog
  with_mutex { @todo.size }
end
#reap ⇒ Object
If there are dead threads in the pool, remove them and decrement the spawned counter so that new, healthy threads can be created.
# File 'lib/puma/thread_pool.rb', line 329
def reap
  with_mutex do
    dead_workers = @workers.reject(&:alive?)

    dead_workers.each do |worker|
      worker.kill
      @spawned -= 1
    end

    @workers.delete_if do |w|
      dead_workers.include?(w)
    end
  end
end
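The bookkeeping in #reap relies on `Thread#alive?` to tell finished threads from live ones. The following sketch reproduces that step outside the pool with two throwaway threads; the local variables are illustrative and no mutex is used because nothing else touches them.

```ruby
workers = [
  Thread.new { sleep },      # stays alive until it is killed
  Thread.new { :finished }   # exits immediately
]
workers.last.join            # make sure the second thread has ended

spawned = workers.size
dead = workers.reject(&:alive?)
dead.each { spawned -= 1 }   # one decrement per dead thread
workers -= dead              # drop dead threads from the list
alive_count = workers.size

workers.each(&:kill)         # clean up the sleeping thread
workers.each(&:join)
```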
#shutdown(timeout = -1) ⇒ Object
Tell all threads in the pool to exit and wait for them to finish. With the default timeout of -1, wait for every thread to finish without forcing shutdown. Otherwise, wait timeout seconds, then raise ForceShutdown in the remaining threads. Next, wait an extra @shutdown_grace_time seconds, then force-kill the remaining threads. Finally, wait 1 second for the remaining threads to exit.
# File 'lib/puma/thread_pool.rb', line 399
def shutdown(timeout=-1)
  threads = with_mutex do
    @shutdown = true
    @trim_requested = @spawned
    @not_empty.broadcast
    @not_full.broadcast

    @auto_trim&.stop
    @reaper&.stop
    # dup workers so that we join them all safely
    @workers.dup
  end

  if timeout == -1
    # Wait for threads to finish without force shutdown.
    threads.each(&:join)
  else
    join = ->(inner_timeout) do
      start = Process.clock_gettime(Process::CLOCK_MONOTONIC)
      threads.reject! do |t|
        elapsed = Process.clock_gettime(Process::CLOCK_MONOTONIC) - start
        t.join inner_timeout - elapsed
      end
    end

    # Wait +timeout+ seconds for threads to finish.
    join.call(timeout)

    # If threads are still running, raise ForceShutdown and wait to finish.
    @shutdown_mutex.synchronize do
      @force_shutdown = true
      threads.each do |t|
        t.raise ForceShutdown if t[:with_force_shutdown]
      end
    end
    join.call(@shutdown_grace_time)

    # If threads are _still_ running, forcefully kill them and wait to finish.
    threads.each(&:kill)
    join.call(1)
  end

  @spawned = 0
  @workers = []
end
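The `join` lambda above spends one time budget across all threads rather than giving each thread its own timeout. A stand-alone sketch of that technique follows. `join_within` is a hypothetical helper, it returns a new array instead of mutating one with `reject!`, and the clamp to zero is an addition made here for clarity rather than something the original code does.

```ruby
# Join a list of threads against one shared time budget. Returns the
# threads that are still running once the budget has been spent.
def join_within(threads, budget)
  start = Process.clock_gettime(Process::CLOCK_MONOTONIC)
  threads.reject do |t|
    elapsed = Process.clock_gettime(Process::CLOCK_MONOTONIC) - start
    # Thread#join returns the thread if it finished, nil on timeout.
    t.join([budget - elapsed, 0].max)
  end
end

quick = Thread.new { sleep 0.01 }
stuck = Thread.new { sleep }        # never finishes on its own
remaining = join_within([quick, stuck], 0.5)

stuck.kill                          # clean up
stuck.join
```

Measuring elapsed time from a single start point means the total wait is bounded by the budget no matter how many threads are in the list.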
#stats ⇒ Hash
Generate the stats hash under a single lock so that callers do not have to lock multiple times.
# File 'lib/puma/thread_pool.rb', line 90
def stats
  with_mutex do
    { backlog: @todo.size,
      running: @spawned,
      pool_capacity: @waiting + (@max - @spawned)
    }
  end
end
#trim(force = false) ⇒ Object
If there are any free threads in the pool, tell one to go ahead and exit. If force is true, then a trim is requested even if all threads are being utilized.
# File 'lib/puma/thread_pool.rb', line 317
def trim(force=false)
  with_mutex do
    free = @waiting - @todo.size
    if (force or free > 0) and @spawned - @trim_requested > @min
      @trim_requested += 1
      @not_empty.signal
    end
  end
end
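One detail of the condition in #trim is easy to miss: force only bypasses the "is a thread free?" check, not the minimum thread count. The hypothetical helper below restates the condition so that the cases can be compared side by side; all counter values are made up.

```ruby
# Hypothetical helper restating the condition used in #trim.
def trim_allowed?(force:, waiting:, todo_size:, spawned:, trim_requested:, min:)
  free = waiting - todo_size
  (force || free > 0) && spawned - trim_requested > min
end

base = { waiting: 0, todo_size: 0, spawned: 4, trim_requested: 0, min: 2 }

all_busy   = trim_allowed?(force: false, **base)               # no free thread
forced     = trim_allowed?(force: true,  **base)               # force skips the free check
at_minimum = trim_allowed?(force: true,  **base, spawned: 2)   # never go below min
```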
#wait_for_less_busy_worker(delay_s) ⇒ Object
# File 'lib/puma/thread_pool.rb', line 293
def wait_for_less_busy_worker(delay_s)
  return unless delay_s && delay_s > 0

  # Ruby MRI does GVL, this can result
  # in processing contention when multiple threads
  # (requests) are running concurrently
  return unless Puma.mri?

  with_mutex do
    return if @shutdown

    # do not delay, if we are not busy
    return unless busy_threads > 0

    # this will be signaled once a request finishes,
    # which can happen earlier than delay
    @not_full.wait @mutex, delay_s
  end
end
#wait_until_not_full ⇒ Object
This method is used by `Puma::Server` to let the server know when the thread pool can pull more requests from the socket and pass to the reactor.
The general idea is that the thread pool can only work on a fixed number of requests at the same time. If it is already processing that number of requests then it is at capacity. If another Puma process has spare capacity, then the request can be left on the socket so the other worker can pick it up and process it.
For example: if there are 5 threads, but only 4 working on requests, this method will not wait and the `Puma::Server` can pull a request right away.
If there are 5 threads and all 5 of them are busy, then it will pause here and wait until the `not_full` condition variable is signaled. Usually this indicates that a request has been processed.
It’s important to note that even though the server might accept another request, it might not be added to the `@todo` array right away. For example, if a slow client has only sent a header but not a body, then the `@todo` array would stay the same size as the reactor works to try to buffer the request. In that scenario the next call to this method would not block and another request would be added into the reactor by the server. This would continue until a fully buffered request makes it through the reactor and can then be processed by the thread pool.
# File 'lib/puma/thread_pool.rb', line 276
def wait_until_not_full
  with_mutex do
    while true
      return if @shutdown

      # If we can still spin up new threads and there
      # is work queued that cannot be handled by waiting
      # threads, then accept more work until we would
      # spin up the max number of threads.
      return if busy_threads < @max

      @not_full.wait @mutex
    end
  end
end
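The blocking behaviour described for this method follows the usual condition-variable pattern: re-check the predicate in a loop and sleep on the variable while it does not hold. The sketch below isolates that pattern. `Gate`, `start_work`, and `finish_work` are hypothetical names, and a single `@busy` counter stands in for the pool's busy_threads calculation.

```ruby
# Hypothetical Gate class: blocks callers while `busy` has reached `max`.
class Gate
  def initialize(max)
    @max = max
    @busy = 0
    @mutex = Mutex.new
    @not_full = ConditionVariable.new
  end

  def wait_until_not_full
    @mutex.synchronize do
      # Re-check after every wakeup; a wakeup alone proves nothing.
      @not_full.wait(@mutex) until @busy < @max
    end
  end

  def start_work
    @mutex.synchronize { @busy += 1 }
  end

  def finish_work
    @mutex.synchronize do
      @busy -= 1
      @not_full.signal
    end
  end
end

gate = Gate.new(1)
gate.start_work                 # the gate is now "full"
events = Queue.new
waiter = Thread.new do
  gate.wait_until_not_full
  events << :resumed
end
sleep 0.05                      # give the waiter time to block
blocked_while_full = events.empty?
gate.finish_work                # frees a slot and signals not_full
waiter.join
resumed = events.pop
```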
#with_force_shutdown ⇒ Object
Allows ThreadPool::ForceShutdown to be raised within the provided block if the thread is forced to shutdown during execution.
# File 'lib/puma/thread_pool.rb', line 383
def with_force_shutdown
  t = Thread.current
  @shutdown_mutex.synchronize do
    raise ForceShutdown if @force_shutdown
    t[:with_force_shutdown] = true
  end
  yield
ensure
  t[:with_force_shutdown] = false
end
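The mechanism here is an opt-in flag stored on the thread, which the shutdown path checks before calling `Thread#raise`. A reduced sketch of that handshake follows. `ForceStop` and the `:interruptible` key are hypothetical stand-ins for ForceShutdown and `:with_force_shutdown`, and the mutex that guards the flag in the real method is omitted.

```ruby
class ForceStop < RuntimeError; end   # hypothetical stand-in for ForceShutdown

ready = Queue.new
outcome = Queue.new

worker = Thread.new do
  t = Thread.current
  begin
    t[:interruptible] = true    # opt in, like t[:with_force_shutdown] = true
    ready << true
    sleep                       # stands in for long-running work
  rescue ForceStop
    outcome << :interrupted
  ensure
    t[:interruptible] = false   # opt out again, whatever happened
  end
end

ready.pop                                           # wait until the flag is set
worker.raise(ForceStop) if worker[:interruptible]   # only interrupt opted-in threads
worker.join
result = outcome.pop
```

Threads that never set the flag are left alone at this stage, which is why the shutdown sequence still needs a later kill step.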
#with_mutex(&block) ⇒ Object
# File 'lib/puma/thread_pool.rb', line 227
def with_mutex(&block)
  @mutex.owned? ? yield : @mutex.synchronize(&block)
end
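Ruby's Mutex is not reentrant, so a method that takes the lock cannot call another method that takes the same lock. The `owned?` check above works around that. The sketch below shows both sides: a nested `synchronize` raising ThreadError, and a hypothetical `Counter` class whose methods can nest freely through the same `with_mutex` helper.

```ruby
# A plain nested synchronize on the same Mutex raises ThreadError.
m = Mutex.new
nested_error = begin
  m.synchronize { m.synchronize { } }
  nil
rescue ThreadError => e
  e.class
end

# Hypothetical class reusing the with_mutex pattern.
class Counter
  attr_reader :count

  def initialize
    @mutex = Mutex.new
    @count = 0
  end

  def with_mutex(&block)
    @mutex.owned? ? yield : @mutex.synchronize(&block)
  end

  def increment
    with_mutex { @count += 1 }
  end

  def increment_twice
    with_mutex do
      increment   # already holding the lock, so this just yields
      increment
    end
  end
end

counter = Counter.new
counter.increment_twice
```

This is why #busy_threads, which calls with_mutex itself, can be used inside other with_mutex blocks such as the one in #wait_until_not_full.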