Class: Temporalio::Worker::ActivityExecutor::ThreadPool

Inherits:
Temporalio::Worker::ActivityExecutor
Defined in:
lib/temporalio/worker/activity_executor/thread_pool.rb

Overview

Activity executor for scheduling activities in their own thread. This implementation is a stripped-down form of Concurrent Ruby's `CachedThreadPool`.

Methods inherited from Temporalio::Worker::ActivityExecutor

defaults, #initialize_activity

Constructor Details

#initialize(max_threads: nil, idle_timeout: 20) ⇒ ThreadPool

Create a new thread pool executor that creates threads as needed.

Parameters:

  • max_threads (Integer, nil) (defaults to: nil)

    Maximum number of thread workers to create, or nil for unlimited max.

  • idle_timeout (Float) (defaults to: 20)

    Number of seconds before a thread worker with no work should be stopped. Note that idle thread workers are only checked for, and stopped, when a new activity is scheduled.



# File 'lib/temporalio/worker/activity_executor/thread_pool.rb', line 29

def initialize(max_threads: nil, idle_timeout: 20) # rubocop:disable Lint/MissingSuper
  @max_threads = max_threads
  @idle_timeout = idle_timeout

  @mutex = Mutex.new
  @pool = []
  @ready = []
  @queue = []
  @scheduled_task_count = 0
  @completed_task_count = 0
  @largest_length       = 0
  @workers_counter = 0
  @prune_interval = @idle_timeout / 2
  @next_prune_time = ThreadPool._monotonic_time + @prune_interval
end
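
The constructor derives a prune interval of half the idle timeout and schedules the first prune check against a monotonic clock. The following is a minimal sketch of that arithmetic only. The helper names `prune_schedule` and `prune_due?` are hypothetical, and `Process.clock_gettime(Process::CLOCK_MONOTONIC)` is an assumed stand-in for `ThreadPool._monotonic_time`, whose body is not shown on this page.

```ruby
# Hypothetical helper mirroring the constructor's prune bookkeeping.
# Assumption: a monotonic clock stands in for ThreadPool._monotonic_time.
def prune_schedule(idle_timeout, now: Process.clock_gettime(Process::CLOCK_MONOTONIC))
  prune_interval = idle_timeout / 2
  { prune_interval: prune_interval, next_prune_time: now + prune_interval }
end

# A prune is due once the clock has passed the scheduled time. This is the
# same comparison #execute_activity makes when a new activity arrives.
def prune_due?(next_prune_time, now)
  next_prune_time < now
end

schedule = prune_schedule(20, now: 100.0)
puts schedule[:prune_interval]
puts prune_due?(schedule[:next_prune_time], 111.0)
```

In Ruby, `/` on two Integers truncates, so an odd Integer timeout such as `5` gives an interval of `2`, while `5.0` gives `2.5`.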

Class Method Details

.default ⇒ ThreadPool

Returns the default/shared thread pool executor instance with unlimited max threads.

Returns:

  • (ThreadPool)

    Default/shared thread pool executor instance with unlimited max threads.



# File 'lib/temporalio/worker/activity_executor/thread_pool.rb', line 15

def self.default
  @default ||= new
end
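
The `@default ||= new` line memoizes a single instance at the class level, so every caller of `.default` shares one pool. A minimal sketch of that pattern, using a hypothetical stand-in class `SketchExecutor` rather than the real `ThreadPool`:

```ruby
# Hypothetical stand-in class; only the memoization pattern is the point.
class SketchExecutor
  def self.default
    # Created on the first call, then reused on every later call.
    @default ||= new
  end
end

first = SketchExecutor.default
second = SketchExecutor.default
puts first.equal?(second) # both names refer to the same object
```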

Instance Method Details

#active_count ⇒ Integer

Returns the number of threads that are actively executing tasks.

Returns:

  • (Integer)

    The number of threads that are actively executing tasks.



# File 'lib/temporalio/worker/activity_executor/thread_pool.rb', line 87

def active_count
  @mutex.synchronize { @pool.length - @ready.length }
end

#activity_context ⇒ Object



# File 'lib/temporalio/worker/activity_executor/thread_pool.rb', line 55

def activity_context
  Thread.current[:temporal_activity_context]
end
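
The context is looked up through `Thread.current[...]`, so each activity thread sees only the context stored on that thread. In Ruby, `Thread#[]` storage is scoped to the current fiber of a thread, which suggests that code hopping to another thread or fiber would not see the value. A minimal sketch of that scoping, with hypothetical helper names `store_context` and `read_context`:

```ruby
# Hypothetical helpers; the key matches the one used by #activity_context.
def store_context(value)
  Thread.current[:temporal_activity_context] = value
end

def read_context
  Thread.current[:temporal_activity_context]
end

store_context('context for this thread')

# A different thread has its own storage, so the lookup there finds nothing.
seen_elsewhere = Thread.new { read_context }.value

puts read_context.inspect
puts seen_elsewhere.inspect
```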

#completed_task_count ⇒ Integer

Returns the number of tasks that have been completed by the pool since construction.

Returns:

  • (Integer)

    The number of tasks that have been completed by the pool since construction.



# File 'lib/temporalio/worker/activity_executor/thread_pool.rb', line 82

def completed_task_count
  @mutex.synchronize { @completed_task_count }
end
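
The statistics readers all take the pool's mutex before reading, so a caller never observes a counter mid-update. A minimal sketch of that guarded-counter pattern follows. The class `CounterSketch` and its `task_completed` method are hypothetical, since the code that increments the real counter is not shown on this page.

```ruby
# Hypothetical counter guarded by a Mutex, in the style of the pool's statistics.
class CounterSketch
  def initialize
    @mutex = Mutex.new
    @completed_task_count = 0
  end

  # Writers increment under the lock.
  def task_completed
    @mutex.synchronize { @completed_task_count += 1 }
  end

  # Readers take the same lock, as #completed_task_count does.
  def completed_task_count
    @mutex.synchronize { @completed_task_count }
  end
end

counter = CounterSketch.new
threads = 4.times.map { Thread.new { 250.times { counter.task_completed } } }
threads.each(&:join)
puts counter.completed_task_count
```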

#execute_activity(_defn, &block) ⇒ Object



# File 'lib/temporalio/worker/activity_executor/thread_pool.rb', line 46

def execute_activity(_defn, &block)
  @mutex.synchronize do
    locked_assign_worker(&block) || locked_enqueue(&block)
    @scheduled_task_count += 1
    locked_prune_pool if @next_prune_time < ThreadPool._monotonic_time
  end
end
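
The `locked_assign_worker(&block) || locked_enqueue(&block)` line reads as "hand the task to a worker if one can take it, otherwise queue it". The following thread-free model is a sketch of that decision under stated assumptions: the class `SchedulingSketch` and its methods are hypothetical, and the rule that a new worker is created only while the pool is below `max_threads` is assumed from the constructor's parameter description, because the bodies of `locked_assign_worker` and `locked_enqueue` are not shown on this page.

```ruby
# Hypothetical, thread-free model of the scheduling decision.
class SchedulingSketch
  attr_reader :ready, :queue, :pool_size, :scheduled_task_count

  def initialize(max_threads: nil)
    @max_threads = max_threads
    @ready = []     # idle workers waiting for a task
    @queue = []     # tasks waiting for a worker
    @pool_size = 0  # workers created so far
    @scheduled_task_count = 0
  end

  def schedule(task)
    assign_worker(task) || enqueue(task)
    @scheduled_task_count += 1
  end

  private

  # Returns a truthy value only when some worker took the task.
  def assign_worker(task)
    worker = @ready.pop || create_worker
    return false unless worker

    worker[:task] = task
    true
  end

  # Returns nil when the pool is already at its maximum size.
  def create_worker
    return nil if @max_threads && @pool_size >= @max_threads

    @pool_size += 1
    { task: nil }
  end

  def enqueue(task)
    @queue << task
    true
  end
end

pool = SchedulingSketch.new(max_threads: 2)
3.times { |i| pool.schedule("task #{i}") }
puts pool.pool_size    # workers created
puts pool.queue.length # tasks left waiting
```

With `max_threads: nil` the model never queues anything, which matches the "unlimited max" wording in the constructor documentation.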

#kill ⇒ Object

Kill each thread. This should not be called until all workers using this executor are complete. This does not need to be called at all on program exit (e.g. for the global default).



# File 'lib/temporalio/worker/activity_executor/thread_pool.rb', line 113

def kill
  @mutex.synchronize do
    # Kill all workers
    @pool.each(&:kill)
    @pool.clear
    @ready.clear
  end
end

#largest_length ⇒ Integer

Returns the largest number of threads that have been created in the pool since construction.

Returns:

  • (Integer)

    The largest number of threads that have been created in the pool since construction.



# File 'lib/temporalio/worker/activity_executor/thread_pool.rb', line 72

def largest_length
  @mutex.synchronize { @largest_length }
end

#length ⇒ Integer

Returns the number of threads currently in the pool.

Returns:

  • (Integer)

    The number of threads currently in the pool.



# File 'lib/temporalio/worker/activity_executor/thread_pool.rb', line 92

def length
  @mutex.synchronize { @pool.length }
end

#queue_length ⇒ Integer

Returns the number of tasks in the queue awaiting execution.

Returns:

  • (Integer)

    The number of tasks in the queue awaiting execution.



# File 'lib/temporalio/worker/activity_executor/thread_pool.rb', line 97

def queue_length
  @mutex.synchronize { @queue.length }
end

#scheduled_task_count ⇒ Integer

Returns the number of tasks that have been scheduled for execution on the pool since construction.

Returns:

  • (Integer)

    The number of tasks that have been scheduled for execution on the pool since construction.



# File 'lib/temporalio/worker/activity_executor/thread_pool.rb', line 77

def scheduled_task_count
  @mutex.synchronize { @scheduled_task_count }
end

#set_activity_context(defn, context) ⇒ Object



# File 'lib/temporalio/worker/activity_executor/thread_pool.rb', line 60

def set_activity_context(defn, context)
  Thread.current[:temporal_activity_context] = context
  # If they have opted in to raising on cancel, wire that up
  return unless defn.cancel_raise

  thread = Thread.current
  context&.cancellation&.add_cancel_callback do
    thread.raise(Error::CanceledError.new('Activity canceled')) if thread[:temporal_activity_context] == context
  end
end
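
When the activity definition opts in via `cancel_raise`, cancellation is delivered by raising an exception inside the activity's thread with `Thread#raise`, and the `thread[:temporal_activity_context] == context` guard skips the raise if that thread has since moved on to a different activity. A minimal sketch of the `Thread#raise` mechanism in isolation, with a hypothetical error class `SketchCanceledError` standing in for `Error::CanceledError` and a hypothetical helper `run_and_cancel`:

```ruby
# Hypothetical error class standing in for Error::CanceledError.
class SketchCanceledError < StandardError; end

# Runs a blocking "activity" on its own thread and cancels it from outside.
def run_and_cancel
  started = Queue.new
  activity_thread = Thread.new do
    started << true
    sleep # stands in for long-running activity work
    :completed
  rescue SketchCanceledError => e
    "rescued: #{e.message}"
  end

  started.pop # wait until the thread is running inside the rescued block
  activity_thread.raise(SketchCanceledError.new('Activity canceled'))
  activity_thread.value
end

puts run_and_cancel
```

Because the exception arrives asynchronously, it can interrupt the activity at any point, so activity code that opts in should be prepared to clean up in `rescue` or `ensure` blocks.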

#shutdown ⇒ Object

Gracefully shut down each thread when it is done with its current task. This should not be called until all workers using this executor are complete. This does not need to be called at all on program exit (e.g. for the global default).



# File 'lib/temporalio/worker/activity_executor/thread_pool.rb', line 104

def shutdown
  @mutex.synchronize do
    # Stop all workers
    @pool.each(&:stop)
  end
end
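
The contrast between `#shutdown` (let each thread finish its current task) and `#kill` (terminate threads immediately) can be illustrated with plain Ruby threads. This is a sketch under assumptions: the helper names are hypothetical, and the `:stop` marker in a queue is an assumed stand-in for the pool worker's `stop` method, whose implementation is not shown on this page.

```ruby
# Hypothetical worker loop: runs callables from a queue until it sees :stop.
def start_worker(tasks, results)
  Thread.new do
    while (task = tasks.pop) != :stop
      results << task.call
    end
  end
end

# Graceful: the stop marker is queued behind pending work, so the work finishes.
def graceful_stop_demo
  tasks = Queue.new
  results = Queue.new
  worker = start_worker(tasks, results)
  tasks << -> { :finished }
  tasks << :stop
  worker.join
  results.size
end

# Forced: Thread#kill ends the thread even though it is in the middle of a task.
def kill_demo
  tasks = Queue.new
  results = Queue.new
  started = Queue.new
  worker = start_worker(tasks, results)
  tasks << lambda {
    started << true
    sleep # blocks until the thread is killed
    :finished
  }
  started.pop # wait until the task is actually running
  worker.kill
  worker.join
  results.size
end

puts graceful_stop_demo # number of tasks that produced a result
puts kill_demo          # number of tasks that produced a result
```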