Class: NatsWork::ThreadPool

Inherits:
Object
  • Object
show all
Defined in:
lib/natswork/thread_pool.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(size: 10, max_queue: nil) ⇒ ThreadPool

Returns a new instance of ThreadPool.



12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
# File 'lib/natswork/thread_pool.rb', line 12

def initialize(size: 10, max_queue: nil)
  @size = size
  @max_queue_size = max_queue

  # Use SizedQueue for limited queue, regular Queue otherwise
  if max_queue&.positive?
    @queue = SizedQueue.new(max_queue)
    @limited = true
  else
    @queue = Queue.new
    @limited = false
  end

  @workers = []
  @shutdown = false
  @mutex = Mutex.new

  @active_count = Concurrent::AtomicFixnum.new(0)
  @completed_count = Concurrent::AtomicFixnum.new(0)
  @rejected_count = Concurrent::AtomicFixnum.new(0)

  start_workers
end

Instance Attribute Details

#max_queue_sizeObject (readonly)

Returns the value of attribute max_queue_size.



10
11
12
# File 'lib/natswork/thread_pool.rb', line 10

def max_queue_size
  @max_queue_size
end

#sizeObject (readonly)

Returns the value of attribute size.



10
11
12
# File 'lib/natswork/thread_pool.rb', line 10

def size
  @size
end

Instance Method Details

#active_countObject



63
64
65
# File 'lib/natswork/thread_pool.rb', line 63

def active_count
  @active_count.value
end

#memory_usageObject



133
134
135
136
137
138
139
140
141
142
143
144
145
146
# File 'lib/natswork/thread_pool.rb', line 133

def memory_usage
  # Get process memory usage
  if defined?(GetProcessMem)
    GetProcessMem.new.bytes
  else
    # Fallback to RSS from /proc (Linux)
    begin
      File.read("/proc/#{Process.pid}/status").match(/VmRSS:\s+(\d+)/)[1].to_i * 1024
    rescue StandardError
      # Fallback for non-Linux or if reading fails
      0
    end
  end
end

#queue_sizeObject



67
68
69
# File 'lib/natswork/thread_pool.rb', line 67

def queue_size
  @queue.size
end

#shutdownObject



71
72
73
74
75
76
77
78
79
80
# File 'lib/natswork/thread_pool.rb', line 71

def shutdown
  @mutex.synchronize do
    return if @shutdown

    @shutdown = true
  end

  # Don't push terminate signals yet - let existing tasks complete
  # The workers will check @shutdown flag
end

#shutdown!Object



82
83
84
85
86
87
88
89
90
# File 'lib/natswork/thread_pool.rb', line 82

def shutdown!
  @mutex.synchronize do
    @shutdown = true
  end

  # Clear the queue and terminate immediately
  @queue.clear
  @workers.each(&:kill)
end

#statsObject



120
121
122
123
124
125
126
127
128
129
130
131
# File 'lib/natswork/thread_pool.rb', line 120

def stats
  {
    size: @size,
    active: active_count,
    queued: queue_size,
    max_queue: @max_queue_size,
    completed: @completed_count.value,
    rejected: @rejected_count.value,
    shutdown: @shutdown,
    memory_usage: memory_usage
  }
end

#submit(&block) ⇒ Object

Raises:



36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
# File 'lib/natswork/thread_pool.rb', line 36

def submit(&block)
  raise ThreadPoolError, 'Pool is shutting down' if @shutdown

  if @limited
    # For SizedQueue, use non-blocking push

    # Try non-blocking push
    success = false
    begin
      @queue.push(block, true)
      success = true
    rescue ThreadError
      # Queue is full
      success = false
    end

    unless success
      @rejected_count.increment
      raise ThreadPoolError, "Queue is full (size: #{@max_queue_size})"
    end

  else
    # Regular queue, just add
    @queue.push(block)
  end
end

#wait_for_termination(timeout: nil) ⇒ Object



92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
# File 'lib/natswork/thread_pool.rb', line 92

def wait_for_termination(timeout: nil)
  if timeout
    deadline = Time.now + timeout

    # Wait for queue to empty and active tasks to complete
    sleep 0.01 while (@queue.size.positive? || @active_count.value.positive?) && Time.now < deadline

    # Then wait for workers to finish
    @workers.each do |worker|
      remaining = deadline - Time.now
      return false if remaining <= 0

      joined = worker.join(remaining)
      return false unless joined
    end

    # Check if all tasks completed
    @queue.empty? && @active_count.value.zero?
  else
    # Wait indefinitely for queue to empty and active tasks to complete
    sleep 0.01 while @queue.size.positive? || @active_count.value.positive?

    # Then join all workers
    @workers.each(&:join)
    true
  end
end