Class: ThreadPool
- Inherits:
-
Object
- Object
- ThreadPool
- Defined in:
- lib/threadpool.rb
Defined Under Namespace
Classes: Task
Instance Attribute Summary collapse
-
#max ⇒ Object
readonly
Returns the value of attribute max.
-
#min ⇒ Object
readonly
Returns the value of attribute min.
-
#spawned ⇒ Object
readonly
Returns the value of attribute spawned.
Instance Method Summary collapse
- #auto_trim! ⇒ Object
- #auto_trim? ⇒ Boolean
- #backlog ⇒ Object
-
#initialize(min, max = nil, &block) ⇒ ThreadPool
constructor
A new instance of ThreadPool.
- #join ⇒ Object
- #no_auto_trim! ⇒ Object
- #process(*args, &block) ⇒ Object (also: #<<)
- #resize(min, max = nil) ⇒ Object
- #shutdown ⇒ Object
- #shutdown! ⇒ Object
- #shutdown? ⇒ Boolean
- #shutdown_after(timeout) ⇒ Object
- #timeout_for(task, timeout) ⇒ Object
- #trim(force = false) ⇒ Object
- #trim! ⇒ Object
- #wake_up_timeout ⇒ Object
Constructor Details
#initialize(min, max = nil, &block) ⇒ ThreadPool
Returns a new instance of ThreadPool.
82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 |
# File 'lib/threadpool.rb', line 82 def initialize (min, max = nil, &block) @min = min @max = max || min @block = block @cond = ConditionVariable.new @mutex = Mutex.new @todo = [] @workers = [] @timeouts = {} @spawned = 0 @waiting = 0 @shutdown = false @trim_requests = 0 @auto_trim = false @mutex.synchronize { min.times { spawn_thread } } end |
Instance Attribute Details
#max ⇒ Object (readonly)
Returns the value of attribute max.
80 81 82 |
# File 'lib/threadpool.rb', line 80 def max @max end |
#min ⇒ Object (readonly)
Returns the value of attribute min.
80 81 82 |
# File 'lib/threadpool.rb', line 80 def min @min end |
#spawned ⇒ Object (readonly)
Returns the value of attribute spawned.
80 81 82 |
# File 'lib/threadpool.rb', line 80 def spawned @spawned end |
Instance Method Details
#auto_trim! ⇒ Object
110 |
# File 'lib/threadpool.rb', line 110 def auto_trim!; @auto_trim = true; end |
#auto_trim? ⇒ Boolean
109 |
# File 'lib/threadpool.rb', line 109 def auto_trim?; @auto_trim; end |
#backlog ⇒ Object
120 121 122 123 124 |
# File 'lib/threadpool.rb', line 120 def backlog @mutex.synchronize { @todo.length } end |
#join ⇒ Object
180 181 182 183 184 185 186 |
# File 'lib/threadpool.rb', line 180 def join @timeout.join if @timeout @workers.first.join until @workers.empty? self end |
#no_auto_trim! ⇒ Object
111 |
# File 'lib/threadpool.rb', line 111 def no_auto_trim!; @auto_trim = false; end |
#process(*args, &block) ⇒ Object Also known as: <<
126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 |
# File 'lib/threadpool.rb', line 126 def process (*args, &block) unless block || @block raise ArgumentError, 'you must pass a block' end task = Task.new(self, *args, &(block || @block)) @mutex.synchronize { raise 'unable to add work while shutting down' if shutdown? @todo << task if @waiting == 0 && @spawned < @max spawn_thread end @cond.signal } task end |
#resize(min, max = nil) ⇒ Object
113 114 115 116 117 118 |
# File 'lib/threadpool.rb', line 113 def resize (min, max = nil) @min = min @max = max || min trim! end |
#shutdown ⇒ Object
174 175 176 177 178 |
# File 'lib/threadpool.rb', line 174 def shutdown shutdown! join end |
#shutdown! ⇒ Object
165 166 167 168 169 170 171 172 |
# File 'lib/threadpool.rb', line 165 def shutdown! @mutex.synchronize { @shutdown = true @cond.broadcast } wake_up_timeout end |
#shutdown? ⇒ Boolean
107 |
# File 'lib/threadpool.rb', line 107 def shutdown?; @shutdown; end |
#shutdown_after(timeout) ⇒ Object
200 201 202 203 204 205 206 207 208 |
# File 'lib/threadpool.rb', line 200 def shutdown_after (timeout) Thread.new { sleep timeout shutdown } self end |
#timeout_for(task, timeout) ⇒ Object
188 189 190 191 192 193 194 195 196 197 198 |
# File 'lib/threadpool.rb', line 188 def timeout_for (task, timeout) unless @timeout spawn_timeout_thread end @mutex.synchronize { @timeouts[task] = timeout wake_up_timeout } end |
#trim(force = false) ⇒ Object
150 151 152 153 154 155 156 157 158 159 |
# File 'lib/threadpool.rb', line 150 def trim (force = false) @mutex.synchronize { if (force || @waiting > 0) && @spawned - @trim_requests > @min @trim_requests -= 1 @cond.signal end } self end |
#trim! ⇒ Object
161 162 163 |
# File 'lib/threadpool.rb', line 161 def trim! trim true end |
#wake_up_timeout ⇒ Object
210 211 212 213 214 |
# File 'lib/threadpool.rb', line 210 def wake_up_timeout if @pipes @pipes.last.write_nonblock 'x' rescue nil end end |