Class: BetterCap::Proxy::ThreadPool

Inherits:
Object
  • Object
show all
Defined in:
lib/bettercap/proxy/thread_pool.rb

Overview

Thread pool class used by the BetterCap::Proxy::* classes.

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(min, max, *extra, &block) ⇒ ThreadPool

Maintain a minimum of min and maximum of max threads in the pool.

The block passed is the work that will be performed in each thread.



24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
# File 'lib/bettercap/proxy/thread_pool.rb', line 24

# Set up the pool state and start the minimum number of worker threads.
#
# @param min [Object] minimum thread count; converted with Integer()
# @param max [Object] maximum thread count; converted with Integer()
# @param extra [Array] additional arguments, stored as-is in @extra
#   (presumably handed to the block by spawn_thread - confirm there)
# @param block [Proc] the work performed in each thread
# @raise [ArgumentError, TypeError] if min or max cannot be converted by Integer()
def initialize(min, max, *extra, &block)
  # Synchronization primitives shared by the methods of the pool.
  @mutex = Mutex.new
  @not_empty = ConditionVariable.new
  @not_full = ConditionVariable.new

  # Pool bounds (min is converted first, then max).
  @min = Integer(min)
  @max = Integer(max)

  # The work to perform and the extra arguments captured alongside it.
  @block = block
  @extra = extra

  # Bookkeeping: queued work, worker threads and their counters.
  @todo = []
  @workers = []
  @spawned = 0
  @waiting = 0
  @trim_requested = 0
  @shutdown = false

  # Start the minimum number of workers while holding the lock.
  @mutex.synchronize { @min.times { spawn_thread } }
end

Instance Attribute Details

#spawned ⇒ Object (readonly)

Number of spawned threads in the pool.



51
52
53
# File 'lib/bettercap/proxy/thread_pool.rb', line 51

# Number of spawned threads in the pool.
#
# Returns the current value of the @spawned counter. Unlike #backlog, the
# read is not wrapped in @mutex.
#
# @return [Integer] the spawned-thread counter
def spawned
  @spawned
end

Instance Method Details

#<<(work) ⇒ Object

Add work to the todo list for a Thread to pickup and process.



124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
# File 'lib/bettercap/proxy/thread_pool.rb', line 124

# Queue +work+ so that a pool thread can pick it up and process it.
#
# @param work [Object] the item appended to the todo list
# @raise [RuntimeError] if the pool has been flagged as shutting down
def <<(work)
  @mutex.synchronize do
    raise "Unable to add work while shutting down" if @shutdown

    @todo.push(work)

    # Grow the pool when queued items outnumber the waiting threads and the
    # maximum thread count has not been reached yet.
    needs_thread = @todo.size > @waiting && @spawned < @max
    spawn_thread if needs_thread

    # Wake one thread waiting for work.
    @not_empty.signal
  end
end

#backlog ⇒ Object

How many objects have yet to be processed by the pool?



55
56
57
# File 'lib/bettercap/proxy/thread_pool.rb', line 55

# Report how many queued work items have not been taken off the todo list.
#
# The size is read while holding @mutex.
#
# @return [Integer] number of items currently in @todo
def backlog
  @mutex.synchronize do
    @todo.length
  end
end

#reap ⇒ Object

If there are dead threads in the pool, remove them and decrease the spawned counter so that new healthy threads can be created again.



163
164
165
166
167
168
169
170
171
172
173
174
# File 'lib/bettercap/proxy/thread_pool.rb', line 163

# Drop dead threads from the pool and lower the spawned counter by one for
# each of them, so that new threads can be created in their place.
#
# @return [Array<Thread>] the workers that are still alive
def reap
  @mutex.synchronize do
    living, dead = @workers.partition(&:alive?)

    dead.each do |thread|
      thread.kill
      @spawned -= 1
    end

    # Keep only the threads that are still running.
    @workers = living
  end
end

#shutdown(join_threads = true) ⇒ Object

Tell all threads in the pool to exit and wait for them to finish.



178
179
180
181
182
183
184
185
186
187
188
189
190
191
# File 'lib/bettercap/proxy/thread_pool.rb', line 178

# Tell all threads in the pool to exit and, optionally, wait for them to
# finish.
#
# @param join_threads [Boolean] when true (the default), join every worker
#   before resetting the pool state
# @return [Array] the new, empty worker list
def shutdown( join_threads = true )
  threads = @mutex.synchronize do
    @shutdown = true
    # Wake every thread blocked on either condition so that it can observe
    # the shutdown flag.
    @not_empty.broadcast
    @not_full.broadcast
    # Join a snapshot so that changes to @workers cannot affect the
    # iteration below.
    @workers.dup
  end

  # Joining happens outside the lock: a thread blocked in
  # ConditionVariable#wait must reacquire @mutex before it can return.
  threads.each(&:join) if join_threads

  # Reset the bookkeeping under the same lock that guards it everywhere
  # else, so that a concurrent #reap or #trim never sees a half-updated pool.
  @mutex.synchronize do
    @spawned = 0
    @workers = []
  end
end

#trim(force = false) ⇒ Object

If too many threads are in the pool, tell one to finish, go ahead and exit. If force is true, then a trim request is made even if all threads are being utilized.



152
153
154
155
156
157
158
159
# File 'lib/bettercap/proxy/thread_pool.rb', line 152

# Ask one thread to exit when the pool holds more threads than the minimum.
#
# A request is recorded only when a thread is waiting (or +force+ is set)
# and the spawned count, minus the requests already pending, is still above
# @min.
#
# @param force [Boolean] request a trim even if no thread is waiting
# @return [Object, nil] result of signaling @not_empty, or nil when no trim
#   was requested
def trim(force=false)
  @mutex.synchronize do
    wanted = force || @waiting > 0
    next unless wanted && @spawned - @trim_requested > @min

    @trim_requested += 1
    # Wake one waiting thread so that it can act on the request.
    @not_empty.signal
  end
end

#wait_until_not_full ⇒ Object



140
141
142
143
144
145
146
# File 'lib/bettercap/proxy/thread_pool.rb', line 140

# Block the calling thread until the pool has spare capacity or has been
# flagged as shut down.
#
# Capacity means that the queued items not covered by a waiting thread
# (@todo.size - @waiting) number fewer than the threads that may still be
# spawned (@max - @spawned).
#
# @return [nil]
def wait_until_not_full
  @mutex.synchronize do
    # Keep waiting on @not_full while the pool is full and still running.
    while @todo.size - @waiting >= @max - @spawned && !@shutdown
      @not_full.wait(@mutex)
    end
  end
end