Class: Thread::Pool

Inherits:
Object
  • Object
show all
Defined in:
lib/thread/pool.rb,
lib/thread/future.rb

Overview

A pool is a container of a limited amount of threads to which you can add tasks to run.

This is usually more performant and less memory intensive than creating a new thread for every task.

Defined Under Namespace

Classes: Task

Class Attribute Summary collapse

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(min, max = nil, &block) ⇒ Pool

Create the pool with minimum and maximum threads.

The pool will start with the minimum amount of threads created and will spawn new threads until the max is reached in case of need.

A default block can be passed, which will be used to #process the passed data.



124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
# File 'lib/thread/pool.rb', line 124

# Build a pool that keeps between +min+ and +max+ threads.
#
# min   - number of threads spawned right away.
# max   - upper bound on threads; falls back to +min+ when omitted.
# block - optional default block, stored in @block for tasks added
#         without a block of their own.
def initialize(min, max = nil, &block)
	@min, @max = min, (max || min)
	@block     = block

	# Counters and flags all start in their "nothing happening" state;
	# both trimming modes start disabled.
	@spawned       = 0
	@waiting       = 0
	@trim_requests = 0
	@shutdown      = false
	@auto_trim     = false
	@idle_trim     = nil

	# Task queue, worker list and timeout table start out empty.
	@todo     = []
	@workers  = []
	@timeouts = {}

	# One mutex/condition pair guards the queue, a second pair is used by #wait.
	@mutex = Mutex.new
	@cond  = ConditionVariable.new

	@done_mutex = Mutex.new
	@done       = ConditionVariable.new

	# Spawn the initial workers while holding @mutex.
	@mutex.synchronize { min.times { spawn_thread } }
end

Class Attribute Details

.abort_on_exceptionObject

If true, tasks will allow raised exceptions to pass through.

Similar to Thread.abort_on_exception



346
347
348
# File 'lib/thread/pool.rb', line 346

# Class-level reader for @abort_on_exception. Per the attribute description,
# when true, tasks allow raised exceptions to pass through.
def abort_on_exception
  @abort_on_exception
end

Instance Attribute Details

#maxObject (readonly)

Returns the value of attribute max.



115
116
117
# File 'lib/thread/pool.rb', line 115

# Returns @max, the maximum number of threads for the pool
# (set by #initialize and #resize; defaults to the minimum when not given).
def max
  @max
end

#minObject (readonly)

Returns the value of attribute min.



115
116
117
# File 'lib/thread/pool.rb', line 115

# Returns @min, the minimum number of threads for the pool
# (set by #initialize and #resize; #trim never requests going below it).
def min
  @min
end

#spawnedObject (readonly)

Returns the value of attribute spawned.



115
116
117
# File 'lib/thread/pool.rb', line 115

# Returns @spawned, which starts at 0 and is compared against @max before
# spawning another thread.
# NOTE(review): presumably the count of live worker threads, maintained by
# spawn_thread — confirm against that method.
def spawned
  @spawned
end

#waitingObject (readonly)

Returns the value of attribute waiting.



115
116
117
# File 'lib/thread/pool.rb', line 115

# Returns @waiting, which starts at 0; #process only spawns a new thread
# when this is 0.
# NOTE(review): presumably the number of workers currently blocked waiting
# for work — confirm against the worker loop.
def waiting
  @waiting
end

Instance Method Details

#auto_trim!Object

Enable auto trimming, unneeded threads will be deleted until the minimum is reached.



165
166
167
168
169
# File 'lib/thread/pool.rb', line 165

# Switch auto trimming on by setting @auto_trim to true.
#
# Returns the pool itself so calls can be chained.
def auto_trim!
	tap { @auto_trim = true }
end

#auto_trim?Boolean

Check if auto trimming is enabled.

Returns:

  • (Boolean)


159
160
161
# File 'lib/thread/pool.rb', line 159

# Returns the current value of @auto_trim as-is (false after #initialize,
# true after #auto_trim!, false again after #no_auto_trim!).
def auto_trim?
	@auto_trim
end

#backlogObject

Get the amount of tasks that still have to be run.



207
208
209
210
211
# File 'lib/thread/pool.rb', line 207

# Number of tasks currently sitting in the @todo queue, read while
# holding @mutex.
def backlog
	@mutex.synchronize { @todo.size }
end

#done?Boolean

Are all tasks consumed?

Returns:

  • (Boolean)


214
215
216
217
218
# File 'lib/thread/pool.rb', line 214

# Thread-safe wrapper: evaluates the internal _done? check while
# holding @mutex and returns its result.
def done?
	@mutex.synchronize { _done? }
end

#future(&block) ⇒ Object



164
165
166
# File 'lib/thread/future.rb', line 164

# Delegate to Thread.future, handing it this pool and the given block,
# and return whatever Thread.future returns.
def future(&block)
	Thread.future(self, &block)
end

#idle?Boolean

Check if there are idle workers.

Returns:

  • (Boolean)


246
247
248
249
250
# File 'lib/thread/pool.rb', line 246

# Thread-safe wrapper: evaluates the internal _idle? check while
# holding @mutex and returns its result.
def idle?
	@mutex.synchronize { _idle? }
end

#idle_trim!(timeout) ⇒ Object

Enable idle trimming. Unneeded threads will be deleted after the given number of seconds of inactivity. The minimum number of threads is respected.



185
186
187
188
189
# File 'lib/thread/pool.rb', line 185

# Switch idle trimming on by storing +timeout+ in @idle_trim.
#
# Returns the pool itself so calls can be chained.
def idle_trim!(timeout)
	tap { @idle_trim = timeout }
end

#idle_trim?Boolean

Check if idle trimming is enabled.

Returns:

  • (Boolean)


179
180
181
# File 'lib/thread/pool.rb', line 179

# True whenever @idle_trim holds anything other than nil.
def idle_trim?
	return false if @idle_trim.nil?

	true
end

#no_auto_trim!Object

Disable auto trimming.



172
173
174
175
176
# File 'lib/thread/pool.rb', line 172

# Switch auto trimming off by setting @auto_trim to false.
#
# Returns the pool itself so calls can be chained.
def no_auto_trim!
	tap { @auto_trim = false }
end

#no_idle_trim!Object

Turn off idle trimming.



192
193
194
195
196
# File 'lib/thread/pool.rb', line 192

# Switch idle trimming off by resetting @idle_trim to nil.
#
# Returns the pool itself so calls can be chained.
def no_idle_trim!
	tap { @idle_trim = nil }
end

#process(*args, &block) ⇒ Object Also known as: <<

Add a task to the pool which will execute the block with the given argument.

If no block is passed the default block will be used if present, an ArgumentError will be raised otherwise.



257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
# File 'lib/thread/pool.rb', line 257

# Queue a new task for the pool.
#
# args  - arguments handed to Task.new after the pool itself.
# block - code for the task; the default block from #initialize is used
#         when none is given.
#
# Returns the newly created Task.
# Raises ArgumentError when neither a block nor a default block exists.
# Raises RuntimeError when the pool is shutting down.
def process(*args, &block)
	handler = block || @block
	raise ArgumentError, 'you must pass a block' unless handler

	task = Task.new(self, *args, &handler)

	@mutex.synchronize do
		raise 'unable to add work while shutting down' if shutdown?

		@todo << task

		# Grow the pool only when no worker is waiting and there is room left.
		spawn_thread if @waiting == 0 && @spawned < @max

		@cond.signal
	end

	task
end

#resize(min, max = nil) ⇒ Object

Resize the pool with the passed arguments.



199
200
201
202
203
204
# File 'lib/thread/pool.rb', line 199

# Change the pool limits, then force a trim.
#
# min - new minimum thread count.
# max - new maximum thread count; falls back to +min+ when omitted.
#
# Returns whatever #trim! returns.
def resize(min, max = nil)
	@min, @max = min, (max || min)

	trim!
end

#shutdownObject

Shut down the pool, it will block until all tasks have finished running.



312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
# File 'lib/thread/pool.rb', line 312

# Shut the pool down gracefully, blocking the caller until every thread in
# @workers has been joined.
#
# Unlike #shutdown!, this method does not end with an explicit `self`; its
# return value is whatever the trailing `if` evaluates to.
def shutdown
	# Flag the graceful shutdown and wake every thread waiting on @cond,
	# both while holding @mutex.
	@mutex.synchronize {
		@shutdown = :nicely
		@cond.broadcast
	}

	# Keep joining the first worker until the list is empty.
	# NOTE(review): nothing in this method removes entries from @workers, so
	# the loop relies on them being removed elsewhere (presumably by the
	# workers themselves as they exit) — confirm.
	until @workers.empty?
		# Array#first yields nil on an empty array; the guard presumably covers
		# the list emptying between the check above and this read.
		if worker = @workers.first
			worker.join
		end
	end

	# NOTE(review): @timeout is not assigned in #initialize; presumably it is a
	# helper thread created elsewhere for task timeouts — confirm. When set,
	# the state is escalated to :now, the thread is woken and then joined.
	if @timeout
		@shutdown = :now

		wake_up_timeout

		@timeout.join
	end
end

#shutdown!Object

Shut down the pool instantly without finishing to execute tasks.



300
301
302
303
304
305
306
307
308
309
# File 'lib/thread/pool.rb', line 300

# Request an immediate shutdown.
#
# Marks the pool as :now and wakes every thread waiting on @cond (both
# under @mutex), then calls wake_up_timeout. This method itself does not
# join any thread.
#
# Returns the pool itself.
def shutdown!
	tap do
		@mutex.synchronize do
			@shutdown = :now
			@cond.broadcast
		end

		wake_up_timeout
	end
end

#shutdown?Boolean

Check if the pool has been shut down.

Returns:

  • (Boolean)


154
155
156
# File 'lib/thread/pool.rb', line 154

# Strict boolean view of @shutdown: true for any truthy value (such as
# :nicely or :now), false for nil or false.
def shutdown?
	@shutdown ? true : false
end

#shutdown_after(timeout) ⇒ Object

Shutdown the pool after a given amount of time.



334
335
336
337
338
339
340
# File 'lib/thread/pool.rb', line 334

# Schedule a graceful shutdown.
#
# Starts a new thread that sleeps for +timeout+ seconds and then calls
# #shutdown. Does not block the caller.
#
# Returns the Thread performing the delayed shutdown.
def shutdown_after(timeout)
	Thread.new(timeout) do |delay|
		sleep delay

		shutdown
	end
end

#trim(force = false) ⇒ Object

Trim the unused threads, if forced threads will be trimmed even if there are tasks waiting.



283
284
285
286
287
288
289
290
291
292
# File 'lib/thread/pool.rb', line 283

# Ask the pool to drop one thread.
#
# A trim request is registered (and one waiter on @cond is signalled) only
# when +force+ is set or @waiting is positive, and only while the spawned
# count minus the pending trim requests stays above @min.
#
# Returns the pool itself.
def trim(force = false)
	@mutex.synchronize do
		next unless force || @waiting > 0
		next unless @spawned - @trim_requests > @min

		@trim_requests += 1
		@cond.signal
	end

	self
end

#trim!Object

Force #trim.



295
296
297
# File 'lib/thread/pool.rb', line 295

# Forced variant of #trim: calls it with force set to true and returns
# its result.
def trim!
	trim(true)
end

#wait(what = :idle) ⇒ Object

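Wait until the pool reaches the requested state: with :idle (the default) until there are idle workers, with :done until all tasks are consumed. The caller will be blocked until then.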



221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
# File 'lib/thread/pool.rb', line 221

# Block the caller until the pool reaches the requested state.
#
# what - :done to wait until #done? is true, :idle (the default) to wait
#        until #idle? is true. Any other value matches no branch, so the
#        method returns immediately.
#
# Returns the pool itself.
def wait(what = :idle)
	case what
	when :done
		# The outer check goes through #done? (which takes @mutex); the inner
		# re-check happens while holding @done_mutex before sleeping on @done.
		until done?
			@done_mutex.synchronize {
				# `break` leaves the synchronize block, not the `until` loop;
				# the loop condition is then evaluated again.
				break if _done?

				# NOTE(review): relies on @done being signalled elsewhere
				# (presumably by workers when they finish a task) — confirm.
				@done.wait @done_mutex
			}
		end

	when :idle
		# Same pattern as above, using the idle checks instead.
		until idle?
			@done_mutex.synchronize {
				# `break` leaves the synchronize block, not the `until` loop.
				break if _idle?

				@done.wait @done_mutex
			}
		end
	end

	self
end