Class: NatsWork::ThreadPool
- Inherits: Object
- Inheritance hierarchy:
  - Object
    - NatsWork::ThreadPool
- Defined in:
- lib/natswork/thread_pool.rb
Instance Attribute Summary collapse
-
#max_queue_size ⇒ Object
readonly
Returns the value of attribute max_queue_size.
-
#size ⇒ Object
readonly
Returns the value of attribute size.
Instance Method Summary collapse
- #active_count ⇒ Object
-
#initialize(size: 10, max_queue: nil) ⇒ ThreadPool
constructor
A new instance of ThreadPool.
- #memory_usage ⇒ Object
- #queue_size ⇒ Object
- #shutdown ⇒ Object
- #shutdown! ⇒ Object
- #stats ⇒ Object
- #submit(&block) ⇒ Object
- #wait_for_termination(timeout: nil) ⇒ Object
Constructor Details
#initialize(size: 10, max_queue: nil) ⇒ ThreadPool
Returns a new instance of ThreadPool.
12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 |
# File 'lib/natswork/thread_pool.rb', line 12
#
# Sets up pool state: records the configured size and queue limit, chooses
# the backing queue, zeroes the counters and finally calls start_workers.
#
# @param size [Integer] stored as @size (start_workers is defined elsewhere;
#   presumably it spawns this many workers -- confirm)
# @param max_queue [Integer, nil] a positive value selects a SizedQueue of
#   that capacity; nil or a non-positive value selects an unbounded Queue
def initialize(size: 10, max_queue: nil)
  @size = size
  @max_queue_size = max_queue

  # Only a positive limit yields a bounded queue.
  bounded = max_queue&.positive?
  @queue = bounded ? SizedQueue.new(max_queue) : Queue.new
  @limited = bounded ? true : false

  @workers = []
  @shutdown = false
  @mutex = Mutex.new

  @active_count, @completed_count, @rejected_count =
    Array.new(3) { Concurrent::AtomicFixnum.new(0) }

  start_workers
end
Instance Attribute Details
#max_queue_size ⇒ Object (readonly)
Returns the value of attribute max_queue_size.
10 11 12 |
# File 'lib/natswork/thread_pool.rb', line 10
#
# Reader for the queue limit given to the constructor.
#
# @return [Object] the value of @max_queue_size (nil when no limit was set)
def max_queue_size
  @max_queue_size
end
#size ⇒ Object (readonly)
Returns the value of attribute size.
10 11 12 |
# File 'lib/natswork/thread_pool.rb', line 10
#
# Reader for the pool size given to the constructor.
#
# @return [Object] the value of @size
def size
  @size
end
Instance Method Details
#active_count ⇒ Object
63 64 65 |
# File 'lib/natswork/thread_pool.rb', line 63
#
# Current reading of the active-task counter.
#
# @return [Object] whatever @active_count.value reports
def active_count
  counter = @active_count
  counter.value
end
#memory_usage ⇒ Object
133 134 135 136 137 138 139 140 141 142 143 144 145 146 |
# File 'lib/natswork/thread_pool.rb', line 133
#
# Reports the memory used by the current process, in bytes.
#
# Prefers GetProcessMem when that constant is defined; otherwise parses the
# VmRSS line (reported in kB) from /proc/<pid>/status.
#
# @return [Numeric] bytes in use, or 0 when the /proc read or parse fails
def memory_usage
  return GetProcessMem.new.bytes if defined?(GetProcessMem)

  begin
    status_text = File.read("/proc/#{Process.pid}/status")
    rss_kb = status_text.match(/VmRSS:\s+(\d+)/)[1]
    rss_kb.to_i * 1024
  rescue StandardError
    # No /proc (non-Linux) or no VmRSS line: report zero instead of raising.
    0
  end
end
#queue_size ⇒ Object
67 68 69 |
# File 'lib/natswork/thread_pool.rb', line 67
#
# Number of items currently sitting in the work queue.
#
# @return [Integer] length of @queue
def queue_size
  @queue.length
end
#shutdown ⇒ Object
71 72 73 74 75 76 77 78 79 80 |
# File 'lib/natswork/thread_pool.rb', line 71
#
# Marks the pool as shut down. Nothing is pushed onto the queue and nothing
# is cleared here; only the @shutdown flag changes, under @mutex.
#
# @return [true, nil] true when this call set the flag, nil when the flag
#   was already set
def shutdown
  # No terminate signals are queued at this point, so already-queued tasks
  # are left in place.
  @mutex.synchronize { @shutdown = true unless @shutdown }
end
#shutdown! ⇒ Object
82 83 84 85 86 87 88 89 90 |
# File 'lib/natswork/thread_pool.rb', line 82
#
# Immediate shutdown: sets the @shutdown flag, discards everything still in
# the queue and kills every worker thread.
#
# @return [Array] the @workers array
def shutdown!
  @mutex.synchronize { @shutdown = true }

  # Drop pending work first, then stop the workers outright.
  @queue.clear
  @workers.each { |worker| worker.kill }
end
#stats ⇒ Object
120 121 122 123 124 125 126 127 128 129 130 131 |
# File 'lib/natswork/thread_pool.rb', line 120
#
# Snapshot of the pool's configuration and counters.
#
# @return [Hash] with keys :size, :active, :queued, :max_queue, :completed,
#   :rejected, :shutdown and :memory_usage, in that order
def stats
  snapshot = {}
  snapshot[:size] = @size
  snapshot[:active] = active_count
  snapshot[:queued] = queue_size
  snapshot[:max_queue] = @max_queue_size
  snapshot[:completed] = @completed_count.value
  snapshot[:rejected] = @rejected_count.value
  snapshot[:shutdown] = @shutdown
  snapshot[:memory_usage] = memory_usage
  snapshot
end
#submit(&block) ⇒ Object
36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 |
# File 'lib/natswork/thread_pool.rb', line 36
#
# Enqueues a block on the pool's work queue.
#
# With a bounded queue the push is non-blocking: a full queue bumps the
# rejected counter and raises instead of waiting for space.
#
# @yield the work to enqueue
# @return [Queue, nil] the queue itself for an unbounded pool, nil for a
#   bounded pool (kept as-is for backward compatibility)
# @raise [ThreadPoolError] if the pool is shut down, or the bounded queue is full
# @raise [ArgumentError] if no block is given
def submit(&block)
  raise ThreadPoolError, 'Pool is shutting down' if @shutdown
  # Without this guard a block-less call would enqueue nil as a task.
  raise ArgumentError, 'submit requires a block' unless block

  return @queue.push(block) unless @limited

  begin
    # non_block = true: raises ThreadError instead of blocking when full.
    @queue.push(block, true)
  rescue ThreadError
    @rejected_count.increment
    raise ThreadPoolError, "Queue is full (size: #{@max_queue_size})"
  end
  nil
end
#wait_for_termination(timeout: nil) ⇒ Object
92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 |
# File 'lib/natswork/thread_pool.rb', line 92
#
# Blocks until the queue is empty and no task is active, then joins the
# worker threads.
#
# The deadline is measured on the monotonic clock, so wall-clock changes
# (NTP steps, manual adjustments) cannot shorten or extend the wait.
#
# @param timeout [Numeric, nil] maximum seconds to wait; nil waits indefinitely
# @return [Boolean] true when the queue drained and all workers were joined;
#   false when the timeout expired first
def wait_for_termination(timeout: nil)
  busy = -> { @queue.size.positive? || @active_count.value.positive? }

  unless timeout
    # No deadline: poll until idle, then join every worker.
    sleep 0.01 while busy.call
    @workers.each(&:join)
    return true
  end

  clock = -> { Process.clock_gettime(Process::CLOCK_MONOTONIC) }
  deadline = clock.call + timeout

  # Poll until idle or out of time.
  sleep 0.01 while busy.call && clock.call < deadline

  # Spend whatever time is left joining the workers one by one.
  @workers.each do |worker|
    remaining = deadline - clock.call
    return false if remaining <= 0
    return false unless worker.join(remaining)
  end

  @queue.empty? && @active_count.value.zero?
end