Class: Temporalio::Worker::ActivityExecutor::ThreadPool
- Inherits: Temporalio::Worker::ActivityExecutor
  - Object
  - Temporalio::Worker::ActivityExecutor
  - Temporalio::Worker::ActivityExecutor::ThreadPool
- Defined in: lib/temporalio/worker/activity_executor/thread_pool.rb
Overview
Activity executor that runs each activity in its own thread. This implementation is a stripped-down form of Concurrent Ruby's `CachedThreadPool`.
Class Method Summary
- .default ⇒ ThreadPool
  Default/shared thread pool executor instance with unlimited max threads.
Instance Method Summary
- #active_count ⇒ Integer
  The number of threads that are actively executing tasks.
- #activity_context ⇒ Object
- #completed_task_count ⇒ Integer
  The number of tasks that have been completed by the pool since construction.
- #execute_activity(_defn, &block) ⇒ Object
- #initialize(max_threads: nil, idle_timeout: 20) ⇒ ThreadPool (constructor)
  Create a new thread pool executor that creates threads as needed.
- #kill ⇒ Object
  Kill each thread.
- #largest_length ⇒ Integer
  The largest number of threads that have been created in the pool since construction.
- #length ⇒ Integer
  The number of threads currently in the pool.
- #queue_length ⇒ Integer
  The number of tasks in the queue awaiting execution.
- #scheduled_task_count ⇒ Integer
  The number of tasks that have been scheduled for execution on the pool since construction.
- #set_activity_context(defn, context) ⇒ Object
- #shutdown ⇒ Object
  Gracefully shut down each thread when it is done with its current task.
Methods inherited from Temporalio::Worker::ActivityExecutor
defaults, #initialize_activity
Constructor Details
#initialize(max_threads: nil, idle_timeout: 20) ⇒ ThreadPool
Create a new thread pool executor that creates threads as needed.
    # File 'lib/temporalio/worker/activity_executor/thread_pool.rb', line 29
    def initialize(max_threads: nil, idle_timeout: 20) # rubocop:disable Lint/MissingSuper
      @max_threads = max_threads
      @idle_timeout = idle_timeout
      @mutex = Mutex.new
      @pool = []
      @ready = []
      @queue = []
      @scheduled_task_count = 0
      @completed_task_count = 0
      @largest_length = 0
      @workers_counter = 0
      @prune_interval = @idle_timeout / 2
      @next_prune_time = ThreadPool._monotonic_time + @prune_interval
    end
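The constructor schedules the first prune half an idle timeout into the future. The sketch below reproduces only that bookkeeping. It assumes `_monotonic_time` reads a monotonic clock via `Process.clock_gettime(Process::CLOCK_MONOTONIC)` (the helper's body is not shown on this page), and `PruneSchedule` is a hypothetical class used purely for illustration.

```ruby
# Hypothetical helper; the class and method names are illustrative only.
class PruneSchedule
  attr_reader :prune_interval, :next_prune_time

  def initialize(idle_timeout: 20)
    # Same arithmetic as the constructor above.
    @prune_interval = idle_timeout / 2
    @next_prune_time = monotonic_time + @prune_interval
  end

  # True once the scheduled prune time has passed.
  def due?(now = monotonic_time)
    @next_prune_time < now
  end

  # Assumption: a monotonic clock, unaffected by wall-clock adjustments.
  def monotonic_time
    Process.clock_gettime(Process::CLOCK_MONOTONIC)
  end
end

schedule = PruneSchedule.new(idle_timeout: 20)
puts schedule.prune_interval
puts schedule.due?
```

Note that with an Integer `idle_timeout`, `/ 2` is integer division in Ruby, so an odd value such as 5 yields an interval of 2; passing a Float keeps the fractional part.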
Class Method Details
.default ⇒ ThreadPool
Returns the default/shared thread pool executor instance with unlimited max threads.
    # File 'lib/temporalio/worker/activity_executor/thread_pool.rb', line 15
    def self.default
      @default ||= new
    end
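`.default` relies on class-level memoization with `||=`. A minimal standalone sketch of the same idiom, using a hypothetical `DemoExecutor` class rather than the real executor:

```ruby
# Hypothetical class for illustration; not part of the library.
class DemoExecutor
  # Lazily create one shared instance and reuse it on later calls.
  def self.default
    @default ||= new
  end
end

first = DemoExecutor.default
second = DemoExecutor.default
puts first.equal?(second) # both calls return the same object
```

Because `@default` here is an instance variable of the class object itself, a subclass calling `default` would memoize its own separate instance. `||=` is also not an atomic operation, so two threads racing on the very first call could in principle each build an instance.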
Instance Method Details
#active_count ⇒ Integer
Returns the number of threads that are actively executing tasks.
    # File 'lib/temporalio/worker/activity_executor/thread_pool.rb', line 87
    def active_count
      @mutex.synchronize { @pool.length - @ready.length }
    end
#activity_context ⇒ Object
    # File 'lib/temporalio/worker/activity_executor/thread_pool.rb', line 55
    def activity_context
      Thread.current[:temporal_activity_context]
    end
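The context is stored under a key on `Thread.current`, so each thread sees only its own value. A small standalone sketch of that isolation, using a made-up `:demo_context` key instead of the library's key:

```ruby
# Illustrative only: the :demo_context key is invented for this sketch.
results = Queue.new

threads = %w[a b].map do |name|
  Thread.new do
    Thread.current[:demo_context] = "context-#{name}"
    sleep 0.01 # give the other thread a chance to run
    results << Thread.current[:demo_context]
  end
end
threads.each(&:join)

seen = []
seen << results.pop until results.empty?
p seen.sort
p Thread.current[:demo_context] # the main thread never set it, so this is nil
```

One detail worth keeping in mind: Ruby's `Thread#[]` is fiber-local, so a value set this way is not visible from a different `Fiber` running on the same thread.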
#completed_task_count ⇒ Integer
Returns the number of tasks that have been completed by the pool since construction.
    # File 'lib/temporalio/worker/activity_executor/thread_pool.rb', line 82
    def completed_task_count
      @mutex.synchronize { @completed_task_count }
    end
#execute_activity(_defn, &block) ⇒ Object
    # File 'lib/temporalio/worker/activity_executor/thread_pool.rb', line 46
    def execute_activity(_defn, &block)
      @mutex.synchronize do
        locked_assign_worker(&block) || locked_enqueue(&block)
        @scheduled_task_count += 1
        locked_prune_pool if @next_prune_time < ThreadPool._monotonic_time
      end
    end
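`execute_activity` first tries to hand the block to a worker and only enqueues it when that fails. The private helpers are not shown on this page, so the following is a simplified sketch of the general "assign or enqueue" pattern, not the library's implementation. `TinyPool` and everything inside it are hypothetical, tasks are assumed not to raise, and pruning of idle threads is left out.

```ruby
# Hypothetical, simplified pool; not the library's implementation.
class TinyPool
  Worker = Struct.new(:inbox, :thread)

  def initialize(max_threads: nil)
    @max_threads = max_threads
    @mutex = Mutex.new
    @pool = []  # every worker
    @ready = [] # idle workers waiting for a task
    @queue = [] # tasks waiting for a worker
  end

  def execute(&block)
    @mutex.synchronize do
      # Prefer an idle worker, then a new one, and enqueue as a last resort.
      worker = @ready.pop || locked_spawn_worker
      worker ? worker.inbox << block : @queue << block
    end
  end

  def length
    @mutex.synchronize { @pool.length }
  end

  # Assumes all submitted work has already finished.
  def shutdown
    workers = @mutex.synchronize { @pool.dup }
    workers.each { |w| w.inbox << :stop }
    workers.each { |w| w.thread.join }
  end

  private

  # Must be called with @mutex held. Returns nil when the cap is reached.
  def locked_spawn_worker
    return nil if @max_threads && @pool.length >= @max_threads

    worker = Worker.new(Queue.new)
    worker.thread = Thread.new { run(worker) }
    @pool << worker
    worker
  end

  def run(worker)
    loop do
      task = worker.inbox.pop
      break if task == :stop

      task.call
      @mutex.synchronize do
        # Take queued work if any, otherwise mark this worker idle.
        next_task = @queue.shift
        next_task ? worker.inbox << next_task : @ready << worker
      end
    end
  end
end

pool = TinyPool.new(max_threads: 2)
results = Queue.new
4.times { |i| pool.execute { sleep 0.05; results << i } }
collected = Array.new(4) { results.pop } # blocks until all four tasks ran
peak = pool.length
pool.shutdown
p collected.sort
```

With `max_threads: 2`, the first two tasks each get a fresh thread and the remaining two wait in the queue until a worker frees up. With `max_threads: nil`, every task that finds no idle worker gets a new thread.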
#kill ⇒ Object
Kill each thread. This should not be called until all workers using this executor are complete. This does not need to be called at all on program exit (e.g. for the global default).
    # File 'lib/temporalio/worker/activity_executor/thread_pool.rb', line 113
    def kill
      @mutex.synchronize do
        # Kill all workers
        @pool.each(&:kill)
        @pool.clear
        @ready.clear
      end
    end
#largest_length ⇒ Integer
Returns the largest number of threads that have been created in the pool since construction.
    # File 'lib/temporalio/worker/activity_executor/thread_pool.rb', line 72
    def largest_length
      @mutex.synchronize { @largest_length }
    end
#length ⇒ Integer
Returns the number of threads currently in the pool.
    # File 'lib/temporalio/worker/activity_executor/thread_pool.rb', line 92
    def length
      @mutex.synchronize { @pool.length }
    end
#queue_length ⇒ Integer
Returns the number of tasks in the queue awaiting execution.
    # File 'lib/temporalio/worker/activity_executor/thread_pool.rb', line 97
    def queue_length
      @mutex.synchronize { @queue.length }
    end
#scheduled_task_count ⇒ Integer
Returns the number of tasks that have been scheduled for execution on the pool since construction.
    # File 'lib/temporalio/worker/activity_executor/thread_pool.rb', line 77
    def scheduled_task_count
      @mutex.synchronize { @scheduled_task_count }
    end
#set_activity_context(defn, context) ⇒ Object
    # File 'lib/temporalio/worker/activity_executor/thread_pool.rb', line 60
    def set_activity_context(defn, context)
      Thread.current[:temporal_activity_context] = context
      # If they have opted in to raising on cancel, wire that up
      return unless defn.cancel_raise

      thread = Thread.current
      context&.cancellation&.add_cancel_callback do
        thread.raise(Error::CanceledError.new('Activity canceled')) if thread[:temporal_activity_context] == context
      end
    end
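When `cancel_raise` is set, cancellation is delivered by raising an error inside the activity's thread with `Thread#raise`, guarded by a check that the thread is still running the same context. The standalone sketch below shows that mechanism in isolation. `DemoCancellation` and `DemoCanceledError` are hypothetical stand-ins, not the library's classes, and the `:demo_context` key is invented.

```ruby
# Hypothetical stand-ins for illustration; not the library's classes.
class DemoCanceledError < StandardError; end

class DemoCancellation
  def initialize
    @callbacks = []
  end

  def add_cancel_callback(&block)
    @callbacks << block
  end

  def cancel
    @callbacks.each(&:call)
  end
end

cancellation = DemoCancellation.new
started = Queue.new

worker = Thread.new do
  thread = Thread.current
  thread[:demo_context] = cancellation
  cancellation.add_cancel_callback do
    # Only interrupt if the thread is still running this same context.
    thread.raise(DemoCanceledError, 'Activity canceled') if thread[:demo_context] == cancellation
  end
  started << true
  sleep 5 # stands in for long-running activity code
  :finished
rescue DemoCanceledError => e
  "interrupted: #{e.message}"
end

started.pop         # wait until the callback is registered
cancellation.cancel # runs on the main thread, raises inside the worker
outcome = worker.value
puts outcome
```

The identity check matters for pooled threads: if the thread has since moved on to a different activity, a late cancellation callback from the previous one does nothing instead of interrupting unrelated work.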
#shutdown ⇒ Object
Gracefully shut down each thread when it is done with its current task. This should not be called until all workers using this executor are complete. This does not need to be called at all on program exit (e.g. for the global default).
    # File 'lib/temporalio/worker/activity_executor/thread_pool.rb', line 104
    def shutdown
      @mutex.synchronize do
        # Stop all workers
        @pool.each(&:stop)
      end
    end
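The practical difference between `shutdown` and `kill` is whether an in-flight task gets to finish. The worker's own `stop` and `kill` methods are not shown on this page, so the sketch below only illustrates the general contrast, assuming a graceful stop is signaled through a sentinel value and a forced stop uses Ruby's `Thread#kill`.

```ruby
finished = Queue.new

# Graceful: the worker finishes its current task, then sees the stop signal.
inbox = Queue.new
graceful = Thread.new do
  while (task = inbox.pop) != :stop
    task.call
  end
end
inbox << -> { sleep 0.05; finished << :graceful_task }
inbox << :stop
graceful.join

# Forced: the thread is terminated without completing its work.
killed = Thread.new do
  sleep 5
  finished << :killed_task # never reached
end
killed.kill
killed.join

done = []
done << finished.pop until finished.empty?
p done
```

Only the gracefully stopped worker records its task; the killed thread never reaches its final line.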