Class: BetterCap::Proxy::ThreadPool
- Inherits: Object
  - Object
  - BetterCap::Proxy::ThreadPool
- Defined in: lib/bettercap/proxy/thread_pool.rb
Overview
Thread pool class used by the BetterCap::Proxy::* classes.
Instance Attribute Summary
- #spawned ⇒ Object (readonly)
  Number of spawned threads in the pool.
Instance Method Summary
- #<<(work) ⇒ Object
  Add work to the todo list for a Thread to pick up and process.
- #backlog ⇒ Object
  How many objects have yet to be processed by the pool?
- #initialize(min, max, *extra, &block) ⇒ ThreadPool (constructor)
  Maintain a minimum of min and a maximum of max threads in the pool.
- #reap ⇒ Object
  If there are dead threads in the pool, remove them and decrease the spawned counter so that new healthy threads can be created.
- #shutdown(join_threads = true) ⇒ Object
  Tell all threads in the pool to exit and, if join_threads is true, wait for them to finish.
- #trim(force = false) ⇒ Object
  If too many threads are in the pool, tell one to finish and exit.
- #wait_until_not_full ⇒ Object
Constructor Details
#initialize(min, max, *extra, &block) ⇒ ThreadPool
Maintain a minimum of min and a maximum of max threads in the pool.
The block passed is the work that will be performed in each thread.
    # File 'lib/bettercap/proxy/thread_pool.rb', line 24

    def initialize(min, max, *extra, &block)
      @not_empty = ConditionVariable.new
      @not_full = ConditionVariable.new
      @mutex = Mutex.new

      @todo = []

      @spawned = 0
      @waiting = 0

      @min = Integer(min)
      @max = Integer(max)
      @block = block
      @extra = extra

      @shutdown = false

      @trim_requested = 0

      @workers = []

      @mutex.synchronize do
        @min.times { spawn_thread }
      end
    end
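The private spawn_thread method is not shown on this page, so the following is only a sketch of how a pool constructed this way might be driven. TinyPool is a hypothetical stand-in, not BetterCap's implementation: its worker loop, and the convention of passing the *extra arguments to the block after the work item, are assumptions made for illustration. It also never grows beyond min threads.

```ruby
require 'thread'

# Hypothetical stand-in for the documented class; NOT BetterCap's code.
class TinyPool
  attr_reader :spawned

  def initialize(min, max, *extra, &block)
    @not_empty = ConditionVariable.new
    @mutex     = Mutex.new
    @todo      = []
    @spawned   = 0
    @min       = Integer(min)
    @max       = Integer(max) # kept for symmetry; this sketch never grows past min
    @block     = block
    @extra     = extra
    @shutdown  = false
    @workers   = []
    @mutex.synchronize { @min.times { spawn_thread } }
  end

  # Queue one unit of work and wake a sleeping worker.
  def <<(work)
    @mutex.synchronize do
      raise "Unable to add work while shutting down" if @shutdown
      @todo << work
      @not_empty.signal
    end
  end

  # Ask workers to exit once the queue is drained, then join them.
  def shutdown
    threads = @mutex.synchronize do
      @shutdown = true
      @not_empty.broadcast
      @workers.dup
    end
    threads.each(&:join)
  end

  private

  # Assumed worker loop; must be called with @mutex held, as in the constructor.
  def spawn_thread
    @spawned += 1
    @workers << Thread.new do
      loop do
        work = @mutex.synchronize do
          @not_empty.wait(@mutex) while @todo.empty? && !@shutdown
          @todo.shift # nil once shutting down with an empty queue
        end
        break if work.nil? # nil work items are not supported in this sketch
        @block.call(work, *@extra)
      end
    end
  end
end

# Convenience wrapper: doubles each number on the pool and collects the results.
def double_all(numbers)
  results = Queue.new
  pool = TinyPool.new(2, 4, :doubled) { |n, tag| results << [tag, n * 2] }
  numbers.each { |n| pool << n }
  pool.shutdown
  Array.new(results.size) { results.pop }
end
```

Calling double_all([1, 2, 3]) returns three [:doubled, value] pairs in whatever order the workers finished, so sort the result before comparing it.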
Instance Attribute Details
#spawned ⇒ Object (readonly)
Number of spawned threads in the pool.
    # File 'lib/bettercap/proxy/thread_pool.rb', line 51

    def spawned
      @spawned
    end
Instance Method Details
#<<(work) ⇒ Object
Add work to the todo list for a Thread to pick up and process.
    # File 'lib/bettercap/proxy/thread_pool.rb', line 124

    def <<(work)
      @mutex.synchronize do
        if @shutdown
          raise "Unable to add work while shutting down"
        end

        @todo << work

        if @waiting < @todo.size and @spawned < @max
          spawn_thread
        end

        @not_empty.signal
      end
    end
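The growth rule inside #<< can be read in isolation: a new thread is spawned only when fewer workers are idle than there are queued items and the pool is still below max. The helper below is a hypothetical restatement of that condition for illustration; spawn_needed? is not part of BetterCap.

```ruby
# Hypothetical helper restating the spawn condition used in #<<.
def spawn_needed?(waiting:, todo_size:, spawned:, max:)
  waiting < todo_size && spawned < max
end

spawn_needed?(waiting: 0, todo_size: 1, spawned: 2, max: 4) # => true  (no idle worker, room to grow)
spawn_needed?(waiting: 2, todo_size: 1, spawned: 2, max: 4) # => false (an idle worker can take the item)
spawn_needed?(waiting: 0, todo_size: 5, spawned: 4, max: 4) # => false (already at max)
```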
#backlog ⇒ Object
How many objects have yet to be processed by the pool?
    # File 'lib/bettercap/proxy/thread_pool.rb', line 55

    def backlog
      @mutex.synchronize { @todo.size }
    end
#reap ⇒ Object
If there are dead threads in the pool, remove them and decrease the spawned counter so that new healthy threads can be created.
    # File 'lib/bettercap/proxy/thread_pool.rb', line 163

    def reap
      @mutex.synchronize do
        dead_workers = @workers.reject(&:alive?)

        dead_workers.each do |worker|
          worker.kill
          @spawned -= 1
        end

        @workers -= dead_workers
      end
    end
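As a rough illustration of the bookkeeping in #reap, the sketch below applies the same reject(&:alive?) filter to a plain array of threads. reap_dead is a hypothetical helper: it returns new values instead of mutating pool state, and it omits the mutex that the real method holds.

```ruby
# Hypothetical helper: same filtering as #reap, without the pool or its mutex.
def reap_dead(workers, spawned)
  dead = workers.reject(&:alive?)
  dead.each(&:kill) # no-op on a thread that has already exited
  [workers - dead, spawned - dead.size]
end

finished = Thread.new { :done }.join # join returns the (now dead) thread
sleeper  = Thread.new { sleep }      # stays alive until killed

workers, spawned = reap_dead([finished, sleeper], 2)
# workers now holds only the sleeping thread, and spawned is 1

# Clean up the thread that was kept alive for the demonstration.
sleeper.kill
sleeper.join
```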
#shutdown(join_threads = true) ⇒ Object
Tell all threads in the pool to exit and, if join_threads is true, wait for them to finish.
    # File 'lib/bettercap/proxy/thread_pool.rb', line 178

    def shutdown( join_threads = true )
      threads = @mutex.synchronize do
        @shutdown = true
        @not_empty.broadcast
        @not_full.broadcast
        # dup workers so that we join them all safely
        @workers.dup
      end

      threads.each(&:join) if join_threads

      @spawned = 0
      @workers = []
    end
#trim(force = false) ⇒ Object
If too many threads are in the pool, tell one to finish and exit. If force is true, a trim is requested even if all threads are busy.
    # File 'lib/bettercap/proxy/thread_pool.rb', line 152

    def trim(force=false)
      @mutex.synchronize do
        if (force or @waiting > 0) and @spawned - @trim_requested > @min
          @trim_requested += 1
          @not_empty.signal
        end
      end
    end
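The guard in #trim combines two checks: either force is set or at least one worker is idle, and honoring the request (together with trims already pending) must still leave more than min threads. The helper below is a hypothetical restatement of that guard for illustration; trim_allowed? is not part of BetterCap.

```ruby
# Hypothetical helper restating the guard used in #trim.
def trim_allowed?(force:, waiting:, spawned:, trim_requested:, min:)
  (force || waiting > 0) && spawned - trim_requested > min
end

trim_allowed?(force: false, waiting: 0, spawned: 3, trim_requested: 0, min: 2) # => false (all busy, not forced)
trim_allowed?(force: true,  waiting: 0, spawned: 3, trim_requested: 0, min: 2) # => true  (forced, above min)
trim_allowed?(force: true,  waiting: 0, spawned: 3, trim_requested: 1, min: 2) # => false (pending trim already reaches min)
```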
#wait_until_not_full ⇒ Object
    # File 'lib/bettercap/proxy/thread_pool.rb', line 140

    def wait_until_not_full
      @mutex.synchronize do
        until @todo.size - @waiting < @max - @spawned or @shutdown
          @not_full.wait @mutex
        end
      end
    end
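No description is given for this method. Reading the code, it appears to block the caller until the pool can absorb more work: the backlog not covered by idle workers (@todo.size - @waiting) must be smaller than the number of threads that may still be spawned (@max - @spawned), or the pool must be shutting down. The hypothetical predicate below restates that exit condition as a sketch; pool_has_room? is not part of BetterCap.

```ruby
# Hypothetical helper restating the loop exit condition of #wait_until_not_full.
def pool_has_room?(todo_size:, waiting:, max:, spawned:, shutdown: false)
  todo_size - waiting < max - spawned || shutdown
end

pool_has_room?(todo_size: 0, waiting: 0, max: 4, spawned: 4)                 # => false (all threads spawned and busy)
pool_has_room?(todo_size: 0, waiting: 1, max: 4, spawned: 4)                 # => true  (one worker is idle)
pool_has_room?(todo_size: 1, waiting: 0, max: 4, spawned: 2)                 # => true  (two more threads may be spawned)
pool_has_room?(todo_size: 0, waiting: 0, max: 4, spawned: 4, shutdown: true) # => true  (shutdown releases the caller)
```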