Class: Thread::Pool

Inherits:
Object
  • Object
show all
Defined in:
lib/thread/pool.rb,
lib/thread/future.rb

Overview

A pool is a container for a limited number of threads, to which you can add tasks to run.

This is usually more performant and less memory intensive than creating a new thread for every task.

Defined Under Namespace

Classes: Task

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(min, max = nil, &block) ⇒ Pool

Create the pool with minimum and maximum threads.

The pool starts with the minimum number of threads already created and spawns new threads as needed until the maximum is reached.

A default block can be passed, which will be used to #process the passed data.



112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
# File 'lib/thread/pool.rb', line 112

# Set up the pool state and spawn the initial +min+ threads.
#
# @param min [Integer] number of threads spawned immediately
# @param max [Integer, nil] upper thread limit; +min+ is used when omitted
# @param block [Proc] optional default block, stored for tasks that are
#   added without a block of their own
def initialize(min, max = nil, &block)
	@min, @max = min, (max || min)
	@block     = block

	# Lock/condition pair guarding the task queue; signalled when work is added.
	@mutex = Mutex.new
	@cond  = ConditionVariable.new

	# Separate lock/condition pair used by the waiting helpers (#idle, #wait_done).
	@done_mutex = Mutex.new
	@done       = ConditionVariable.new

	@todo     = []
	@workers  = []
	@timeouts = {}

	@spawned       = 0
	@waiting       = 0
	@trim_requests = 0
	@shutdown      = false
	@auto_trim     = false
	@idle_trim     = nil

	# The initial threads are spawned while holding the queue lock.
	@mutex.synchronize do
		min.times { spawn_thread }
	end
end

Instance Attribute Details

#maxObject (readonly)

Returns the value of attribute max.



103
104
105
# File 'lib/thread/pool.rb', line 103

# Reader for @max, the upper limit on the number of threads.
def max; @max; end

#minObject (readonly)

Returns the value of attribute min.



103
104
105
# File 'lib/thread/pool.rb', line 103

# Reader for @min, the lower limit on the number of threads.
def min; @min; end

#spawnedObject (readonly)

Returns the value of attribute spawned.



103
104
105
# File 'lib/thread/pool.rb', line 103

# Reader for @spawned, the counter of spawned threads.
def spawned; @spawned; end

#waitingObject (readonly)

Returns the value of attribute waiting.



103
104
105
# File 'lib/thread/pool.rb', line 103

# Reader for @waiting, the counter of threads waiting for work.
def waiting; @waiting; end

Instance Method Details

#auto_trim!Object

Enable auto trimming: unneeded threads will be deleted until the minimum is reached.



151
152
153
# File 'lib/thread/pool.rb', line 151

# Switch auto trimming on by setting the @auto_trim flag.
#
# @return [true]
def auto_trim!; @auto_trim = true; end

#auto_trim?Boolean

Check if auto trimming is enabled.

Returns:

  • (Boolean)


145
146
147
# File 'lib/thread/pool.rb', line 145

# Report the current value of the @auto_trim flag.
def auto_trim?; @auto_trim; end

#backlogObject

Get the number of tasks that still have to be run.



185
186
187
188
189
# File 'lib/thread/pool.rb', line 185

# Count the tasks currently queued in @todo.
# The queue is read while holding @mutex.
#
# @return [Integer]
def backlog
	@mutex.synchronize do
		@todo.size
	end
end

#done?Boolean

Are all tasks consumed?

Returns:

  • (Boolean)


192
193
194
# File 'lib/thread/pool.rb', line 192

# True when the queue is empty and every spawned thread is waiting.
#
# NOTE: the state is read without taking @mutex, so the answer is only a
# snapshot.
#
# @return [Boolean]
def done?
	# `&&` rather than `and`: this is boolean logic, not control flow.
	@todo.empty? && @waiting == @spawned
end

#future(&block) ⇒ Object



161
162
163
# File 'lib/thread/future.rb', line 161

# Delegate to Thread.future, handing it this pool and the given block.
#
# @return whatever Thread.future returns
def future(&block)
	Thread.future(self, &block)
end

#idle(&block) ⇒ Object

Wait until there is an idle worker, then process the given block. If no block is given, it simply returns once a worker is idle.



210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
# File 'lib/thread/pool.rb', line 210

# Block the caller until #idle? reports a free worker, then hand the
# block (if any) to #process.
#
# @return [nil] when no block is given, otherwise the result of #process
def idle(&block)
        until idle?
                @done_mutex.synchronize do
                        # Re-check under the lock before sleeping on @done.
                        @done.wait(@done_mutex) unless idle?
                end
        end

        return unless block

        process(&block)
end

#idle?Boolean

Check if there are idle workers.

Returns:

  • (Boolean)


205
206
207
# File 'lib/thread/pool.rb', line 205

# True when more threads are waiting than there are queued tasks.
#
# @return [Boolean]
def idle?
       @waiting > @todo.size
end

#idle_trim!(timeout) ⇒ Object

Enable idle trimming. Unneeded threads will be deleted after the given number of seconds of inactivity. The minimum number of threads is respected.



167
168
169
# File 'lib/thread/pool.rb', line 167

# Store +timeout+ in @idle_trim, which is what #idle_trim? checks.
#
# @param timeout the idle timeout to remember
# @return the timeout that was passed in
def idle_trim!(timeout); @idle_trim = timeout; end

#idle_trim?Boolean

Check if idle trimming is enabled.

Returns:

  • (Boolean)


161
162
163
# File 'lib/thread/pool.rb', line 161

# True whenever @idle_trim holds anything other than nil.
#
# @return [Boolean]
def idle_trim?; !@idle_trim.nil?; end

#joinObject

Join on all threads in the pool.



306
307
308
309
310
311
312
313
314
# File 'lib/thread/pool.rb', line 306

# Keep joining the first thread in @workers until the list is empty.
#
# NOTE(review): this relies on @workers shrinking elsewhere, presumably
# each worker removes itself when it exits; confirm against the worker code.
#
# @return [self]
def join
	until @workers.empty?
		# The list may have emptied since the check above, so guard against nil.
		worker = @workers.first
		worker.join if worker
	end

	self
end

#no_auto_trim!Object

Disable auto trimming.



156
157
158
# File 'lib/thread/pool.rb', line 156

# Switch auto trimming off by clearing the @auto_trim flag.
#
# @return [false]
def no_auto_trim!; @auto_trim = false; end

#no_idle_trim!Object

Turn off idle trimming.



172
173
174
# File 'lib/thread/pool.rb', line 172

# Reset @idle_trim to nil, so #idle_trim? reports false again.
#
# @return [nil]
def no_idle_trim!; @idle_trim = nil; end

#process(*args, &block) ⇒ Object Also known as: <<

Add a task to the pool which will execute the block with the given argument.

If no block is passed the default block will be used if present, an ArgumentError will be raised otherwise.



231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
# File 'lib/thread/pool.rb', line 231

# Queue a new task built from +args+ and either the given block or the
# pool's default block.
#
# @param args arguments forwarded to Task.new after the pool itself
# @param block [Proc] work to run; falls back to the default @block
# @return [Task] the task that was queued
# @raise [ArgumentError] when neither a block nor a default block exists
# @raise [RuntimeError] when the pool is shutting down
def process(*args, &block)
	handler = block || @block
	raise ArgumentError, 'you must pass a block' unless handler

	task = Task.new(self, *args, &handler)

	@mutex.synchronize do
		raise 'unable to add work while shutting down' if shutdown?

		@todo << task

		# Nobody is waiting for work: grow the pool if the limit allows it.
		spawn_thread if @waiting == 0 && @spawned < @max

		@cond.signal
	end

	task
end

#resize(min, max = nil) ⇒ Object

Resize the pool with the passed arguments.



177
178
179
180
181
182
# File 'lib/thread/pool.rb', line 177

# Replace the pool limits and then force a trim.
#
# @param min [Integer] new minimum
# @param max [Integer, nil] new maximum; +min+ is used when omitted
# @return the result of #trim!
def resize(min, max = nil)
	@min, @max = min, (max || min)

	trim!
end

#shutdownObject

Shut down the pool. This call blocks until all tasks have finished running.



286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
# File 'lib/thread/pool.rb', line 286

# Mark the pool as shutting down (:nicely), wake every thread waiting on
# @cond, and join the workers. If a timeout thread exists, it is then
# told to stop (:now), woken up and joined too.
#
# @return [self]
def shutdown
	@mutex.synchronize do
		@shutdown = :nicely
		@cond.broadcast
	end

	join

	return self unless @timeout

	# Only reached when a timeout thread was spawned.
	@shutdown = :now
	wake_up_timeout
	@timeout.join

	self
end

#shutdown!Object

Shut down the pool instantly without finishing to execute tasks.



274
275
276
277
278
279
280
281
282
283
# File 'lib/thread/pool.rb', line 274

# Flag the pool as shut down :now, wake every thread waiting on @cond and
# wake the timeout handling. Unlike #shutdown it does not join anything.
#
# @return [self]
def shutdown!
	@mutex.synchronize do
		@shutdown = :now
		@cond.broadcast
	end

	wake_up_timeout
	self
end

#shutdown?Boolean

Check if the pool has been shut down.

Returns:

  • (Boolean)


142
# File 'lib/thread/pool.rb', line 142

# Coerce @shutdown (false, :nicely or :now) into a strict boolean.
#
# @return [Boolean]
def shutdown?
	@shutdown ? true : false
end

#shutdown_after(timeout) ⇒ Object

Shutdown the pool after a given amount of time.



330
331
332
333
334
335
336
337
338
# File 'lib/thread/pool.rb', line 330

# Start a background thread that sleeps for +timeout+ seconds and then
# calls #shutdown. The caller is not blocked.
#
# @param timeout [Numeric] seconds passed to sleep
# @return [self]
def shutdown_after(timeout)
	Thread.new do
		sleep(timeout)
		shutdown
	end

	self
end

#timeout_for(task, timeout) ⇒ Object

Define a timeout for a task.



317
318
319
320
321
322
323
324
325
326
327
# File 'lib/thread/pool.rb', line 317

# Register +timeout+ for +task+ in @timeouts and wake the timeout
# handling. The timeout thread is spawned first if @timeout is unset.
#
# @param task the task to watch (used as the hash key)
# @param timeout the timeout to associate with it
# @return the result of wake_up_timeout
def timeout_for(task, timeout)
	spawn_timeout_thread unless @timeout

	@mutex.synchronize do
		@timeouts[task] = timeout
		wake_up_timeout
	end
end

#trim(force = false) ⇒ Object

Trim unused threads. When forced, a thread is trimmed even if no worker is currently waiting for work. The minimum number of threads is always kept.



257
258
259
260
261
262
263
264
265
266
# File 'lib/thread/pool.rb', line 257

# Request that one thread be trimmed.
#
# A request is recorded (and @cond signalled) only when trimming is forced
# or some thread is waiting, and only while the spawned count minus the
# pending trim requests still exceeds @min.
#
# @param force [Boolean] trim even when no thread is waiting
# @return [self]
def trim(force = false)
	@mutex.synchronize do
		next unless force || @waiting > 0
		next unless @spawned - @trim_requests > @min

		@trim_requests += 1
		@cond.signal
	end

	self
end

#trim!Object

Force #trim.



269
270
271
# File 'lib/thread/pool.rb', line 269

# Shorthand for a forced #trim.
#
# @return the result of #trim
def trim!
	trim(true)
end

#wait_doneObject

Wait until all tasks are consumed. The caller will be blocked until then.



197
198
199
200
201
202
# File 'lib/thread/pool.rb', line 197

# Block the caller until #done? is true.
#
# The predicate is re-checked after every wakeup. @done is also the
# condition variable #idle waits on, and condition variables may wake up
# spuriously, so a single wait is not enough to know that all tasks have
# been consumed.
#
# @return [nil]
def wait_done
	@done_mutex.synchronize {
		# Check and wait under the same lock, so a signal sent while holding
		# @done_mutex cannot slip in between the check and the wait.
		@done.wait @done_mutex until done?
	}

	nil
end