Class: Scheduler::ThreadPool
- Inherits: Object
  - Object
  - Scheduler::ThreadPool
- Defined in: lib/scheduler/thread_pool.rb
Overview
ThreadPool manages a pool of worker threads that process tasks from a queue. It maintains a minimum number of threads and can scale up to a maximum number when there’s more work to be done.
Usage:
pool = ThreadPool.new(min_threads: 0, max_threads: 4, idle_time: 0.1)
pool.post { do_something }
pool.stats # returns thread count, busy thread count, etc.
pool.shutdown # stops accepting new tasks
pool.wait_for_termination(timeout: 1) # timeout is optional
Defined Under Namespace
Classes: ShutdownError
Instance Method Summary
- #initialize(min_threads:, max_threads:, idle_time: nil) ⇒ ThreadPool (constructor): A new instance of ThreadPool.
- #post(&block) ⇒ Object
- #shutdown ⇒ Object
- #shutdown? ⇒ Boolean
- #stats ⇒ Object
- #wait_for_termination(timeout: nil) ⇒ Object
Constructor Details
#initialize(min_threads:, max_threads:, idle_time: nil) ⇒ ThreadPool
Returns a new instance of ThreadPool.
# File 'lib/scheduler/thread_pool.rb', line 20

def initialize(min_threads:, max_threads:, idle_time: nil)
  # 30 seconds is a reasonable default for idle time
  # it is particularly useful for the use case of:
  # ThreadPool.new(min_threads: 4, max_threads: 4)
  # operators would get confused about idle time cause why does it matter
  idle_time ||= 30

  raise ArgumentError, "min_threads must be 0 or larger" if min_threads < 0
  raise ArgumentError, "max_threads must be 1 or larger" if max_threads < 1
  raise ArgumentError, "max_threads must be >= min_threads" if max_threads < min_threads
  raise ArgumentError, "idle_time must be positive" if idle_time <= 0

  @min_threads = min_threads
  @max_threads = max_threads
  @idle_time = idle_time
  @threads = Set.new
  @busy_threads = Set.new
  @queue = Queue.new
  @mutex = Mutex.new
  @new_work = ConditionVariable.new
  @shutdown = false

  # Initialize minimum number of threads
  @min_threads.times { spawn_thread }
end
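The guard clauses in the constructor can be tried out without loading Rails. The sketch below copies them into a hypothetical helper, `validate_pool_args`, and adds a second hypothetical helper, `rejection_reason`, that reports which check failed. Neither method exists in Scheduler::ThreadPool; they only illustrate which argument combinations the constructor would accept.

```ruby
# Hypothetical helper: mirrors the guard clauses of ThreadPool#initialize.
# It is not part of Scheduler::ThreadPool.
def validate_pool_args(min_threads:, max_threads:, idle_time: nil)
  idle_time ||= 30 # same default the constructor applies

  raise ArgumentError, "min_threads must be 0 or larger" if min_threads < 0
  raise ArgumentError, "max_threads must be 1 or larger" if max_threads < 1
  raise ArgumentError, "max_threads must be >= min_threads" if max_threads < min_threads
  raise ArgumentError, "idle_time must be positive" if idle_time <= 0

  { min_threads: min_threads, max_threads: max_threads, idle_time: idle_time }
end

# Hypothetical helper: returns the ArgumentError message for the given
# arguments, or nil when they pass every check.
def rejection_reason(**args)
  validate_pool_args(**args)
  nil
rescue ArgumentError => e
  e.message
end

puts rejection_reason(min_threads: 4, max_threads: 4).inspect
puts rejection_reason(min_threads: 5, max_threads: 4).inspect
puts rejection_reason(min_threads: 0, max_threads: 4, idle_time: 0).inspect
```

Note that `idle_time: 0` is rejected rather than replaced by the default, because `0` is truthy in Ruby and `||=` only substitutes for `nil` or `false`.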
Instance Method Details
#post(&block) ⇒ Object
# File 'lib/scheduler/thread_pool.rb', line 47

def post(&block)
  raise ShutdownError, "Cannot post work to a shutdown ThreadPool" if shutdown?

  db = RailsMultisite::ConnectionManagement.current_db
  wrapped_block = wrap_block(block, db)

  @mutex.synchronize do
    @queue << wrapped_block
    spawn_thread if @threads.length == 0
    @new_work.signal
  end
end
#shutdown ⇒ Object
# File 'lib/scheduler/thread_pool.rb', line 87

def shutdown
  @mutex.synchronize do
    return if @shutdown
    @shutdown = true
    @threads.length.times { @queue << :shutdown }
    @new_work.broadcast
  end
end
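`#shutdown` pushes one `:shutdown` marker per live thread onto the queue. The worker loop that consumes those markers is not shown on this page, so the following is only a sketch of the general sentinel pattern using the standard library. The method name `run_workers_until_shutdown` and the loop body are assumptions, not the pool's actual implementation.

```ruby
# Minimal sketch of sentinel-based shutdown. `run_workers_until_shutdown`
# is a hypothetical name; the real worker loop of Scheduler::ThreadPool
# is not shown in this documentation.
def run_workers_until_shutdown(worker_count, tasks)
  queue = Queue.new
  results = Queue.new # thread-safe collector for task return values

  workers = Array.new(worker_count) do
    Thread.new do
      loop do
        work = queue.pop            # blocks until something is queued
        break if work == :shutdown  # sentinel: this worker exits
        results << work.call
      end
    end
  end

  tasks.each { |task| queue << task }

  # One sentinel per worker, queued behind the pending tasks. The queue is
  # FIFO, so all real work is picked up before any worker sees a sentinel.
  worker_count.times { queue << :shutdown }

  workers.each(&:join)
  Array.new(results.size) { results.pop }
end

squares = run_workers_until_shutdown(3, (1..5).map { |i| -> { i * i } })
puts squares.sort.inspect
```

Because each worker stops after consuming exactly one sentinel, queuing as many sentinels as there are workers is enough to stop all of them, which matches the `@threads.length.times { @queue << :shutdown }` line above.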
#shutdown? ⇒ Boolean
# File 'lib/scheduler/thread_pool.rb', line 96

def shutdown?
  @mutex.synchronize { @shutdown }
end
#stats ⇒ Object
# File 'lib/scheduler/thread_pool.rb', line 100

def stats
  @mutex.synchronize do
    {
      thread_count: @threads.size,
      queued_tasks: @queue.size,
      shutdown: @shutdown,
      min_threads: @min_threads,
      max_threads: @max_threads,
      busy_thread_count: @busy_threads.size,
    }
  end
end
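The hash returned by `#stats` is a snapshot taken under the mutex, so its fields are consistent with each other. As an illustration of how a caller might read it, the sketch below derives a few figures from a hash with the same keys. `summarize_pool` is a hypothetical helper and the sample values are invented for the example.

```ruby
# Hypothetical helper that summarises a hash shaped like ThreadPool#stats.
# It is not part of Scheduler::ThreadPool.
def summarize_pool(stats)
  {
    idle_thread_count: stats[:thread_count] - stats[:busy_thread_count],
    at_capacity: stats[:thread_count] >= stats[:max_threads],
    backlog: stats[:queued_tasks],
  }
end

# Illustrative values only, using the keys that #stats returns.
sample = {
  thread_count: 4,
  queued_tasks: 3,
  shutdown: false,
  min_threads: 0,
  max_threads: 4,
  busy_thread_count: 4,
}

puts summarize_pool(sample).inspect
```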
#wait_for_termination(timeout: nil) ⇒ Object
# File 'lib/scheduler/thread_pool.rb', line 61

def wait_for_termination(timeout: nil)
  threads_to_join = nil
  @mutex.synchronize { threads_to_join = @threads.to_a }

  if timeout.nil?
    threads_to_join.each(&:join)
  else
    failed_to_shutdown = false

    deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + timeout
    threads_to_join.each do |thread|
      remaining_time = deadline - Process.clock_gettime(Process::CLOCK_MONOTONIC)
      break if remaining_time <= 0
      if !thread.join(remaining_time)
        Rails.logger.error "ThreadPool: Failed to join thread within timeout\n#{thread.backtrace.join("\n")}"
        failed_to_shutdown = true
      end
    end

    if failed_to_shutdown
      @mutex.synchronize { @threads.each(&:kill) }
      raise ShutdownError, "Failed to shutdown ThreadPool within timeout"
    end
  end
end
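The timeout branch shares a single monotonic deadline across all joins, so the total wait is bounded by `timeout` rather than by `timeout` per thread. The sketch below isolates that technique with plain threads. `join_within` is a hypothetical helper, and it returns the threads that timed out instead of logging and raising as the real method does.

```ruby
# Sketch of the shared-deadline join used by wait_for_termination.
# `join_within` is a hypothetical helper, not part of Scheduler::ThreadPool.
# Returns the threads whose join timed out.
def join_within(threads, timeout)
  deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + timeout
  stragglers = []

  threads.each do |thread|
    remaining = deadline - Process.clock_gettime(Process::CLOCK_MONOTONIC)
    # As in the method above, threads reached after the deadline has
    # passed are skipped rather than joined.
    break if remaining <= 0

    # Thread#join(limit) returns nil when the limit expires first.
    stragglers << thread unless thread.join(remaining)
  end

  stragglers
end

fast = Thread.new { :done }
slow = Thread.new { sleep 5 }

late = join_within([fast, slow], 0.2)
puts late.size
slow.kill # clean up the thread that missed the deadline
```

Using `Process::CLOCK_MONOTONIC` rather than wall-clock time keeps the deadline stable if the system clock is adjusted while the joins are in progress.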