Class: Polyphony::ThreadPool

Inherits:
Object
  • Object
show all
Defined in:
lib/polyphony/core/thread_pool.rb

Overview

Implements a pool of threads

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(size = Etc.nprocessors) ⇒ ThreadPool

Initializes the thread pool. The pool size defaults to the number of available CPU cores.

Parameters:

  • size (Integer) (defaults to: Etc.nprocessors)

    number of threads in pool



33
34
35
36
37
# File 'lib/polyphony/core/thread_pool.rb', line 33

def initialize(size = Etc.nprocessors)
  @size = size
  @task_queue = Polyphony::Queue.new
  @threads = (1..@size).map { Thread.new { thread_loop } }
end

Instance Attribute Details

#sizeObject (readonly)

The pool size.



9
10
11
# File 'lib/polyphony/core/thread_pool.rb', line 9

def size
  @size
end

Class Method Details

.process(&block) ⇒ any

Runs the given block on an available thread from the default thread pool.

Returns:

  • (any)

    return value of given block



14
15
16
17
# File 'lib/polyphony/core/thread_pool.rb', line 14

def self.process(&block)
  @default_pool ||= new
  @default_pool.process(&block)
end

.resetnil

Resets the default thread pool.

Returns:

  • (nil)


22
23
24
25
26
27
# File 'lib/polyphony/core/thread_pool.rb', line 22

def self.reset
  return unless @default_pool

  @default_pool.stop
  @default_pool = nil
end

Instance Method Details

#busy?bool

Returns true if there are any currently running tasks, or any pending tasks waiting for a thread to become available.

Returns:

  • (bool)

    true if the pool is busy



66
67
68
# File 'lib/polyphony/core/thread_pool.rb', line 66

def busy?
  !@task_queue.empty?
end

#cast(&block) ⇒ Polyphony::ThreadPool

Adds a task to be performed asynchronously on a thread from the pool. This method does not block. The task will be performed once a thread becomes available.

Returns:



55
56
57
58
59
60
# File 'lib/polyphony/core/thread_pool.rb', line 55

def cast(&block)
  setup unless @task_queue

  @task_queue << [block, nil]
  self
end

#process(&block) ⇒ any

Runs the given block on an available thread from the pool.

Returns:

  • (any)

    return value of block



42
43
44
45
46
47
48
# File 'lib/polyphony/core/thread_pool.rb', line 42

def process(&block)
  setup unless @task_queue

  watcher = Fiber.current.auto_watcher
  @task_queue << [block, watcher]
  watcher.await
end

#stopObject

Stops and waits for all threads in the queue to terminate.



71
72
73
74
# File 'lib/polyphony/core/thread_pool.rb', line 71

def stop
  @threads.each(&:kill)
  @threads.each(&:join)
end