Class: Rutty::ThreadPool

Inherits:
Object
Defined in:
lib/rutty/thread_pool/pool.rb

Overview

A thread pool implementation using the work queue pattern. Shamelessly stolen from a Stack Overflow question.
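As a rough illustration of the work queue pattern, the following self-contained sketch mirrors the pool logic documented on this page. `SketchWorker` and `SketchPool` are hypothetical stand-ins: the source of Rutty::Worker is not shown here, so the worker below only assumes that a Worker accepts the shared queue, exposes `set_block` and `stop`, and puts itself back on the queue after finishing a job.

```ruby
# Hypothetical stand-in for Rutty::Worker (its real source is not shown here).
class SketchWorker
  def initialize(idle_queue)
    @idle_queue = idle_queue
    @inbox = Queue.new
    @thread = Thread.new do
      # Run jobs until the :stop marker arrives.
      while (job = @inbox.pop) != :stop
        job.call
        @idle_queue << self # assumption: a finished worker re-queues itself
      end
    end
  end

  def set_block(block)
    @inbox << block
  end

  def stop
    @inbox << :stop
    @thread.join
  end
end

# Hypothetical pool that follows the same decisions as the documented methods.
class SketchPool
  def initialize(max_size = 10)
    @max_size = max_size
    @queue = Queue.new # holds idle workers
    @workers = []
  end

  def process(&blk)
    get_worker.set_block(blk)
  end

  def shutdown
    @workers.each(&:stop)
    @workers = []
  end

  private

  def get_worker
    if !@queue.empty? || @workers.size == @max_size
      @queue.pop # blocks while every worker is busy
    else
      worker = SketchWorker.new(@queue)
      @workers << worker
      worker
    end
  end
end

results = Queue.new
pool = SketchPool.new(2)
5.times { |i| pool.process { results << i * i } }
pool.shutdown

squares = []
squares << results.pop until results.empty?
squares.sort!
```

Five jobs run on at most two threads here; `shutdown` returns only after each worker has drained its pending job, so `squares` holds all five results afterwards.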

Constructor Details

#initialize(max_size = 10) ⇒ ThreadPool

Creates a new Rutty::ThreadPool instance with a maximum number of Worker threads.

Since:

  • 2.4.0



# File 'lib/rutty/thread_pool/pool.rb', line 18

def initialize max_size = 10
  Thread.abort_on_exception = true
  
  @max_size = max_size
  @queue = Queue.new
  @workers = []
end
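Note that the constructor assigns `Thread.abort_on_exception`, which is a setting on the Thread class rather than on the pool, so it affects every thread in the process. The sketch below shows that scope and, purely as a hypothetical alternative that the pool does not use, the per-thread form of the same flag.

```ruby
# Remember the process-wide setting so the sketch can restore it.
original = Thread.abort_on_exception

Thread.abort_on_exception = true

# The flag lives on the Thread class, so any thread sees the same value.
seen_elsewhere = Thread.new { Thread.abort_on_exception }.value

Thread.abort_on_exception = original

# Hypothetical alternative: opt in for a single thread only.
gate = Queue.new
local = Thread.new { gate.pop }
local.abort_on_exception = true
per_thread_flag = local.abort_on_exception

gate << :go
local.join
```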

Instance Attribute Details

#max_size ⇒ Integer

Returns the maximum number of concurrent Worker threads.

Returns:

  • (Integer)

    The maximum number of concurrent Worker threads

Since:

  • 2.4.0



# File 'lib/rutty/thread_pool/pool.rb', line 14

def max_size
  @max_size
end

Instance Method Details

#busy? ⇒ Boolean

Whether any Worker threads are currently executing. The pool counts as busy when the @queue holds fewer Workers than the @workers array, that is, when at least one Worker has not returned to the queue.

Returns:

  • (Boolean)

    true if any Worker is currently executing, false otherwise

Since:

  • 2.4.0



# File 'lib/rutty/thread_pool/pool.rb', line 39

def busy?
  @queue.size < current_size
end
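The size comparison can be tried in isolation. This minimal sketch uses symbols as stand-ins for Worker objects and assumes, as #get_worker suggests, that the queue holds the Workers that are currently idle; the `busy` lambda is a hypothetical helper, not part of the class.

```ruby
# Symbols stand in for Worker objects in this sketch.
workers = %i[w1 w2 w3]
idle = Queue.new
idle << :w1
idle << :w2

# Same comparison as #busy?: fewer idle workers than workers overall.
busy = ->(queue, pool) { queue.size < pool.size }

while_one_works = busy.call(idle, workers) # :w3 is not in the idle queue

idle << :w3
after_all_return = busy.call(idle, workers)

# A pool that has not created any workers yet is not busy either.
empty_pool = busy.call(Queue.new, [])
```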

#current_size ⇒ Integer

The current size of the @workers array.

Returns:

  • (Integer)

    The current number of Worker threads

Since:

  • 2.4.0



# File 'lib/rutty/thread_pool/pool.rb', line 30

def current_size
  @workers.size
end

#get_worker ⇒ Rutty::Worker (private)

Gets a free Worker. If the @queue isn’t empty or the #max_size has been reached, pops an existing Worker from the queue; when the queue is empty, the pop blocks until a Worker becomes available. Otherwise spins up a new Worker and adds it to the @workers array.

Returns:

  • (Rutty::Worker)

Since:

  • 2.4.0



# File 'lib/rutty/thread_pool/pool.rb', line 70

def get_worker
  # Reuse a queued Worker when possible; Queue#pop blocks while the queue is empty.
  if !@queue.empty? || current_size == @max_size
    @queue.pop
  else
    worker = Rutty::Worker.new @queue
    @workers << worker
    worker
  end
end
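When the pool is at #max_size and the queue is empty, the caller waits inside `Queue#pop`. The sketch below demonstrates only that blocking behavior of Ruby's Queue; the symbol `:free_worker` and the 0.1 second delay are arbitrary stand-ins for a Worker finishing its job.

```ruby
queue = Queue.new

# Simulates a worker that becomes free a little later.
returner = Thread.new do
  sleep 0.1
  queue << :free_worker
end

started = Process.clock_gettime(Process::CLOCK_MONOTONIC)
worker = queue.pop # blocks until the other thread pushes
waited = Process.clock_gettime(Process::CLOCK_MONOTONIC) - started

returner.join
```

A consequence worth keeping in mind is that a call that needs a Worker from a saturated pool does not return until some Worker is handed back.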

#process(block = nil, &blk) ⇒ void

Gets a free Worker and passes it the specified closure. If both a callable argument and a block are given, the block takes precedence.

Parameters:

  • block (Proc, Lambda, #call) (defaults to: nil)

    An instance of any class that responds to #call

See Also:

Since:

  • 2.4.0



# File 'lib/rutty/thread_pool/pool.rb', line 57

def process block = nil, &blk
  block = blk if block_given?
  get_worker.set_block block
end
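The argument handling can be seen without a pool. `pick_callable` below is a hypothetical helper that copies only the first line of #process, to show which closure ends up being used in each calling style.

```ruby
# Hypothetical helper mirroring the argument handling of #process.
def pick_callable(block = nil, &blk)
  block = blk if block_given?
  block
end

from_arg = pick_callable(-> { :from_lambda }).call
from_block = pick_callable { :from_block }.call

# When both are supplied, the block replaces the positional argument.
both = pick_callable(-> { :from_lambda }) { :from_block }.call

# With neither, the result is nil.
neither = pick_callable
```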

#shutdown ⇒ void

Also known as: join

Loops over every Worker and calls Worker#stop, then clears the @workers array.

Since:

  • 2.4.0



# File 'lib/rutty/thread_pool/pool.rb', line 45

def shutdown
  @workers.each { |w| w.stop }
  @workers = []
end