Class: BetterCap::Proxy::ThreadPool
- Inherits: Object
  - Object
  - BetterCap::Proxy::ThreadPool
- Defined in: lib/bettercap/proxy/thread_pool.rb
Overview
Thanks to Puma's ThreadPool!
Instance Attribute Summary
- #spawned ⇒ Object (readonly)
  Number of spawned threads in the pool.
Instance Method Summary
- #<<(work) ⇒ Object
  Add work to the todo list for a Thread to pick up and process.
- #backlog ⇒ Object
  How many objects have yet to be processed by the pool?
- #initialize(min, max, *extra, &block) ⇒ ThreadPool (constructor)
  Maintain a minimum of min and a maximum of max threads in the pool.
- #reap ⇒ Object
  If there are dead threads in the pool, remove them and decrement the spawned counter so that new, healthy threads can be created.
- #shutdown(join_threads = true) ⇒ Object
  Tell all threads in the pool to exit and wait for them to finish.
- #trim(force = false) ⇒ Object
  If too many threads are in the pool, tell one to finish and exit.
- #wait_until_not_full ⇒ Object
Constructor Details
#initialize(min, max, *extra, &block) ⇒ ThreadPool
Maintain a minimum of min and a maximum of max threads in the pool.
The block passed is the work that will be performed in each thread.
# File 'lib/bettercap/proxy/thread_pool.rb', line 25
def initialize(min, max, *extra, &block)
  @not_empty = ConditionVariable.new
  @not_full = ConditionVariable.new
  @mutex = Mutex.new

  @todo = []

  @spawned = 0
  @waiting = 0

  @min = Integer(min)
  @max = Integer(max)
  @block = block
  @extra = extra

  @shutdown = false

  @trim_requested = 0

  @workers = []

  @mutex.synchronize do
    @min.times { spawn_thread }
  end
end
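The constructor stores the block and the `*extra` arguments, but the private `spawn_thread` that uses them is not shown on this page. The sketch below is a hypothetical stand-in (`TinyPool` is not part of BetterCap) that assumes, as in Puma's pool, that the block receives each work item followed by the extra arguments. It uses Ruby's `Queue` rather than the mutex and condition variables of the real class, so treat it as an illustration of the calling convention only.

```ruby
# Hypothetical stand-in, NOT the BetterCap implementation: a fixed-size pool
# that hands each queued item, plus the extra arguments, to the block.
class TinyPool
  def initialize(size, *extra, &block)
    @queue = Queue.new
    @workers = Array.new(Integer(size)) do
      Thread.new do
        # Queue#pop blocks until an item is available; :stop ends the loop.
        while (work = @queue.pop) != :stop
          block.call(work, *extra)
        end
      end
    end
  end

  # Queue one unit of work for any idle worker.
  def <<(work)
    @queue << work
  end

  # Send one :stop marker per worker, then wait for all of them to exit.
  def shutdown
    @workers.size.times { @queue << :stop }
    @workers.each(&:join)
  end
end

results = Queue.new
pool = TinyPool.new(2, "tag") { |work, label| results << "#{label}:#{work * 2}" }
[1, 2, 3].each { |n| pool << n }
pool.shutdown

# Drain the results; sort because worker scheduling order is not deterministic.
processed = []
processed << results.pop until results.empty?
processed.sort!
```

Like the real constructor, the stand-in coerces its size with `Integer()`, so a non-numeric string raises `ArgumentError` instead of silently becoming zero.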
Instance Attribute Details
#spawned ⇒ Object (readonly)
Number of spawned threads in the pool.
# File 'lib/bettercap/proxy/thread_pool.rb', line 52
def spawned
  @spawned
end
Instance Method Details
#<<(work) ⇒ Object
Add work to the todo list for a Thread to pick up and process.
# File 'lib/bettercap/proxy/thread_pool.rb', line 125
def <<(work)
  @mutex.synchronize do
    if @shutdown
      raise "Unable to add work while shutting down"
    end

    @todo << work

    if @waiting < @todo.size and @spawned < @max
      spawn_thread
    end

    @not_empty.signal
  end
end
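The method above appends to the todo list and signals a condition variable while holding the mutex. A minimal sketch of that handoff, using only Ruby's standard `Mutex` and `ConditionVariable`, might look like the following. The variable names are illustrative, and the consumer side is an assumption about what a worker thread does, since the worker loop is not shown on this page.

```ruby
mutex     = Mutex.new
not_empty = ConditionVariable.new
todo      = []
done      = []

# Consumer: sleeps on the condition variable until work is available.
consumer = Thread.new do
  mutex.synchronize do
    # Re-check the predicate in a loop, because wakeups can be spurious.
    not_empty.wait(mutex) while todo.empty?
    done << todo.shift
  end
end

# Producer: the same append-then-signal sequence as in #<<.
mutex.synchronize do
  todo << :job
  not_empty.signal
end

consumer.join
```

Because the consumer checks `todo.empty?` before waiting, the handoff works whether the producer or the consumer acquires the mutex first.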
#backlog ⇒ Object
How many objects have yet to be processed by the pool?
# File 'lib/bettercap/proxy/thread_pool.rb', line 56
def backlog
  @mutex.synchronize { @todo.size }
end
#reap ⇒ Object
If there are dead threads in the pool, remove them and decrement the spawned counter so that new, healthy threads can be created.
# File 'lib/bettercap/proxy/thread_pool.rb', line 164
def reap
  @mutex.synchronize do
    dead_workers = @workers.reject(&:alive?)

    dead_workers.each do |worker|
      worker.kill
      @spawned -= 1
    end

    @workers -= dead_workers
  end
end
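The dead-thread detection above relies on `Thread#alive?`. The following sketch shows the same `reject(&:alive?)` filtering on two standalone threads, one already finished and one still blocked. The thread and variable names are made up for illustration.

```ruby
# A thread that has already run to completion.
finished = Thread.new { :done }
finished.join

# A thread that stays alive until something is pushed to the gate.
gate    = Queue.new
running = Thread.new { gate.pop }

workers = [finished, running]
spawned = workers.size

# Same filtering as in #reap: keep only the threads that are no longer alive.
dead_workers = workers.reject(&:alive?)
spawned -= dead_workers.size
workers -= dead_workers

dead_count    = dead_workers.size
live_count    = workers.size
still_running = workers.first.equal?(running)

# Release the blocked thread so the script exits cleanly.
gate << :go
running.join
```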
#shutdown(join_threads = true) ⇒ Object
Tell all threads in the pool to exit and, if join_threads is true, wait for them to finish.
# File 'lib/bettercap/proxy/thread_pool.rb', line 179
def shutdown( join_threads = true )
  threads = @mutex.synchronize do
    @shutdown = true
    @not_empty.broadcast
    @not_full.broadcast
    # dup workers so that we join them all safely
    @workers.dup
  end

  threads.each(&:join) if join_threads

  @spawned = 0
  @workers = []
end
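The shutdown sequence above sets a flag and then uses `broadcast`, which wakes every waiting thread rather than just one. A small sketch of that pattern with standard-library primitives is shown below. The waiting threads here are stand-ins for pool workers, whose actual loop is not shown on this page.

```ruby
mutex    = Mutex.new
cond     = ConditionVariable.new
shutdown = false
exited   = Queue.new

# Three threads that sleep until the shutdown flag is set.
threads = Array.new(3) do |i|
  Thread.new do
    mutex.synchronize do
      cond.wait(mutex) until shutdown
    end
    exited << i
  end
end

# Set the flag and wake all waiters while holding the mutex.
mutex.synchronize do
  shutdown = true
  cond.broadcast
end

# Join afterwards, outside the mutex, so the woken threads can reacquire it.
threads.each(&:join)
exited_count = exited.size
```

Joining outside the synchronized block matters: a woken thread must reacquire the mutex before it can return from `wait`, so joining while still holding the mutex would deadlock.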
#trim(force = false) ⇒ Object
If too many threads are in the pool, tell one to finish and exit. If force is true, a trim is requested even if all threads are busy.
# File 'lib/bettercap/proxy/thread_pool.rb', line 153
def trim(force=false)
  @mutex.synchronize do
    if (force or @waiting > 0) and @spawned - @trim_requested > @min
      @trim_requested += 1
      @not_empty.signal
    end
  end
end
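The condition in the method above can be read as a pure predicate over the pool's counters. The sketch below restates it as a standalone function so the edge cases are easier to see. `trim_allowed?` is a hypothetical helper, not part of the class.

```ruby
# Hypothetical helper restating the condition used in #trim.
# A trim is requested only if a thread is idle (or force is set) AND
# the pool would still hold more than `min` threads once all pending
# trim requests are honoured.
def trim_allowed?(force:, waiting:, spawned:, trim_requested:, min:)
  (force || waiting > 0) && (spawned - trim_requested > min)
end

no_idle   = trim_allowed?(force: false, waiting: 0, spawned: 5, trim_requested: 0, min: 2)
one_idle  = trim_allowed?(force: false, waiting: 1, spawned: 5, trim_requested: 0, min: 2)
at_min    = trim_allowed?(force: true,  waiting: 0, spawned: 2, trim_requested: 0, min: 2)
pending   = trim_allowed?(force: true,  waiting: 0, spawned: 4, trim_requested: 2, min: 2)
```

Note that `force` only bypasses the idle-thread check. The lower bound on pool size still applies, so a forced trim never takes the pool below min.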
#wait_until_not_full ⇒ Object
# File 'lib/bettercap/proxy/thread_pool.rb', line 141
def wait_until_not_full
  @mutex.synchronize do
    until @todo.size - @waiting < @max - @spawned or @shutdown
      @not_full.wait @mutex
    end
  end
end
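This method has no description in the source. Reading the loop, it appears to block the caller until the backlog not already covered by idle threads is smaller than the number of threads that could still be spawned, or until the pool is shutting down. The sketch below restates that capacity check as a standalone predicate. `room_for_work?` is a hypothetical name, and the interpretation is inferred from the code above rather than taken from the author's documentation.

```ruby
# Hypothetical helper restating the loop condition in #wait_until_not_full
# (without the shutdown flag). Returns true when the caller would NOT block.
def room_for_work?(backlog:, waiting:, max:, spawned:)
  # backlog - waiting : queued items that no idle thread is ready to take
  # max - spawned     : threads the pool is still allowed to create
  backlog - waiting < max - spawned
end

can_grow      = room_for_work?(backlog: 2, waiting: 0, max: 4, spawned: 1)
growth_used   = room_for_work?(backlog: 3, waiting: 0, max: 4, spawned: 1)
full_and_busy = room_for_work?(backlog: 0, waiting: 0, max: 4, spawned: 4)
full_one_idle = room_for_work?(backlog: 0, waiting: 1, max: 4, spawned: 4)
```

With the pool at max and every thread busy, the predicate is false even with an empty backlog, so under this reading a caller would wait until a worker becomes idle.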