Class: Scheduler::ThreadPool

Inherits:
Object
  • Object
show all
Defined in:
lib/scheduler/thread_pool.rb

Overview

ThreadPool manages a pool of worker threads that process tasks from a queue. It maintains a minimum number of threads and can scale up to a maximum number when there’s more work to be done.

Usage:

pool = ThreadPool.new(min_threads: 0, max_threads: 4, idle_time: 0.1)
pool.post { do_something }
pool.stats (returns thread count, busy thread count, etc.)

pool.shutdown (do not accept new tasks)
pool.wait_for_termination(timeout: 1) (optional timeout)

Defined Under Namespace

Classes: ShutdownError

Instance Method Summary collapse

Constructor Details

#initialize(min_threads:, max_threads:, idle_time: nil) ⇒ ThreadPool

Returns a new instance of ThreadPool.

Raises:

  • (ArgumentError)


20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
# File 'lib/scheduler/thread_pool.rb', line 20

def initialize(min_threads:, max_threads:, idle_time: nil)
  # 30 seconds is a reasonable default for idle time
  # it is particularly useful for the use case of:
  # ThreadPool.new(min_threads: 4, max_threads: 4)
  # operators would get confused about idle time cause why does it matter
  idle_time ||= 30
  raise ArgumentError, "min_threads must be 0 or larger" if min_threads < 0
  raise ArgumentError, "max_threads must be 1 or larger" if max_threads < 1
  raise ArgumentError, "max_threads must be >= min_threads" if max_threads < min_threads
  raise ArgumentError, "idle_time must be positive" if idle_time <= 0

  @min_threads = min_threads
  @max_threads = max_threads
  @idle_time = idle_time

  @threads = Set.new
  @busy_threads = Set.new

  @queue = Queue.new
  @mutex = Mutex.new
  @new_work = ConditionVariable.new
  @shutdown = false

  # Initialize minimum number of threads
  @min_threads.times { spawn_thread }
end

Instance Method Details

#post(&block) ⇒ Object

Raises:



47
48
49
50
51
52
53
54
55
56
57
58
59
# File 'lib/scheduler/thread_pool.rb', line 47

def post(&block)
  raise ShutdownError, "Cannot post work to a shutdown ThreadPool" if shutdown?

  db = RailsMultisite::ConnectionManagement.current_db
  wrapped_block = wrap_block(block, db)

  @mutex.synchronize do
    @queue << wrapped_block
    spawn_thread if @threads.length == 0

    @new_work.signal
  end
end

#shutdownObject



87
88
89
90
91
92
93
94
# File 'lib/scheduler/thread_pool.rb', line 87

def shutdown
  @mutex.synchronize do
    return if @shutdown
    @shutdown = true
    @threads.length.times { @queue << :shutdown }
    @new_work.broadcast
  end
end

#shutdown?Boolean

Returns:

  • (Boolean)


96
97
98
# File 'lib/scheduler/thread_pool.rb', line 96

def shutdown?
  @mutex.synchronize { @shutdown }
end

#statsObject



100
101
102
103
104
105
106
107
108
109
110
111
# File 'lib/scheduler/thread_pool.rb', line 100

def stats
  @mutex.synchronize do
    {
      thread_count: @threads.size,
      queued_tasks: @queue.size,
      shutdown: @shutdown,
      min_threads: @min_threads,
      max_threads: @max_threads,
      busy_thread_count: @busy_threads.size,
    }
  end
end

#wait_for_termination(timeout: nil) ⇒ Object



61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
# File 'lib/scheduler/thread_pool.rb', line 61

def wait_for_termination(timeout: nil)
  threads_to_join = nil
  @mutex.synchronize { threads_to_join = @threads.to_a }

  if timeout.nil?
    threads_to_join.each(&:join)
  else
    failed_to_shutdown = false

    deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + timeout
    threads_to_join.each do |thread|
      remaining_time = deadline - Process.clock_gettime(Process::CLOCK_MONOTONIC)
      break if remaining_time <= 0
      if !thread.join(remaining_time)
        Rails.logger.error "ThreadPool: Failed to join thread within timeout\n#{thread.backtrace.join("\n")}"
        failed_to_shutdown = true
      end
    end

    if failed_to_shutdown
      @mutex.synchronize { @threads.each(&:kill) }
      raise ShutdownError, "Failed to shutdown ThreadPool within timeout"
    end
  end
end