Class: Temporalio::Worker
Inherits: Object
- Object
- Temporalio::Worker
Defined in:
- lib/temporalio/worker.rb
- lib/temporalio/worker/runner.rb
- lib/temporalio/worker/reactor.rb
- lib/temporalio/worker/sync_worker.rb
- lib/temporalio/worker/activity_runner.rb
- lib/temporalio/worker/activity_worker.rb
- lib/temporalio/worker/thread_pool_executor.rb
Overview
Defined Under Namespace
Classes: ActivityRunner, ActivityWorker, Reactor, Runner, SyncWorker, ThreadPoolExecutor
Class Method Summary
- .run(*workers, shutdown_signals: []) { ... } ⇒ Object
  Run multiple workers and wait for them to be shut down.
Instance Method Summary
- #initialize(connection, namespace, task_queue, activities: [], data_converter: Temporalio::DataConverter.new, activity_executor: nil, interceptors: [], max_concurrent_activities: 100, graceful_shutdown_timeout: nil) ⇒ Worker (constructor)
  Create a worker to process activities.
- #run { ... } ⇒ Object
  Run the worker and wait on it to be shut down.
- #running? ⇒ Boolean
  Whether the worker is running.
- #shutdown(exception = Temporalio::Error::WorkerShutdown.new('Manual shutdown')) ⇒ Object
  Initiate a worker shutdown and wait until complete.
- #start(runner = nil) ⇒ Object (private)
  Start the worker asynchronously in a shared runtime.
- #started? ⇒ Boolean
  Whether the worker has been started.
Constructor Details
#initialize(connection, namespace, task_queue, activities: [], data_converter: Temporalio::DataConverter.new, activity_executor: nil, interceptors: [], max_concurrent_activities: 100, graceful_shutdown_timeout: nil) ⇒ Worker
Create a worker to process activities.
# File 'lib/temporalio/worker.rb', line 62

def initialize(
  connection,
  namespace,
  task_queue,
  activities: [],
  data_converter: Temporalio::DataConverter.new,
  activity_executor: nil,
  interceptors: [],
  max_concurrent_activities: 100,
  graceful_shutdown_timeout: nil
)
  @started = false
  @shutdown = false
  @mutex = Mutex.new
  @runtime = Temporalio::Runtime.instance
  @activity_executor = activity_executor || ThreadPoolExecutor.new(max_concurrent_activities)
  @core_worker = Temporalio::Bridge::Worker.create(
    @runtime.core_runtime,
    connection.core_connection,
    namespace,
    task_queue,
    0, # maxCachedWorkflows disabled temporarily
    # FIXME: expose enable_non_local_activities
    activities.empty?,
  )
  sync_worker = Worker::SyncWorker.new(@core_worker)
  @activity_worker =
    unless activities.empty?
      Worker::ActivityWorker.new(
        task_queue,
        sync_worker,
        activities,
        data_converter,
        interceptors,
        @activity_executor,
        graceful_shutdown_timeout,
      )
    end

  unless @activity_worker
    raise ArgumentError, 'At least one activity must be specified'
  end
end
Class Method Details
.run(*workers, shutdown_signals: []) { ... } ⇒ Object
Run multiple workers and wait for them to be shut down.
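This will not return until shutdown is complete (and all running activities in all workers have finished) and will raise if any of the workers raises a fatal error. Pass either :shutdown_signals or a block, but not both; supplying both raises an ArgumentError.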
# File 'lib/temporalio/worker.rb', line 26

def self.run(*workers, shutdown_signals: [], &block)
  unless shutdown_signals.empty?
    if block
      raise ArgumentError, 'Temporalio::Worker.run accepts :shutdown_signals or a block, but not both'
    end

    signal_queue = Queue.new

    shutdown_signals.each do |signal|
      Signal.trap(signal) { signal_queue.close }
    end

    block = -> { signal_queue.pop }
  end

  Runner.new(*workers).run(&block)
end
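The signal handling above relies on a property of Ruby's Queue: closing an empty queue makes a blocked pop return nil, so the trap handler can wake the waiting block without pushing a value. The following is a minimal sketch of that mechanism only. It does not use the SDK, and the helper thread and variable names are illustrative stand-ins for the Signal.trap handler.

```ruby
# Illustrative only: mirrors the Queue-based wait used by Worker.run,
# with a helper thread playing the role of the Signal.trap handler.
signal_queue = Queue.new

# The kind of block Worker.run hands to the runner: it blocks until the
# queue is closed (or an item is pushed).
wait_for_shutdown = -> { signal_queue.pop }

# Stand-in for `Signal.trap(signal) { signal_queue.close }`.
closer = Thread.new do
  sleep 0.05
  signal_queue.close
end

# Blocks here until the queue is closed, then returns nil.
result = wait_for_shutdown.call
closer.join
```

Closing the queue rather than pushing a value keeps the handler to a single call, and any later pop on the closed, empty queue also returns immediately.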
Instance Method Details
#run { ... } ⇒ Object
Note: A worker is only intended to be started once. Create a new worker if you need to run it again.
Run the worker and wait on it to be shut down.
This will not return until shutdown is complete (and all running activities have finished) and will raise if there is a fatal worker error. To run multiple workers, use the class method Temporalio::Worker.run.
# File 'lib/temporalio/worker.rb', line 118

def run(&block)
  Runner.new(self).run(&block)
end
#running? ⇒ Boolean
Whether the worker is running.
This is only `true` if the worker has been started and not yet shut down.
# File 'lib/temporalio/worker.rb', line 195

def running?
  @started && !@shutdown
end
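The predicate above combines two flags, so a worker passes through three observable states. The sketch below walks through them with a hypothetical LifecycleFlags class that keeps only those two flags. It is not part of the SDK, and mark_started and mark_shutdown are made-up names for illustration.

```ruby
# Hypothetical stand-in that keeps only the two flags behind #running?.
class LifecycleFlags
  def initialize
    @started = false
    @shutdown = false
  end

  # Made-up helpers; the real worker sets these flags in #start and #shutdown.
  def mark_started
    @started = true
  end

  def mark_shutdown
    @shutdown = true
  end

  def started?
    @started
  end

  def running?
    @started && !@shutdown
  end
end

flags = LifecycleFlags.new
states = [flags.running?]      # before start
flags.mark_started
states << flags.running?       # started, not yet shut down
flags.mark_shutdown
states << flags.running?       # started and shut down
still_started = flags.started? # the started flag is never reset
```

Note that started? stays true after shutdown, which is consistent with a worker being intended for a single start.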
#shutdown(exception = Temporalio::Error::WorkerShutdown.new('Manual shutdown')) ⇒ Object
Initiate a worker shutdown and wait until complete.
This can be called before the worker has even started and is safe for repeated invocations. This method will not return until the worker has completed shutting down.
# File 'lib/temporalio/worker.rb', line 161

def shutdown(exception = Temporalio::Error::WorkerShutdown.new('Manual shutdown'))
  mutex.synchronize do
    return unless running?

    # Let the runner know we're shutting down, so it can stop other workers.
    # This will cause a reentrant call to this method, but the mutex above will block that call.
    runner&.shutdown(exception)

    # Initiate Core shutdown, which will start dropping poll requests
    core_worker.initiate_shutdown
    # Start the graceful activity shutdown timer, which will cancel activities after the timeout
    activity_worker&.setup_graceful_shutdown_timer(runtime.reactor)
    # Wait for workers to drain any outstanding tasks
    activity_worker&.drain
    activity_executor.shutdown
    # Finalize the shutdown by stopping the Core
    core_worker.finalize_shutdown

    @shutdown = true
  end
end
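The early return on running? is what makes shutdown safe to call before start and safe to repeat. A minimal sketch of that guard, assuming a hypothetical DrainOnce class whose counter stands in for the real drain and Core shutdown steps (nothing here calls the SDK):

```ruby
# Hypothetical stand-in: only the guard logic of #shutdown is reproduced.
class DrainOnce
  attr_reader :drain_count

  def initialize
    @mutex = Mutex.new
    @started = false
    @shutdown = false
    @drain_count = 0
  end

  def start
    @mutex.synchronize { @started = true }
  end

  def running?
    @started && !@shutdown
  end

  def shutdown
    @mutex.synchronize do
      # No-op before start and after a completed shutdown.
      return unless running?

      @drain_count += 1 # placeholder for draining activities and stopping Core
      @shutdown = true
    end
  end
end

worker = DrainOnce.new
worker.shutdown                        # before start: returns immediately
count_before_start = worker.drain_count
worker.start
worker.shutdown                        # performs the shutdown work once
worker.shutdown                        # repeated call: the guard returns early
count_after = worker.drain_count
```

In this sketch a shutdown issued before start does not mark the object as shut down, so a later start still proceeds. That follows from the guard as written in the source above.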
#start(runner = nil) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Note: A worker is only intended to be started once. Create a new worker if you need to start it again; calling this method on an already started worker raises an error.
Start the worker asynchronously in a shared runtime.
This is an internal method for advanced use cases, intended for those implementing their own worker runner.
# File 'lib/temporalio/worker.rb', line 133

def start(runner = nil)
  mutex.synchronize do
    raise 'Worker is already started' if started?

    @started = true
  end

  @runner = runner
  runtime.ensure_callback_loop

  runtime.reactor.async do |task|
    if activity_worker
      task.async do |task|
        activity_worker.run(task)
      rescue StandardError => e
        shutdown(e) # initiate shutdown because of a fatal error
      end
    end
  end
end
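The mutex-guarded check at the top of start is a start-once guard: the flag is tested and set under the same lock, so a second call fails instead of starting the worker twice. A self-contained sketch of just that guard, using a hypothetical StartOnce class (not an SDK class):

```ruby
# Hypothetical stand-in: reproduces only the start-once guard of #start.
class StartOnce
  def initialize
    @mutex = Mutex.new
    @started = false
  end

  def started?
    @started
  end

  def start
    @mutex.synchronize do
      # Check and set happen under one lock, so two threads cannot both pass.
      raise 'Worker is already started' if started?

      @started = true
    end
  end
end

worker = StartOnce.new
worker.start

# A second start raises a RuntimeError; capture its message for inspection.
second_attempt =
  begin
    worker.start
    :started_again
  rescue RuntimeError => e
    e.message
  end
```

Because a bare raise with a string produces a RuntimeError, callers that want to tolerate a double start would need to rescue that class, as the sketch does.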
#started? ⇒ Boolean
Whether the worker has been started.
# File 'lib/temporalio/worker.rb', line 186

def started?
  @started
end