Class: Polyphony::ThreadPool
- Defined in:
- lib/polyphony/core/thread_pool.rb
Overview
Implements a pool of threads
Instance Attribute Summary collapse
-
#size ⇒ Object
readonly
The pool size.
Class Method Summary collapse
-
.process(&block) ⇒ any
Runs the given block on an available thread from the default thread pool.
-
.reset ⇒ nil
Resets the default thread pool.
Instance Method Summary collapse
-
#busy? ⇒ bool
Returns true if there are any currently running tasks, or any pending tasks waiting for a thread to become available.
-
#cast(&block) ⇒ Polyphony::ThreadPool
Adds a task to be performed asynchronously on a thread from the pool.
-
#initialize(size = Etc.nprocessors) ⇒ ThreadPool
constructor
Initializes the thread pool.
-
#process(&block) ⇒ any
Runs the given block on an available thread from the pool.
-
#stop ⇒ Object
Stops and waits for all threads in the queue to terminate.
Constructor Details
#initialize(size = Etc.nprocessors) ⇒ ThreadPool
Initializes the thread pool. The pool size defaults to the number of available CPU cores.
33 34 35 36 37 |
# File 'lib/polyphony/core/thread_pool.rb', line 33 def initialize(size = Etc.nprocessors) @size = size @task_queue = Polyphony::Queue.new @threads = (1..@size).map { Thread.new { thread_loop } } end |
Instance Attribute Details
#size ⇒ Object (readonly)
The pool size.
9 10 11 |
# File 'lib/polyphony/core/thread_pool.rb', line 9 def size @size end |
Class Method Details
.process(&block) ⇒ any
Runs the given block on an available thread from the default thread pool.
14 15 16 17 |
# File 'lib/polyphony/core/thread_pool.rb', line 14 def self.process(&block) @default_pool ||= new @default_pool.process(&block) end |
.reset ⇒ nil
Resets the default thread pool.
22 23 24 25 26 27 |
# File 'lib/polyphony/core/thread_pool.rb', line 22 def self.reset return unless @default_pool @default_pool.stop @default_pool = nil end |
Instance Method Details
#busy? ⇒ bool
Returns true if there are any currently running tasks, or any pending tasks waiting for a thread to become available.
66 67 68 |
# File 'lib/polyphony/core/thread_pool.rb', line 66 def busy? !@task_queue.empty? end |
#cast(&block) ⇒ Polyphony::ThreadPool
Adds a task to be performed asynchronously on a thread from the pool. This method does not block. The task will be performed once a thread becomes available.
55 56 57 58 59 60 |
# File 'lib/polyphony/core/thread_pool.rb', line 55 def cast(&block) setup unless @task_queue @task_queue << [block, nil] self end |
#process(&block) ⇒ any
Runs the given block on an available thread from the pool.
42 43 44 45 46 47 48 |
# File 'lib/polyphony/core/thread_pool.rb', line 42 def process(&block) setup unless @task_queue watcher = Fiber.current.auto_watcher @task_queue << [block, watcher] watcher.await end |
#stop ⇒ Object
Stops and waits for all threads in the queue to terminate.
71 72 73 74 |
# File 'lib/polyphony/core/thread_pool.rb', line 71 def stop @threads.each(&:kill) @threads.each(&:join) end |