Class: Temporalio::Worker
- Inherits:
- Object
- Defined in:
- lib/temporalio/worker.rb,
lib/temporalio/worker/tuner.rb,
lib/temporalio/worker/interceptor.rb,
lib/temporalio/worker/activity_executor.rb,
lib/temporalio/worker/activity_executor/fiber.rb,
lib/temporalio/worker/activity_executor/thread_pool.rb
Overview
Worker for processing activities and workflows on a task queue.
Workers are created for a task queue and the items they can run. Then #run is used for running a single worker, or Worker.run_all is used for a collection of workers. These can wait until a block is complete or a Cancellation is canceled.
Defined Under Namespace
Modules: Interceptor Classes: ActivityExecutor, Options, Tuner
Instance Attribute Summary
-
#options ⇒ Options
readonly
Frozen options for this worker, with the same attributes as #initialize.
Class Method Summary
-
.default_build_id ⇒ String
Memoized default build ID.
-
.run_all(*workers, cancellation: Cancellation.new, shutdown_signals: [], raise_in_block_on_shutdown: Error::CanceledError.new('Workers finished'), wait_block_complete: true) { ... } ⇒ Object
Run all workers until cancellation or optional block completes.
Instance Method Summary
-
#initialize(client:, task_queue:, activities: [], activity_executors: ActivityExecutor.defaults, tuner: Tuner.create_fixed, interceptors: [], build_id: Worker.default_build_id, identity: nil, logger: client.options.logger, max_cached_workflows: 1000, max_concurrent_workflow_task_polls: 5, nonsticky_to_sticky_poll_ratio: 0.2, max_concurrent_activity_task_polls: 5, no_remote_activities: false, sticky_queue_schedule_to_start_timeout: 10, max_heartbeat_throttle_interval: 60, default_heartbeat_throttle_interval: 30, max_activities_per_second: nil, max_task_queue_activities_per_second: nil, graceful_shutdown_period: 0, use_worker_versioning: false) ⇒ Worker
constructor
Create a new worker.
-
#run(cancellation: Cancellation.new, shutdown_signals: [], raise_in_block_on_shutdown: Error::CanceledError.new('Workers finished'), wait_block_complete: true) { ... } ⇒ Object
Run this worker until cancellation or optional block completes.
-
#task_queue ⇒ String
Task queue set on the worker options.
Constructor Details
#initialize(client:, task_queue:, activities: [], activity_executors: ActivityExecutor.defaults, tuner: Tuner.create_fixed, interceptors: [], build_id: Worker.default_build_id, identity: nil, logger: client.options.logger, max_cached_workflows: 1000, max_concurrent_workflow_task_polls: 5, nonsticky_to_sticky_poll_ratio: 0.2, max_concurrent_activity_task_polls: 5, no_remote_activities: false, sticky_queue_schedule_to_start_timeout: 10, max_heartbeat_throttle_interval: 60, default_heartbeat_throttle_interval: 30, max_activities_per_second: nil, max_task_queue_activities_per_second: nil, graceful_shutdown_period: 0, use_worker_versioning: false) ⇒ Worker
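Create a new worker. At least one activity or workflow must be present. In the source shown here workflows are not yet supported, so an empty activities list raises ArgumentError.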
# File 'lib/temporalio/worker.rb', line 242

def initialize(
  client:,
  task_queue:,
  activities: [],
  activity_executors: ActivityExecutor.defaults,
  tuner: Tuner.create_fixed,
  interceptors: [],
  build_id: Worker.default_build_id,
  identity: nil,
  logger: client.options.logger,
  max_cached_workflows: 1000,
  max_concurrent_workflow_task_polls: 5,
  nonsticky_to_sticky_poll_ratio: 0.2,
  max_concurrent_activity_task_polls: 5,
  no_remote_activities: false,
  sticky_queue_schedule_to_start_timeout: 10,
  max_heartbeat_throttle_interval: 60,
  default_heartbeat_throttle_interval: 30,
  max_activities_per_second: nil,
  max_task_queue_activities_per_second: nil,
  graceful_shutdown_period: 0,
  use_worker_versioning: false
)
  # TODO(cretz): Remove when workflows come about
  raise ArgumentError, 'Must have at least one activity' if activities.empty?

  @options = Options.new(
    client:,
    task_queue:,
    activities:,
    activity_executors:,
    tuner:,
    interceptors:,
    build_id:,
    identity:,
    logger:,
    max_cached_workflows:,
    max_concurrent_workflow_task_polls:,
    nonsticky_to_sticky_poll_ratio:,
    max_concurrent_activity_task_polls:,
    no_remote_activities:,
    sticky_queue_schedule_to_start_timeout:,
    max_heartbeat_throttle_interval:,
    default_heartbeat_throttle_interval:,
    max_activities_per_second:,
    max_task_queue_activities_per_second:,
    graceful_shutdown_period:,
    use_worker_versioning:
  ).freeze

  # Create the bridge worker
  @bridge_worker = Internal::Bridge::Worker.new(
    client.connection._core_client,
    Internal::Bridge::Worker::Options.new(
      activity: !activities.empty?,
      workflow: false,
      namespace: client.namespace,
      task_queue:,
      tuner: Internal::Bridge::Worker::TunerOptions.new(
        workflow_slot_supplier: (tuner.workflow_slot_supplier),
        activity_slot_supplier: (tuner.activity_slot_supplier),
        local_activity_slot_supplier: (tuner.local_activity_slot_supplier)
      ),
      build_id:,
      identity_override: identity,
      max_cached_workflows:,
      max_concurrent_workflow_task_polls:,
      nonsticky_to_sticky_poll_ratio:,
      max_concurrent_activity_task_polls:,
      no_remote_activities:,
      sticky_queue_schedule_to_start_timeout:,
      max_heartbeat_throttle_interval:,
      default_heartbeat_throttle_interval:,
      max_worker_activities_per_second: max_activities_per_second,
      max_task_queue_activities_per_second:,
      graceful_shutdown_period:,
      use_worker_versioning:
    )
  )

  # Collect interceptors from client and params
  @all_interceptors = client.options.interceptors.select { |i| i.is_a?(Interceptor) } + interceptors

  # Cancellation for the whole worker
  @worker_shutdown_cancellation = Cancellation.new

  # Create workers
  # TODO(cretz): Make conditional when workflows appear
  @activity_worker = Internal::Worker::ActivityWorker.new(self, @bridge_worker)

  # Validate worker
  @bridge_worker.validate
end
Instance Attribute Details
#options ⇒ Options (readonly)
Returns Frozen options for this worker, with the same attributes as #initialize.
# File 'lib/temporalio/worker.rb', line 198

def options
  @options
end
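The effect of freezing the options can be illustrated with a small stand-in. This is only a sketch: `SketchOptions` is a hypothetical two-attribute struct, and it assumes (the listing does not confirm it) that the real Options class behaves like a keyword-initialized struct.

```ruby
# Hypothetical stand-in for Worker::Options with only two of its attributes.
SketchOptions = Struct.new(:task_queue, :max_cached_workflows, keyword_init: true)

# Build and freeze, mirroring the `Options.new(...).freeze` call in #initialize.
def build_sketch_options
  SketchOptions.new(task_queue: 'my-task-queue', max_cached_workflows: 1000).freeze
end

options = build_sketch_options
puts options.task_queue # reads work as usual

begin
  options.task_queue = 'other-queue' # writes raise on a frozen struct
rescue FrozenError => e
  puts "Cannot modify frozen options: #{e.class}"
end
```

Because the object is frozen, changing a setting means constructing a new worker rather than mutating the options of an existing one.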
Class Method Details
.default_build_id ⇒ String
Returns Memoized default build ID. This default value is built as a checksum of all of the loaded Ruby source files in `$LOADED_FEATURES`. Users may prefer to set the build ID to a better representation of the source.
# File 'lib/temporalio/worker.rb', line 51

def self.default_build_id
  @default_build_id ||= _load_default_build_id
end
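The body of `_load_default_build_id` is not shown on this page. As a rough illustration of what a checksum over `$LOADED_FEATURES` could look like, here is a minimal sketch. The helper name `sketch_build_id`, the choice of MD5, the sorting, and the `.rb` filter are all assumptions made for the example, not the library's implementation.

```ruby
require 'digest/md5'

# Hypothetical helper (not the library's _load_default_build_id): hashes the
# contents of every loaded Ruby source file into a single hex digest.
def sketch_build_id(features = $LOADED_FEATURES)
  digest = Digest::MD5.new
  features.sort.each do |path|
    # Skip native extensions and entries that are not readable files.
    next unless path.end_with?('.rb') && File.file?(path)

    digest.update(File.binread(path))
  end
  digest.hexdigest
end

puts sketch_build_id
```

A checksum like this changes whenever any loaded file changes, including gems, which is one reason a user might prefer to pass an explicit `build_id:` such as a release tag.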
.run_all(*workers, cancellation: Cancellation.new, shutdown_signals: [], raise_in_block_on_shutdown: Error::CanceledError.new('Workers finished'), wait_block_complete: true) { ... } ⇒ Object
Run all workers until cancellation or optional block completes. When the cancellation or block is complete, the workers are shut down. This returns the block result if everything is successful, or raises an error if not. See #run for details on how worker shutdown works.
# File 'lib/temporalio/worker.rb', line 90

def self.run_all(
  *workers,
  cancellation: Cancellation.new,
  shutdown_signals: [],
  raise_in_block_on_shutdown: Error::CanceledError.new('Workers finished'),
  wait_block_complete: true,
  &block
)
  # Confirm there is at least one and they are all workers
  raise ArgumentError, 'At least one worker required' if workers.empty?
  raise ArgumentError, 'Not all parameters are workers' unless workers.all? { |w| w.is_a?(Worker) }

  Internal::Bridge.assert_fiber_compatibility!

  # Start the multi runner
  runner = Internal::Worker::MultiRunner.new(workers:, shutdown_signals:)

  # Apply block
  runner.apply_thread_or_fiber_block(&block)

  # Reuse first worker logger
  logger = workers.first&.options&.logger or raise # Help steep

  # On cancel, initiate shutdown
  cancellation.add_cancel_callback do
    logger.info('Cancel invoked, beginning worker shutdown')
    runner.initiate_shutdown
  end

  # Poller loop, run until all pollers shut down
  first_error = nil
  block_result = nil
  loop do
    event = runner.next_event
    case event
    when Internal::Worker::MultiRunner::Event::PollSuccess
      # Successful poll
      event.worker._on_poll_bytes(event.worker_type, event.bytes)
    when Internal::Worker::MultiRunner::Event::PollFailure
      # Poll failure, this causes shutdown of all workers
      logger.error('Poll failure (beginning worker shutdown if not already occurring)')
      logger.error(event.error)
      first_error ||= event.error
      runner.initiate_shutdown
    when Internal::Worker::MultiRunner::Event::PollerShutDown
      # Individual poller shut down. Nothing to do here until we support
      # worker status or something.
    when Internal::Worker::MultiRunner::Event::AllPollersShutDown
      # This is where we break the loop, no more polling can happen
      break
    when Internal::Worker::MultiRunner::Event::BlockSuccess
      logger.info('Block completed, beginning worker shutdown')
      block_result = event
      runner.initiate_shutdown
    when Internal::Worker::MultiRunner::Event::BlockFailure
      logger.error('Block failure (beginning worker shutdown)')
      logger.error(event.error)
      block_result = event
      first_error ||= event.error
      runner.initiate_shutdown
    when Internal::Worker::MultiRunner::Event::ShutdownSignalReceived
      logger.info('Signal received, beginning worker shutdown')
      runner.initiate_shutdown
    else
      raise "Unexpected event: #{event}"
    end
  end

  # Now that all pollers have stopped, let's wait for all to complete
  begin
    runner.wait_complete_and_finalize_shutdown
  rescue StandardError => e
    logger.warn('Failed waiting and finalizing')
    logger.warn(e)
  end

  # If there was a block but not a result yet, we want to raise if that is
  # wanted, and wait if that is wanted
  if block_given? && block_result.nil?
    runner.raise_in_thread_or_fiber_block(raise_in_block_on_shutdown) unless raise_in_block_on_shutdown.nil?
    if wait_block_complete
      event = runner.next_event
      case event
      when Internal::Worker::MultiRunner::Event::BlockSuccess
        logger.info('Block completed (after worker shutdown)')
        block_result = event
      when Internal::Worker::MultiRunner::Event::BlockFailure
        logger.error('Block failure (after worker shutdown)')
        logger.error(event.error)
        block_result = event
        first_error ||= event.error
      when Internal::Worker::MultiRunner::Event::ShutdownSignalReceived
        # Do nothing, waiting for block
      else
        raise "Unexpected event: #{event}"
      end
    end
  end

  # If there was a shutdown-causing error, we raise that
  if !first_error.nil?
    raise first_error
  elsif block_result.is_a?(Internal::Worker::MultiRunner::Event::BlockSuccess)
    block_result.result
  end
end
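The way the listing above decides between returning the block result and raising can be modeled in isolation. The following is a simplified sketch, not the library's code: `SketchEvent` and `resolve_run` are hypothetical names, events are plain structs tagged with a symbol instead of MultiRunner::Event classes, and logging and shutdown initiation are left out.

```ruby
# Hypothetical event type standing in for the MultiRunner::Event classes.
SketchEvent = Struct.new(:kind, :result, :error, keyword_init: true)

# Consumes events until all pollers are shut down, then resolves the outcome
# as the listing does: the first error wins, otherwise the block result.
def resolve_run(events)
  first_error = nil
  block_result = nil
  events.each do |event|
    case event.kind
    when :poll_failure
      first_error ||= event.error
    when :block_success
      block_result = event
    when :block_failure
      block_result = event
      first_error ||= event.error
    when :all_pollers_shut_down
      break
    else
      raise "Unexpected event: #{event.kind}"
    end
  end
  raise first_error unless first_error.nil?

  block_result&.result
end

events = [
  SketchEvent.new(kind: :block_success, result: 42),
  SketchEvent.new(kind: :all_pollers_shut_down)
]
puts resolve_run(events) # prints 42
```

The `first_error ||= ...` pattern means a later block failure never masks an earlier poll failure, so the caller sees the error that started the shutdown.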
Instance Method Details
#run(cancellation: Cancellation.new, shutdown_signals: [], raise_in_block_on_shutdown: Error::CanceledError.new('Workers finished'), wait_block_complete: true) { ... } ⇒ Object
Run this worker until cancellation or optional block completes. When the cancellation or block is complete, the worker is shut down. This returns the block result if everything is successful, or raises an error if not.
Upon shutdown (via cancellation, block completion, or worker fatal error), the worker immediately stops accepting new work. Then, after an optional grace period, all activities are canceled. This call then waits for every activity and workflow task to complete before returning.
# File 'lib/temporalio/worker.rb', line 357

def run(
  cancellation: Cancellation.new,
  shutdown_signals: [],
  raise_in_block_on_shutdown: Error::CanceledError.new('Workers finished'),
  wait_block_complete: true,
  &block
)
  Worker.run_all(self, cancellation:, shutdown_signals:, raise_in_block_on_shutdown:, wait_block_complete:, &block)
end
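The two common ways of calling #run can be sketched as small helpers. Both helper names are hypothetical, `worker` is assumed to be an already constructed Temporalio::Worker, and passing signal names as strings such as 'SIGINT' is an assumption about the accepted format of `shutdown_signals`.

```ruby
# Runs the worker until the process receives one of the given signals.
# `worker` is assumed to respond to #run as documented above.
def run_until_signal(worker, signals: %w[SIGINT SIGTERM])
  worker.run(shutdown_signals: signals)
end

# Runs the worker only for as long as the block takes, then shuts it down
# and returns the block's result.
def run_while(worker, &block)
  worker.run(&block)
end
```

The first form suits a long-lived worker process; the second is convenient in tests or scripts where the worker should stop as soon as some piece of work finishes.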
#task_queue ⇒ String
Returns Task queue set on the worker options.
# File 'lib/temporalio/worker.rb', line 337

def task_queue
  @options.task_queue
end