Class: ThreadPool

Inherits:
Object
  • Object
show all
Defined in:
lib/threadpool.rb

Defined Under Namespace

Classes: Task

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(min, max = nil, &block) ⇒ ThreadPool

Returns a new instance of ThreadPool.



82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
# File 'lib/threadpool.rb', line 82

def initialize (min, max = nil, &block)
	@min   = min
	@max   = max || min
	@block = block

	@cond  = ConditionVariable.new
	@mutex = Mutex.new

	@todo     = []
	@workers  = []
	@timeouts = {}

	@spawned       = 0
	@waiting       = 0
	@shutdown      = false
	@trim_requests = 0
	@auto_trim     = false

	@mutex.synchronize {
		min.times {
			spawn_thread
		}
	}
end

Instance Attribute Details

#maxObject (readonly)

Returns the value of attribute max.



80
81
82
# File 'lib/threadpool.rb', line 80

def max
  @max
end

#minObject (readonly)

Returns the value of attribute min.



80
81
82
# File 'lib/threadpool.rb', line 80

def min
  @min
end

#spawnedObject (readonly)

Returns the value of attribute spawned.



80
81
82
# File 'lib/threadpool.rb', line 80

def spawned
  @spawned
end

Instance Method Details

#auto_trim!Object



110
# File 'lib/threadpool.rb', line 110

def auto_trim!;    @auto_trim = true;  end

#auto_trim?Boolean

Returns:

  • (Boolean)


109
# File 'lib/threadpool.rb', line 109

def auto_trim?;    @auto_trim;         end

#backlogObject



120
121
122
123
124
# File 'lib/threadpool.rb', line 120

def backlog
	@mutex.synchronize {
		@todo.length
	}
end

#joinObject



180
181
182
183
184
185
186
# File 'lib/threadpool.rb', line 180

def join
	@timeout.join if @timeout

	@workers.first.join until @workers.empty?

	self
end

#no_auto_trim!Object



111
# File 'lib/threadpool.rb', line 111

def no_auto_trim!; @auto_trim = false; end

#process(*args, &block) ⇒ Object Also known as: <<



126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
# File 'lib/threadpool.rb', line 126

def process (*args, &block)
	unless block || @block
		raise ArgumentError, 'you must pass a block'
	end

	task = Task.new(self, *args, &(block || @block))

	@mutex.synchronize {
		raise 'unable to add work while shutting down' if shutdown?

		@todo << task

		if @waiting == 0 && @spawned < @max
			spawn_thread
		end

		@cond.signal
	}

	task
end

#resize(min, max = nil) ⇒ Object



113
114
115
116
117
118
# File 'lib/threadpool.rb', line 113

def resize (min, max = nil)
	@min = min
	@max = max || min

	trim!
end

#shutdownObject



174
175
176
177
178
# File 'lib/threadpool.rb', line 174

def shutdown
	shutdown!

	join
end

#shutdown!Object



165
166
167
168
169
170
171
172
# File 'lib/threadpool.rb', line 165

def shutdown!
	@mutex.synchronize {
		@shutdown = true
		@cond.broadcast
	}

	wake_up_timeout
end

#shutdown?Boolean

Returns:

  • (Boolean)


107
# File 'lib/threadpool.rb', line 107

def shutdown?; @shutdown; end

#shutdown_after(timeout) ⇒ Object



200
201
202
203
204
205
206
207
208
# File 'lib/threadpool.rb', line 200

def shutdown_after (timeout)
	Thread.new {
		sleep timeout

		shutdown
	}

	self
end

#timeout_for(task, timeout) ⇒ Object



188
189
190
191
192
193
194
195
196
197
198
# File 'lib/threadpool.rb', line 188

def timeout_for (task, timeout)
	unless @timeout
		spawn_timeout_thread
	end

	@mutex.synchronize {
		@timeouts[task] = timeout

		wake_up_timeout
	}
end

#trim(force = false) ⇒ Object



150
151
152
153
154
155
156
157
158
159
# File 'lib/threadpool.rb', line 150

def trim (force = false)
	@mutex.synchronize {
		if (force || @waiting > 0) && @spawned - @trim_requests > @min
			@trim_requests -= 1
			@cond.signal
		end
	}

	self
end

#trim!Object



161
162
163
# File 'lib/threadpool.rb', line 161

def trim!
	trim true
end

#wake_up_timeoutObject



210
211
212
213
214
# File 'lib/threadpool.rb', line 210

def wake_up_timeout
	if @pipes
		@pipes.last.write_nonblock 'x' rescue nil
	end
end