Class: Temporalio::Worker

Inherits:
Object
  • Object
show all
Defined in:
lib/temporalio/worker.rb,
lib/temporalio/worker/tuner.rb,
lib/temporalio/worker/plugin.rb,
lib/temporalio/worker/interceptor.rb,
lib/temporalio/worker/thread_pool.rb,
lib/temporalio/worker/poller_behavior.rb,
lib/temporalio/worker/activity_executor.rb,
lib/temporalio/worker/workflow_executor.rb,
lib/temporalio/worker/workflow_replayer.rb,
lib/temporalio/worker/deployment_options.rb,
lib/temporalio/worker/activity_executor/fiber.rb,
lib/temporalio/worker/activity_executor/thread_pool.rb,
lib/temporalio/worker/workflow_executor/thread_pool.rb,
lib/temporalio/worker/illegal_workflow_call_validator.rb

Overview

Worker for processing activities and workflows on a task queue.

Workers are created for a task queue and the items they can run. Then #run is used for running a single worker, or Worker.run_all is used for a collection of workers. These can wait until a block is complete or a Cancellation is canceled.

Defined Under Namespace

Modules: Interceptor, Plugin Classes: ActivityExecutor, DeploymentOptions, IllegalWorkflowCallValidator, Options, PollerBehavior, ThreadPool, Tuner, WorkflowExecutor, WorkflowReplayer

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(client:, task_queue:, activities: [], workflows: [], tuner: Tuner.create_fixed, activity_executors: ActivityExecutor.defaults, workflow_executor: WorkflowExecutor::ThreadPool.default, plugins: [], interceptors: [], identity: nil, logger: client.options.logger, max_cached_workflows: 1000, max_concurrent_workflow_task_polls: 5, nonsticky_to_sticky_poll_ratio: 0.2, max_concurrent_activity_task_polls: 5, no_remote_activities: false, sticky_queue_schedule_to_start_timeout: 10, max_heartbeat_throttle_interval: 60, default_heartbeat_throttle_interval: 30, max_activities_per_second: nil, max_task_queue_activities_per_second: nil, graceful_shutdown_period: 0, disable_eager_activity_execution: false, illegal_workflow_calls: Worker.default_illegal_workflow_calls, workflow_failure_exception_types: [], workflow_payload_codec_thread_pool: nil, unsafe_workflow_io_enabled: false, deployment_options: Worker.default_deployment_options, workflow_task_poller_behavior: PollerBehavior::SimpleMaximum.new(max_concurrent_workflow_task_polls), activity_task_poller_behavior: PollerBehavior::SimpleMaximum.new(max_concurrent_activity_task_polls), debug_mode: %w[true 1].include?(ENV['TEMPORAL_DEBUG'].to_s.downcase)) ⇒ Worker

Create a new worker. At least one activity or workflow must be present.



436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
# File 'lib/temporalio/worker.rb', line 436

def initialize(
  client:,
  task_queue:,
  activities: [],
  workflows: [],
  tuner: Tuner.create_fixed,
  activity_executors: ActivityExecutor.defaults,
  workflow_executor: WorkflowExecutor::ThreadPool.default,
  plugins: [],
  interceptors: [],
  identity: nil,
  logger: client.options.logger,
  max_cached_workflows: 1000,
  max_concurrent_workflow_task_polls: 5,
  nonsticky_to_sticky_poll_ratio: 0.2,
  max_concurrent_activity_task_polls: 5,
  no_remote_activities: false,
  sticky_queue_schedule_to_start_timeout: 10,
  max_heartbeat_throttle_interval: 60,
  default_heartbeat_throttle_interval: 30,
  max_activities_per_second: nil,
  max_task_queue_activities_per_second: nil,
  graceful_shutdown_period: 0,
  disable_eager_activity_execution: false,
  illegal_workflow_calls: Worker.default_illegal_workflow_calls,
  workflow_failure_exception_types: [],
  workflow_payload_codec_thread_pool: nil,
  unsafe_workflow_io_enabled: false,
  deployment_options: Worker.default_deployment_options,
  workflow_task_poller_behavior: PollerBehavior::SimpleMaximum.new(max_concurrent_workflow_task_polls),
  activity_task_poller_behavior: PollerBehavior::SimpleMaximum.new(max_concurrent_activity_task_polls),
  debug_mode: %w[true 1].include?(ENV['TEMPORAL_DEBUG'].to_s.downcase)
)
  Internal::ProtoUtils.assert_non_reserved_name(task_queue)

  @options = Options.new(
    client:,
    task_queue:,
    activities:,
    workflows:,
    tuner:,
    activity_executors:,
    workflow_executor:,
    plugins:,
    interceptors:,
    identity:,
    logger:,
    max_cached_workflows:,
    max_concurrent_workflow_task_polls:,
    nonsticky_to_sticky_poll_ratio:,
    max_concurrent_activity_task_polls:,
    no_remote_activities:,
    sticky_queue_schedule_to_start_timeout:,
    max_heartbeat_throttle_interval:,
    default_heartbeat_throttle_interval:,
    max_activities_per_second:,
    max_task_queue_activities_per_second:,
    graceful_shutdown_period:,
    disable_eager_activity_execution:,
    illegal_workflow_calls:,
    workflow_failure_exception_types:,
    workflow_payload_codec_thread_pool:,
    unsafe_workflow_io_enabled:,
    deployment_options:,
    workflow_task_poller_behavior:,
    activity_task_poller_behavior:,
    debug_mode:
  ).freeze
  # Collect applicable client plugins and worker plugins, then validate and apply to options
  @plugins = client.options.plugins.grep(Plugin) + plugins
  Worker._validate_plugins!(@plugins)
  @options = @plugins.reduce(@options) { |options, plugin| plugin.configure_worker(options) }
  # Initialize the worker for the given options
  _initialize_from_options
end

Instance Attribute Details

#optionsOptions (readonly)



356
357
358
# File 'lib/temporalio/worker.rb', line 356

def options
  @options
end

Class Method Details

.default_build_idString



73
74
75
# File 'lib/temporalio/worker.rb', line 73

def self.default_build_id
  @default_build_id ||= _load_default_build_id
end

.default_deployment_optionsDeploymentOptions



100
101
102
103
104
# File 'lib/temporalio/worker.rb', line 100

def self.default_deployment_options
  @default_deployment_options ||= DeploymentOptions.new(
    version: WorkerDeploymentVersion.new(deployment_name: '', build_id: Worker.default_build_id)
  )
end

.default_illegal_workflow_callsHash<String, [:all, Array<Symbol, IllegalWorkflowCallValidator>, IllegalWorkflowCallValidator]>



288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
# File 'lib/temporalio/worker.rb', line 288

def self.default_illegal_workflow_calls
  @default_illegal_workflow_calls ||= begin
    hash = {
      'BasicSocket' => :all,
      'Date' => i[initialize today],
      'DateTime' => i[initialize now],
      'Dir' => :all,
      'Fiber' => [:set_scheduler],
      'File' => :all,
      'FileTest' => :all,
      'FileUtils' => :all,
      'Find' => :all,
      'GC' => :all,
      'IO' => [
        :read
        # Intentionally leaving out write so puts will work. We don't want to add heavy logic replacing stdout or
        # trying to derive whether it's file vs stdout write.
        #:write
      ],
      'Kernel' => i[abort at_exit autoload autoload? eval exec exit fork gets load open rand readline readlines
                     sleep spawn srand system test trap],
      # Loggers use mutexes in ways that can hang workflows, so users need to disable the durable scheduler to use
      # them
      'Logger' => :all,
      'Monitor' => :all,
      'Net::HTTP' => :all,
      'Pathname' => :all,
      # TODO(cretz): Investigate why clock_gettime called from Timeout thread affects this code at all. Stack trace
      # test executing activities inside a timeout will fail if clock_gettime is blocked.
      'Process' => i[abort argv0 daemon detach exec exit exit! fork kill setpriority setproctitle setrlimit setsid
                      spawn times wait wait2 waitall warmup],
      # TODO(cretz): Allow Ractor.current since exception formatting in error_highlight references it
      # 'Ractor' => :all,
      'Random::Base' => [:initialize],
      'Resolv' => :all,
      'SecureRandom' => :all,
      'Signal' => :all,
      'Socket' => :all,
      'Tempfile' => :all,
      'Timeout' => :all,
      'Thread' => i[abort_on_exception= exit fork handle_interrupt ignore_deadlock= kill new pass
                     pending_interrupt? report_on_exception= start stop initialize join name= priority= raise run
                     terminate thread_variable_set wakeup],
      'Thread::ConditionVariable' => :all,
      'Thread::Mutex' => IllegalWorkflowCallValidator.known_safe_mutex_validator,
      'Thread::SizedQueue' => :all,
      'Thread::Queue' => :all,
      'Time' => IllegalWorkflowCallValidator.default_time_validators
    } #: Hash[String, :all | Array[Symbol]]
    hash.each_value(&:freeze)
    hash.freeze
  end
end

.run_all(*workers, cancellation: Cancellation.new, shutdown_signals: [], raise_in_block_on_shutdown: Error::CanceledError.new('Workers finished'), wait_block_complete: true) { ... } ⇒ Object

Run all workers until cancellation or optional block completes. When the cancellation or block is complete, the workers are shut down. This will return the block result if everything successful or raise an error if not. See #run for details on how worker shutdown works.

Yields:

  • Optional block. This will be run in a new background thread or fiber. Workers will shut down upon completion of this and, assuming no other failures, return/bubble success/exception of the block.



120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
# File 'lib/temporalio/worker.rb', line 120

def self.run_all(
  *workers,
  cancellation: Cancellation.new,
  shutdown_signals: [],
  raise_in_block_on_shutdown: Error::CanceledError.new('Workers finished'),
  wait_block_complete: true,
  &block
)
  # We have to apply plugins. However, every plugin has a different worker. So we provide them the worker along with
  # other options, but we disregard any mutation of the worker on the next call.
  run_worker = proc do |options|
    # @type var options: Plugin::RunWorkerOptions
    _run_all_root(*workers,
                  cancellation: options.cancellation,
                  shutdown_signals: options.shutdown_signals,
                  raise_in_block_on_shutdown: options.raise_in_block_on_shutdown,
                  wait_block_complete:, &block)
  end
  plugins_with_workers = workers.flat_map { |w| w._plugins.map { |p| [p, w] } }
  run_worker = plugins_with_workers.reverse_each.reduce(run_worker) do |next_call, plugin_with_worker|
    plugin, worker = plugin_with_worker
    proc do |options|
      plugin.run_worker(options.with(worker:), next_call) # steep:ignore
    end
  end

  run_worker.call(Plugin::RunWorkerOptions.new(
    # Intentionally violating typing here because we set this on each call
    worker: nil, # steep:ignore
    cancellation:,
    shutdown_signals:,
    raise_in_block_on_shutdown:
  ))
end

Instance Method Details

#clientClient



612
613
614
# File 'lib/temporalio/worker.rb', line 612

def client
  @client_mutex.synchronize { @options.client }
end

#client=(new_client) ⇒ Object

Replace the worker’s client. When this is called, the client is replaced on the internal worker which means any new calls will be made on the new client (but existing calls will still complete on the previous one). This is commonly used for providing a new client with updated authentication credentials.



621
622
623
624
625
626
627
# File 'lib/temporalio/worker.rb', line 621

def client=(new_client)
  @client_mutex.synchronize do
    @bridge_worker.replace_client(new_client.connection._core_client)
    @options = @options.with(client: new_client)
    new_client
  end
end

#run(cancellation: Cancellation.new, shutdown_signals: [], raise_in_block_on_shutdown: Error::CanceledError.new('Workers finished'), wait_block_complete: true) { ... } ⇒ Object

Run this worker until cancellation or optional block completes. When the cancellation or block is complete, the worker is shut down. This will return the block result if everything successful or raise an error if not.

Upon shutdown (either via cancellation, block completion, or worker fatal error), the worker immediately stops accepting new work. Then, after an optional grace period, all activities are canceled. This call then waits for every activity and workflow task to complete before returning.

Yields:

  • Optional block. This will be run in a new background thread or fiber. Worker will shut down upon completion of this and, assuming no other failures, return/bubble success/exception of the block.



645
646
647
648
649
650
651
652
653
# File 'lib/temporalio/worker.rb', line 645

def run(
  cancellation: Cancellation.new,
  shutdown_signals: [],
  raise_in_block_on_shutdown: Error::CanceledError.new('Workers finished'),
  wait_block_complete: true,
  &block
)
  Worker.run_all(self, cancellation:, shutdown_signals:, raise_in_block_on_shutdown:, wait_block_complete:, &block)
end

#task_queueString



606
607
608
# File 'lib/temporalio/worker.rb', line 606

def task_queue
  @options.task_queue
end