Class: Temporalio::Worker

Inherits:
Object
Defined in:
lib/temporalio/worker.rb,
lib/temporalio/worker/tuner.rb,
lib/temporalio/worker/interceptor.rb,
lib/temporalio/worker/activity_executor.rb,
lib/temporalio/worker/activity_executor/fiber.rb,
lib/temporalio/worker/activity_executor/thread_pool.rb

Overview

Worker for processing activities and workflows on a task queue.

A worker is created for a task queue together with the activities and workflows it can run. Use #run to run a single worker, or Worker.run_all to run a collection of workers. Both run until a given block completes or a Cancellation is canceled.

Defined Under Namespace

Modules: Interceptor
Classes: ActivityExecutor, Options, Tuner

Constructor Details

#initialize(client:, task_queue:, activities: [], activity_executors: ActivityExecutor.defaults, tuner: Tuner.create_fixed, interceptors: [], build_id: Worker.default_build_id, identity: nil, logger: client.options.logger, max_cached_workflows: 1000, max_concurrent_workflow_task_polls: 5, nonsticky_to_sticky_poll_ratio: 0.2, max_concurrent_activity_task_polls: 5, no_remote_activities: false, sticky_queue_schedule_to_start_timeout: 10, max_heartbeat_throttle_interval: 60, default_heartbeat_throttle_interval: 30, max_activities_per_second: nil, max_task_queue_activities_per_second: nil, graceful_shutdown_period: 0, use_worker_versioning: false) ⇒ Worker

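Create a new worker. At least one activity or workflow must be present. In the implementation shown below, only activities are checked: an empty `activities` array raises ArgumentError.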

Parameters:

  • client (Client)

    Client for this worker.

  • task_queue (String)

    Task queue for this worker.

  • activities (Array<Activity, Class<Activity>, Activity::Definition>) (defaults to: [])

    Activities for this worker.

  • activity_executors (Hash<Symbol, Worker::ActivityExecutor>) (defaults to: ActivityExecutor.defaults)

    Executors that activities can run within.

  • tuner (Tuner) (defaults to: Tuner.create_fixed)

    Tuner that controls the number of concurrent activities/workflows that run at a time.

  • interceptors (Array<Interceptor>) (defaults to: [])

    Interceptors specific to this worker. Interceptors set on the client that include the Interceptor module are automatically included here, so there is no need to specify them again.

  • build_id (String) (defaults to: Worker.default_build_id)

    Unique identifier for the current runtime. This is best set to a unique value that represents all code and changes only when the code does, such as a git commit hash. If unset, the default is a hash of the known Ruby code.

  • identity (String, nil) (defaults to: nil)

    Override the identity for this worker. If unset, client identity is used.

  • logger (Logger) (defaults to: client.options.logger)

    Logger for this worker. If unset, the client logger is used.

  • max_cached_workflows (Integer) (defaults to: 1000)

    Number of workflows held in cache for use by sticky task queue. If set to 0, workflow caching and sticky queuing are disabled.

  • max_concurrent_workflow_task_polls (Integer) (defaults to: 5)

    Maximum number of concurrent poll workflow task requests we will perform at a time on this worker’s task queue.

  • nonsticky_to_sticky_poll_ratio (Float) (defaults to: 0.2)

    `max_concurrent_workflow_task_polls` * this number = the maximum number of pollers allowed for the nonsticky queue when sticky tasks are enabled. If both defaults are used, the sticky queue will allow 4 pollers while the nonsticky queue will allow 1. The minimum for either poller is 1, so if `max_concurrent_workflow_task_polls` is 1 and sticky queues are enabled, there will be 2 concurrent polls.

  • max_concurrent_activity_task_polls (Integer) (defaults to: 5)

    Maximum number of concurrent poll activity task requests we will perform at a time on this worker’s task queue.

  • no_remote_activities (Boolean) (defaults to: false)

    If true, this worker will only handle workflow tasks and local activities; it will not poll for activity tasks.

  • sticky_queue_schedule_to_start_timeout (Float) (defaults to: 10)

    How long a workflow task is allowed to sit on the sticky queue before it is timed out and moved to the non-sticky queue where it may be picked up by any worker.

  • max_heartbeat_throttle_interval (Float) (defaults to: 60)

    Longest interval for throttling activity heartbeats.

  • default_heartbeat_throttle_interval (Float) (defaults to: 30)

    Default interval for throttling activity heartbeats in case per-activity heartbeat timeout is unset. Otherwise, it’s the per-activity heartbeat timeout * 0.8.

  • max_activities_per_second (Float, nil) (defaults to: nil)

    Limits the number of activities per second that this worker will process. The worker will not poll for new activities if by doing so it might receive and execute an activity which would cause it to exceed this limit.

  • max_task_queue_activities_per_second (Float, nil) (defaults to: nil)

    Sets the maximum number of activities per second the task queue will dispatch, controlled server-side. Note that this only takes effect upon an activity poll request. If multiple workers on the same queue have different values set, they will thrash with the last poller winning.

  • graceful_shutdown_period (Float) (defaults to: 0)

    Amount of time after shutdown is called that activities are given to complete before their tasks are canceled.

  • use_worker_versioning (Boolean) (defaults to: false)

    If true, the `build_id` argument must be specified, and this worker opts into the worker versioning feature. This ensures it only receives workflow tasks for workflows which it claims to be compatible with. For more information, see docs.temporal.io/workers#worker-versioning.
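
The arithmetic described for nonsticky_to_sticky_poll_ratio and for the two heartbeat throttle intervals can be sketched in plain Ruby. This is a minimal illustration, not the SDK's implementation: the helper names `poller_split` and `heartbeat_throttle_interval` are hypothetical, and both the rounding (floor) and the capping at max_heartbeat_throttle_interval are assumptions that the parameter descriptions do not spell out.

```ruby
# Hypothetical helper (not part of temporalio): splits workflow task pollers
# between the nonsticky and sticky queues when sticky queues are enabled.
def poller_split(max_polls: 5, ratio: 0.2)
  # Assumption: the product is floored. Either side gets at least one poller.
  nonsticky = [(max_polls * ratio).floor, 1].max
  sticky = [max_polls - nonsticky, 1].max
  { nonsticky: nonsticky, sticky: sticky }
end

# Hypothetical helper: picks the heartbeat throttle interval in seconds.
def heartbeat_throttle_interval(heartbeat_timeout, default: 30, max: 60)
  # Unset timeout uses the default interval; otherwise 80% of the timeout.
  interval = heartbeat_timeout.nil? ? default : heartbeat_timeout * 0.8
  # Assumption: the result never exceeds max_heartbeat_throttle_interval.
  [interval, max].min
end

p poller_split                      # both defaults
p poller_split(max_polls: 1)        # both minimums apply, 2 polls in total
p heartbeat_throttle_interval(nil)  # no per-activity heartbeat timeout
p heartbeat_throttle_interval(10)   # 80% of a 10 second timeout
```

With both defaults this yields one nonsticky and four sticky pollers, which matches the parameter description.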

Raises:

  • (ArgumentError)


# File 'lib/temporalio/worker.rb', line 242

def initialize(
  client:,
  task_queue:,
  activities: [],
  activity_executors: ActivityExecutor.defaults,
  tuner: Tuner.create_fixed,
  interceptors: [],
  build_id: Worker.default_build_id,
  identity: nil,
  logger: client.options.logger,
  max_cached_workflows: 1000,
  max_concurrent_workflow_task_polls: 5,
  nonsticky_to_sticky_poll_ratio: 0.2,
  max_concurrent_activity_task_polls: 5,
  no_remote_activities: false,
  sticky_queue_schedule_to_start_timeout: 10,
  max_heartbeat_throttle_interval: 60,
  default_heartbeat_throttle_interval: 30,
  max_activities_per_second: nil,
  max_task_queue_activities_per_second: nil,
  graceful_shutdown_period: 0,
  use_worker_versioning: false
)
  # TODO(cretz): Remove when workflows come about
  raise ArgumentError, 'Must have at least one activity' if activities.empty?

  @options = Options.new(
    client:,
    task_queue:,
    activities:,
    activity_executors:,
    tuner:,
    interceptors:,
    build_id:,
    identity:,
    logger:,
    max_cached_workflows:,
    max_concurrent_workflow_task_polls:,
    nonsticky_to_sticky_poll_ratio:,
    max_concurrent_activity_task_polls:,
    no_remote_activities:,
    sticky_queue_schedule_to_start_timeout:,
    max_heartbeat_throttle_interval:,
    default_heartbeat_throttle_interval:,
    max_activities_per_second:,
    max_task_queue_activities_per_second:,
    graceful_shutdown_period:,
    use_worker_versioning:
  ).freeze

  # Create the bridge worker
  @bridge_worker = Internal::Bridge::Worker.new(
    client.connection._core_client,
    Internal::Bridge::Worker::Options.new(
      activity: !activities.empty?,
      workflow: false,
      namespace: client.namespace,
      task_queue:,
      tuner: Internal::Bridge::Worker::TunerOptions.new(
        workflow_slot_supplier: to_bridge_slot_supplier_options(tuner.workflow_slot_supplier),
        activity_slot_supplier: to_bridge_slot_supplier_options(tuner.activity_slot_supplier),
        local_activity_slot_supplier: to_bridge_slot_supplier_options(tuner.local_activity_slot_supplier)
      ),
      build_id:,
      identity_override: identity,
      max_cached_workflows:,
      max_concurrent_workflow_task_polls:,
      nonsticky_to_sticky_poll_ratio:,
      max_concurrent_activity_task_polls:,
      no_remote_activities:,
      sticky_queue_schedule_to_start_timeout:,
      max_heartbeat_throttle_interval:,
      default_heartbeat_throttle_interval:,
      max_worker_activities_per_second: max_activities_per_second,
      max_task_queue_activities_per_second:,
      graceful_shutdown_period:,
      use_worker_versioning:
    )
  )

  # Collect interceptors from client and params
  @all_interceptors = client.options.interceptors.select { |i| i.is_a?(Interceptor) } + interceptors

  # Cancellation for the whole worker
  @worker_shutdown_cancellation = Cancellation.new

  # Create workers
  # TODO(cretz): Make conditional when workflows appear
  @activity_worker = Internal::Worker::ActivityWorker.new(self, @bridge_worker)

  # Validate worker
  @bridge_worker.validate
end

Instance Attribute Details

#options ⇒ Options (readonly)

Returns Frozen options for this worker, which have the same attributes as #initialize.

Returns:

  • (Options)

    Frozen options for this worker, which have the same attributes as #initialize.



# File 'lib/temporalio/worker.rb', line 198

def options
  @options
end

Class Method Details

.default_build_id ⇒ String

Returns Memoized default build ID. This default value is built as a checksum of all of the loaded Ruby source files in `$LOADED_FEATURES`. Users may prefer to set the build ID to a better representation of the source.

Returns:

  • (String)

    Memoized default build ID. This default value is built as a checksum of all of the loaded Ruby source files in `$LOADED_FEATURES`. Users may prefer to set the build ID to a better representation of the source.



# File 'lib/temporalio/worker.rb', line 51

def self.default_build_id
  @default_build_id ||= _load_default_build_id
end
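
The private `_load_default_build_id` helper is not shown on this page. As a rough sketch of the idea described above (a memoized checksum over the loaded Ruby source files), something like the following could be used. The method name `sketch_build_id`, the choice of MD5, and the `.rb` filter are assumptions made for illustration; the SDK's actual hashing may differ.

```ruby
require 'digest'

# Hypothetical helper (not the SDK's _load_default_build_id): checksums the
# contents of every loaded Ruby source file and memoizes the result.
def sketch_build_id
  @sketch_build_id ||= begin
    digest = Digest::MD5.new # assumption: the SDK may use a different hash
    $LOADED_FEATURES.each do |path|
      # Skip native extensions and entries that are not regular files
      next unless path.end_with?('.rb') && File.file?(path)

      digest.update(File.binread(path))
    end
    digest.hexdigest
  end
end

puts sketch_build_id
```

Because the value depends on every file that happens to be loaded, an explicit build ID such as a git commit hash is usually easier to reason about.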

.run_all(*workers, cancellation: Cancellation.new, shutdown_signals: [], raise_in_block_on_shutdown: Error::CanceledError.new('Workers finished'), wait_block_complete: true) { ... } ⇒ Object

Run all workers until the cancellation is canceled or the optional block completes. When either happens, the workers are shut down. This returns the block result if everything is successful, or raises an error if not. See #run for details on how worker shutdown works.

Parameters:

  • workers (Array<Worker>)

    Workers to run.

  • cancellation (Cancellation) (defaults to: Cancellation.new)

    Cancellation that can be canceled to shut down all workers.

  • shutdown_signals (Array) (defaults to: [])

    Signals to trap and cause worker shutdown.

  • raise_in_block_on_shutdown (Exception, nil) (defaults to: Error::CanceledError.new('Workers finished'))

    Exception to Thread.raise or Fiber.raise if a block is present and still running on shutdown. If nil, `raise` is not used.

  • wait_block_complete (Boolean) (defaults to: true)

    If block given and shutdown caused by something else (e.g. cancellation canceled), whether to wait on the block to complete before returning.

Yields:

  • Optional block. This will be run in a new background thread or fiber. Workers will shut down when it completes and, assuming no other failures, the block's return value is returned or its exception is raised.

Returns:

  • (Object)

    Return value of the block, or nil if no block is given.

Raises:

  • (ArgumentError)


# File 'lib/temporalio/worker.rb', line 90

def self.run_all(
  *workers,
  cancellation: Cancellation.new,
  shutdown_signals: [],
  raise_in_block_on_shutdown: Error::CanceledError.new('Workers finished'),
  wait_block_complete: true,
  &block
)
  # Confirm there is at least one and they are all workers
  raise ArgumentError, 'At least one worker required' if workers.empty?
  raise ArgumentError, 'Not all parameters are workers' unless workers.all? { |w| w.is_a?(Worker) }

  Internal::Bridge.assert_fiber_compatibility!

  # Start the multi runner
  runner = Internal::Worker::MultiRunner.new(workers:, shutdown_signals:)

  # Apply block
  runner.apply_thread_or_fiber_block(&block)

  # Reuse first worker logger
  logger = workers.first&.options&.logger or raise # Help steep

  # On cancel, initiate shutdown
  cancellation.add_cancel_callback do
    logger.info('Cancel invoked, beginning worker shutdown')
    runner.initiate_shutdown
  end

  # Poller loop, run until all pollers shut down
  first_error = nil
  block_result = nil
  loop do
    event = runner.next_event
    case event
    when Internal::Worker::MultiRunner::Event::PollSuccess
      # Successful poll
      event.worker._on_poll_bytes(event.worker_type, event.bytes)
    when Internal::Worker::MultiRunner::Event::PollFailure
      # Poll failure, this causes shutdown of all workers
      logger.error('Poll failure (beginning worker shutdown if not already occurring)')
      logger.error(event.error)
      first_error ||= event.error
      runner.initiate_shutdown
    when Internal::Worker::MultiRunner::Event::PollerShutDown
      # Individual poller shut down. Nothing to do here until we support
      # worker status or something.
    when Internal::Worker::MultiRunner::Event::AllPollersShutDown
      # This is where we break the loop, no more polling can happen
      break
    when Internal::Worker::MultiRunner::Event::BlockSuccess
      logger.info('Block completed, beginning worker shutdown')
      block_result = event
      runner.initiate_shutdown
    when Internal::Worker::MultiRunner::Event::BlockFailure
      logger.error('Block failure (beginning worker shutdown)')
      logger.error(event.error)
      block_result = event
      first_error ||= event.error
      runner.initiate_shutdown
    when Internal::Worker::MultiRunner::Event::ShutdownSignalReceived
      logger.info('Signal received, beginning worker shutdown')
      runner.initiate_shutdown
    else
      raise "Unexpected event: #{event}"
    end
  end

  # Now that all pollers have stopped, let's wait for all to complete
  begin
    runner.wait_complete_and_finalize_shutdown
  rescue StandardError => e
    logger.warn('Failed waiting and finalizing')
    logger.warn(e)
  end

  # If there was a block but not a result yet, we want to raise if that is
  # wanted, and wait if that is wanted
  if block_given? && block_result.nil?
    runner.raise_in_thread_or_fiber_block(raise_in_block_on_shutdown) unless raise_in_block_on_shutdown.nil?
    if wait_block_complete
      event = runner.next_event
      case event
      when Internal::Worker::MultiRunner::Event::BlockSuccess
        logger.info('Block completed (after worker shutdown)')
        block_result = event
      when Internal::Worker::MultiRunner::Event::BlockFailure
        logger.error('Block failure (after worker shutdown)')
        logger.error(event.error)
        block_result = event
        first_error ||= event.error
      when Internal::Worker::MultiRunner::Event::ShutdownSignalReceived
        # Do nothing, waiting for block
      else
        raise "Unexpected event: #{event}"
      end
    end
  end

  # If there was a shutdown-causing error, we raise that
  if !first_error.nil?
    raise first_error
  elsif block_result.is_a?(Internal::Worker::MultiRunner::Event::BlockSuccess)
    block_result.result
  end
end
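
The bookkeeping at the heart of the listing above (remember the first error, remember the block outcome, then either raise or return) can be isolated in a small toy model. This is only a sketch: events are plain hashes rather than the `Internal::Worker::MultiRunner::Event` classes, and `resolve_outcome` is a hypothetical name that does not exist in the library.

```ruby
# Toy model of the first_error / block_result bookkeeping in run_all.
# Events are plain hashes here, standing in for the real event classes.
def resolve_outcome(events)
  first_error = nil
  block_result = nil
  events.each do |event|
    case event[:type]
    when :poll_failure
      first_error ||= event[:error]
    when :block_failure
      block_result = event
      first_error ||= event[:error]
    when :block_success
      block_result = event
    when :all_pollers_shut_down
      break
    end
  end
  # The first shutdown-causing error wins over any block result
  raise first_error unless first_error.nil?

  block_result[:result] if block_result && block_result[:type] == :block_success
end

puts resolve_outcome([{ type: :block_success, result: 42 }, { type: :all_pollers_shut_down }])

begin
  resolve_outcome([
    { type: :poll_failure, error: RuntimeError.new('poll failed') },
    { type: :block_failure, error: RuntimeError.new('block failed') },
    { type: :all_pollers_shut_down }
  ])
rescue RuntimeError => e
  puts e.message
end
```

The second call shows why `first_error ||=` matters: the poll failure is what gets raised, even though the block also failed afterwards.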

Instance Method Details

#run(cancellation: Cancellation.new, shutdown_signals: [], raise_in_block_on_shutdown: Error::CanceledError.new('Workers finished'), wait_block_complete: true) { ... } ⇒ Object

Run this worker until the cancellation is canceled or the optional block completes. When either happens, the worker is shut down. This returns the block result if everything is successful, or raises an error if not.

Upon shutdown (either via cancellation, block completion, or worker fatal error), the worker immediately stops accepting new work. Then, after an optional grace period, all activities are canceled. This call then waits for every activity and workflow task to complete before returning.

Parameters:

  • cancellation (Cancellation) (defaults to: Cancellation.new)

    Cancellation that can be canceled to shut down this worker.

  • shutdown_signals (Array) (defaults to: [])

    Signals to trap and cause worker shutdown.

  • raise_in_block_on_shutdown (Exception, nil) (defaults to: Error::CanceledError.new('Workers finished'))

    Exception to Thread.raise or Fiber.raise if a block is present and still running on shutdown. If nil, `raise` is not used.

  • wait_block_complete (Boolean) (defaults to: true)

    If block given and shutdown caused by something else (e.g. cancellation canceled), whether to wait on the block to complete before returning.

Yields:

  • Optional block. This will be run in a new background thread or fiber. The worker will shut down when it completes and, assuming no other failures, the block's return value is returned or its exception is raised.

Returns:

  • (Object)

    Return value of the block, or nil if no block is given.



# File 'lib/temporalio/worker.rb', line 357

def run(
  cancellation: Cancellation.new,
  shutdown_signals: [],
  raise_in_block_on_shutdown: Error::CanceledError.new('Workers finished'),
  wait_block_complete: true,
  &block
)
  Worker.run_all(self, cancellation:, shutdown_signals:, raise_in_block_on_shutdown:, wait_block_complete:, &block)
end
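
How raise_in_block_on_shutdown and wait_block_complete interact with a block that is still running can be illustrated with plain threads. The sketch below is a toy, not the library's mechanism: `run_block_then_shut_down` is a hypothetical helper, it covers only the thread case (not fibers), it uses RuntimeError in place of Error::CanceledError, and it returns the exception instead of raising it so the outcome is easy to inspect.

```ruby
# Hypothetical helper (not part of temporalio): runs the block in a background
# thread, then acts as if shutdown was triggered by something else.
def run_block_then_shut_down(raise_in_block_on_shutdown: RuntimeError.new('Workers finished'),
                             wait_block_complete: true, &block)
  started = Queue.new
  thread = Thread.new do
    Thread.current.report_on_exception = false
    started << true
    block.call
  end
  started.pop # wait until the block's thread is running

  # Shutdown: interrupt the block unless raising was disabled with nil
  thread.raise(raise_in_block_on_shutdown) unless raise_in_block_on_shutdown.nil?
  return nil unless wait_block_complete

  begin
    thread.value # waits for the block and re-raises if the block raised
  rescue StandardError => e
    e
  end
end

outcome = run_block_then_shut_down { sleep } # this block would never finish on its own
puts outcome.message
```

Passing `raise_in_block_on_shutdown: nil` together with `wait_block_complete: true` means the caller waits for the block to finish by itself, so the block must eventually return.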

#task_queue ⇒ String

Returns Task queue set on the worker options.

Returns:

  • (String)

    Task queue set on the worker options.



# File 'lib/temporalio/worker.rb', line 337

def task_queue
  @options.task_queue
end