Class: Salus::ThreadPool

Inherits:
Object
  • Object
show all
Defined in:
lib/salus/thread/pool.rb,
lib/salus/thread/future.rb

Overview

A pool is a container of a limited amount of threads to which you can add tasks to run.

This is usually more performant and less memory intensive than creating a new thread for every task.

Defined Under Namespace

Classes: Task

Class Attribute Summary collapse

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(min, max = nil, &block) ⇒ ThreadPool

Create the pool with minimum and maximum threads.

The pool will start with the minimum amount of threads created and will spawn new threads until the max is reached in case of need.

A default block can be passed, which will be used to #process the passed data.



133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
# File 'lib/salus/thread/pool.rb', line 133

def initialize(min, max = nil, &block)
  @min   = min
  @max   = max || min
  @block = block

  @cond  = ConditionVariable.new
  @mutex = Mutex.new

  @done       = ConditionVariable.new
  @done_mutex = Mutex.new

  @todo     = []
  @workers  = []
  @timeouts = {}

  @spawned       = 0
  @waiting       = 0
  @shutdown      = false
  @trim_requests = 0
  @auto_trim     = false
  @idle_trim     = nil
  @timeout       = nil

  @mutex.synchronize {
    min.times {
      spawn_thread
    }
  }
end

Class Attribute Details

.abort_on_exceptionObject

If true, tasks will allow raised exceptions to pass through.

Similar to Thread.abort_on_exception



356
357
358
# File 'lib/salus/thread/pool.rb', line 356

def abort_on_exception
  @abort_on_exception
end

Instance Attribute Details

#maxObject (readonly)

Returns the value of attribute max.



124
125
126
# File 'lib/salus/thread/pool.rb', line 124

def max
  @max
end

#minObject (readonly)

Returns the value of attribute min.



124
125
126
# File 'lib/salus/thread/pool.rb', line 124

def min
  @min
end

#spawnedObject (readonly)

Returns the value of attribute spawned.



124
125
126
# File 'lib/salus/thread/pool.rb', line 124

def spawned
  @spawned
end

#waitingObject (readonly)

Returns the value of attribute waiting.



124
125
126
# File 'lib/salus/thread/pool.rb', line 124

def waiting
  @waiting
end

Instance Method Details

#auto_trim!Object

Enable auto trimming, unneeded threads will be deleted until the minimum is reached.



175
176
177
178
179
# File 'lib/salus/thread/pool.rb', line 175

def auto_trim!
  @auto_trim = true

  self
end

#auto_trim?Boolean

Check if auto trimming is enabled.

Returns:

  • (Boolean)


169
170
171
# File 'lib/salus/thread/pool.rb', line 169

def auto_trim?
  @auto_trim
end

#backlogObject

Get the amount of tasks that still have to be run.



217
218
219
220
221
# File 'lib/salus/thread/pool.rb', line 217

def backlog
  @mutex.synchronize {
    @todo.length
  }
end

#done?Boolean

Are all tasks consumed?

Returns:

  • (Boolean)


224
225
226
227
228
# File 'lib/salus/thread/pool.rb', line 224

def done?
  @mutex.synchronize {
    _done?
  }
end

#future(&block) ⇒ Object



164
165
166
# File 'lib/salus/thread/future.rb', line 164

def future(&block)
  Future.new(self, &block)
end

#idle?Boolean

Check if there are idle workers.

Returns:

  • (Boolean)


256
257
258
259
260
# File 'lib/salus/thread/pool.rb', line 256

def idle?
  @mutex.synchronize {
    _idle?
  }
end

#idle_trim!(timeout) ⇒ Object

Enable idle trimming. Unneeded threads will be deleted after the given number of seconds of inactivity. The minimum number of threads is respeced.



195
196
197
198
199
# File 'lib/salus/thread/pool.rb', line 195

def idle_trim!(timeout)
  @idle_trim = timeout

  self
end

#idle_trim?Boolean

Check if idle trimming is enabled.

Returns:

  • (Boolean)


189
190
191
# File 'lib/salus/thread/pool.rb', line 189

def idle_trim?
  !@idle_trim.nil?
end

#no_auto_trim!Object

Disable auto trimming.



182
183
184
185
186
# File 'lib/salus/thread/pool.rb', line 182

def no_auto_trim!
  @auto_trim = false

  self
end

#no_idle_trim!Object

Turn of idle trimming.



202
203
204
205
206
# File 'lib/salus/thread/pool.rb', line 202

def no_idle_trim!
  @idle_trim = nil

  self
end

#process(*args, &block) ⇒ Object Also known as: <<

Add a task to the pool which will execute the block with the given argument.

If no block is passed the default block will be used if present, an ArgumentError will be raised otherwise.



267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
# File 'lib/salus/thread/pool.rb', line 267

def process(*args, &block)
  unless block || @block
    raise ArgumentError, 'you must pass a block'
  end

  task = Task.new(self, *args, &(block || @block))

  @mutex.synchronize {
    raise 'unable to add work while shutting down' if shutdown?

    @todo << task

    if @waiting == 0 && @spawned < @max
      spawn_thread
    end

    @cond.signal
  }

  task
end

#resize(min, max = nil) ⇒ Object

Resize the pool with the passed arguments.



209
210
211
212
213
214
# File 'lib/salus/thread/pool.rb', line 209

def resize(min, max = nil)
  @min = min
  @max = max || min

  trim!
end

#shutdownObject

Shut down the pool, it will block until all tasks have finished running.



322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
# File 'lib/salus/thread/pool.rb', line 322

def shutdown
  @mutex.synchronize {
    @shutdown = :nicely
    @cond.broadcast
  }

  until @workers.empty?
    if worker = @workers.first
      worker.join
    end
  end

  if @timeout
    @shutdown = :now

    wake_up_timeout

    @timeout.join
  end
end

#shutdown!Object

Shut down the pool instantly without finishing to execute tasks.



310
311
312
313
314
315
316
317
318
319
# File 'lib/salus/thread/pool.rb', line 310

def shutdown!
  @mutex.synchronize {
    @shutdown = :now
    @cond.broadcast
  }

  wake_up_timeout

  self
end

#shutdown?Boolean

Check if the pool has been shut down.

Returns:

  • (Boolean)


164
165
166
# File 'lib/salus/thread/pool.rb', line 164

def shutdown?
  !!@shutdown
end

#shutdown_after(timeout) ⇒ Object

Shutdown the pool after a given amount of time.



344
345
346
347
348
349
350
# File 'lib/salus/thread/pool.rb', line 344

def shutdown_after(timeout)
  Thread.new {
    sleep timeout

    shutdown
  }
end

#trim(force = false) ⇒ Object

Trim the unused threads, if forced threads will be trimmed even if there are tasks waiting.



293
294
295
296
297
298
299
300
301
302
# File 'lib/salus/thread/pool.rb', line 293

def trim(force = false)
  @mutex.synchronize {
    if (force || @waiting > 0) && @spawned - @trim_requests > @min
      @trim_requests += 1
      @cond.signal
    end
  }

  self
end

#trim!Object

Force ##trim.



305
306
307
# File 'lib/salus/thread/pool.rb', line 305

def trim!
  trim true
end

#wait(what = :idle) ⇒ Object

Wait until all tasks are consumed. The caller will be blocked until then.



231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
# File 'lib/salus/thread/pool.rb', line 231

def wait(what = :idle)
  case what
  when :done
    until done?
      @done_mutex.synchronize {
        break if _done?

        @done.wait @done_mutex
      }
    end

  when :idle
    until idle?
      @done_mutex.synchronize {
        break if _idle?

        @done.wait @done_mutex
      }
    end
  end

  self
end