Class: Temporalio::Worker
- Inherits:
-
Object
- Object
- Temporalio::Worker
- Defined in:
- lib/temporalio/worker.rb,
lib/temporalio/worker/runner.rb,
lib/temporalio/worker/reactor.rb,
lib/temporalio/worker/sync_worker.rb,
lib/temporalio/worker/activity_runner.rb,
lib/temporalio/worker/activity_worker.rb,
lib/temporalio/worker/thread_pool_executor.rb
Overview
Defined Under Namespace
Classes: ActivityRunner, ActivityWorker, Reactor, Runner, SyncWorker, ThreadPoolExecutor
Class Method Summary collapse
-
.run(*workers, shutdown_signals: []) { ... } ⇒ Object
Run multiple workers and wait for them to be shut down.
Instance Method Summary collapse
-
#initialize(connection, namespace, task_queue, activities: [], data_converter: Temporalio::DataConverter.new, activity_executor: nil, max_concurrent_activities: 100, graceful_shutdown_timeout: nil) ⇒ Worker
constructor
Create a worker to process workflows and/or activities.
-
#run { ... } ⇒ Object
Run the worker and wait on it to be shut down.
-
#running? ⇒ Boolean
Whether the worker is running.
-
#shutdown(exception = Temporalio::Error::WorkerShutdown.new('Manual shutdown')) ⇒ Object
Initiate a worker shutdown and wait until complete.
-
#start(runner = nil) ⇒ Object
private
Start the worker asynchronously in a shared runtime.
-
#started? ⇒ Boolean
Whether the worker has been started.
Constructor Details
#initialize(connection, namespace, task_queue, activities: [], data_converter: Temporalio::DataConverter.new, activity_executor: nil, max_concurrent_activities: 100, graceful_shutdown_timeout: nil) ⇒ Worker
Create a worker to process workflows and/or activities.
59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 |
# File 'lib/temporalio/worker.rb', line 59 def initialize( connection, namespace, task_queue, activities: [], data_converter: Temporalio::DataConverter.new, activity_executor: nil, max_concurrent_activities: 100, graceful_shutdown_timeout: nil ) # TODO: Add worker interceptors @started = false @shutdown = false @mutex = Mutex.new @runtime = Temporalio::Runtime.instance @activity_executor = activity_executor || ThreadPoolExecutor.new(max_concurrent_activities) @core_worker = Temporalio::Bridge::Worker.create( @runtime.core_runtime, connection.core_connection, namespace, task_queue, ) @activity_worker = unless activities.empty? Worker::ActivityWorker.new( task_queue, @core_worker, activities, data_converter, @activity_executor, graceful_shutdown_timeout, ) end @workflow_worker = nil if !@activity_worker && !@workflow_worker raise ArgumentError, 'At least one activity or workflow must be specified' end end |
Class Method Details
.run(*workers, shutdown_signals: []) { ... } ⇒ Object
Run multiple workers and wait for them to be shut down.
This will not return until shutdown is complete (and all running activities in all workers finished) and will raise if any of the workers raises a fatal error.
25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 |
# File 'lib/temporalio/worker.rb', line 25 def self.run(*workers, shutdown_signals: [], &block) unless shutdown_signals.empty? if block raise ArgumentError, 'Temporalio::Worker.run accepts :shutdown_signals or a block, but not both' end signal_queue = Queue.new shutdown_signals.each do |signal| Signal.trap(signal) { signal_queue.close } end block = -> { signal_queue.pop } end Runner.new(*workers).run(&block) end |
Instance Method Details
#run { ... } ⇒ Object
A worker is only intended to be started once. Initialize a new worker should you need to run it again.
Run the worker and wait on it to be shut down.
This will not return until shutdown is complete (and all running activities finished) and will raise if there is a worker fatal error. To run multiple workers use the class method run.
111 112 113 |
# File 'lib/temporalio/worker.rb', line 111 def run(&block) Runner.new(self).run(&block) end |
#running? ⇒ Boolean
Whether the worker is running.
This is only ‘true` if the worker has been started and not yet shut down.
189 190 191 |
# File 'lib/temporalio/worker.rb', line 189 def running? @started && !@shutdown end |
#shutdown(exception = Temporalio::Error::WorkerShutdown.new('Manual shutdown')) ⇒ Object
Initiate a worker shutdown and wait until complete.
This can be called before the worker has even started and is safe for repeated invocations. This method will not return until the worker has completed shutting down.
157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 |
# File 'lib/temporalio/worker.rb', line 157 def shutdown(exception = Temporalio::Error::WorkerShutdown.new('Manual shutdown')) mutex.synchronize do return unless running? # First initiate Core shutdown, which will start dropping poll requests core_worker.initiate_shutdown # Then let the runner know we're shutting down, so it can stop other workers runner&.shutdown(exception) # Wait for workers to drain any outstanding tasks activity_worker&.drain workflow_worker&.drain # Stop the executor (at this point there should already be nothing in it) activity_executor.shutdown # Finalize the shutdown by stopping the Core core_worker.shutdown @shutdown = true end end |
#start(runner = nil) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
A worker is only intended to be started once. Initialize a new worker should you need to start it again.
Start the worker asynchronously in a shared runtime.
This is an internal method for advanced use-cases for those intended to implement their own worker runner.
126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 |
# File 'lib/temporalio/worker.rb', line 126 def start(runner = nil) mutex.synchronize do raise 'Worker is already started' if started? @started = true end @runner = runner runtime.ensure_callback_loop runtime.reactor.async do |task| if activity_worker task.async do |task| activity_worker.run(task) rescue StandardError => e shutdown(e) # initiate shutdown because of a fatal error end end # TODO: Pending implementation task.async { |task| workflow_worker.run(task) } if workflow_worker end end |
#started? ⇒ Boolean
Whether the worker has been started.
180 181 182 |
# File 'lib/temporalio/worker.rb', line 180 def started? @started end |