Class: Temporalio::Worker

Inherits:
Object
Defined in:
lib/temporalio/worker.rb,
lib/temporalio/worker/runner.rb,
lib/temporalio/worker/reactor.rb,
lib/temporalio/worker/sync_worker.rb,
lib/temporalio/worker/activity_runner.rb,
lib/temporalio/worker/activity_worker.rb,
lib/temporalio/worker/thread_pool_executor.rb

Overview

Worker to process workflows and/or activities.

Once created, a worker can be run and shut down explicitly via #run and #shutdown.

Defined Under Namespace

Classes: ActivityRunner, ActivityWorker, Reactor, Runner, SyncWorker, ThreadPoolExecutor

Constructor Details

#initialize(connection, namespace, task_queue, activities: [], data_converter: Temporalio::DataConverter.new, activity_executor: nil, max_concurrent_activities: 100, graceful_shutdown_timeout: nil) ⇒ Worker

Create a worker to process workflows and/or activities.

Parameters:

  • connection (Temporalio::Connection)

    A connection to be used for this worker.

  • namespace (String)

    A namespace.

  • task_queue (String)

    A task queue.

  • activities (Array<Class>) (defaults to: [])

    A list of activities (subclasses of Activity).

  • data_converter (Temporalio::DataConverter) (defaults to: Temporalio::DataConverter.new)

    Data converter to use for all data conversions to/from payloads.

  • activity_executor (ThreadPoolExecutor) (defaults to: nil)

    Concurrent executor for all activities. Defaults to a ThreadPoolExecutor with max_concurrent_activities available threads.

  • max_concurrent_activities (Integer) (defaults to: 100)

    Number of concurrently running activities.

  • graceful_shutdown_timeout (Integer) (defaults to: nil)

    Amount of time (in seconds) that activities are given to complete after a shutdown before they are cancelled. The default value of nil means that activities are never cancelled when handling a shutdown.

Raises:

  • (ArgumentError)

    When no activities or workflows have been provided.



# File 'lib/temporalio/worker.rb', line 59

def initialize(
  connection,
  namespace,
  task_queue,
  activities: [],
  data_converter: Temporalio::DataConverter.new,
  activity_executor: nil,
  max_concurrent_activities: 100,
  graceful_shutdown_timeout: nil
)
  # TODO: Add worker interceptors
  @started = false
  @shutdown = false
  @mutex = Mutex.new
  @runtime = Temporalio::Runtime.instance
  @activity_executor = activity_executor || ThreadPoolExecutor.new(max_concurrent_activities)
  @core_worker = Temporalio::Bridge::Worker.create(
    @runtime.core_runtime,
    connection.core_connection,
    namespace,
    task_queue,
  )
  @activity_worker =
    unless activities.empty?
      Worker::ActivityWorker.new(
        task_queue,
        @core_worker,
        activities,
        data_converter,
        @activity_executor,
        graceful_shutdown_timeout,
      )
    end
  @workflow_worker = nil

  if !@activity_worker && !@workflow_worker
    raise ArgumentError, 'At least one activity or workflow must be specified'
  end
end
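
The constructor falls back to a thread pool sized by max_concurrent_activities when no activity_executor is given. As a rough illustration of how a fixed-size pool bounds concurrency, here is a minimal sketch using only Ruby's standard Queue and Thread. SketchThreadPool and its schedule method are hypothetical names invented for this example; this is not the SDK's ThreadPoolExecutor, and it skips error handling inside jobs.

```ruby
# Hypothetical stand-in for a fixed-size executor (NOT the SDK's ThreadPoolExecutor).
class SketchThreadPool
  def initialize(size)
    @jobs = Queue.new
    # At most `size` jobs run at once because only `size` threads consume the queue.
    @threads = Array.new(size) do
      Thread.new do
        # Queue#pop returns nil once the queue is closed and empty, ending the loop.
        while (job = @jobs.pop)
          job.call
        end
      end
    end
  end

  # Enqueue a block to be run by one of the pool threads.
  def schedule(&job)
    @jobs << job
  end

  # Stop accepting jobs, let queued jobs finish, then wait for all threads.
  def shutdown
    @jobs.close
    @threads.each(&:join)
  end
end

pool = SketchThreadPool.new(2)
results = Queue.new
5.times { |i| pool.schedule { results << i * i } }
pool.shutdown

squares = Array.new(results.size) { results.pop }.sort
puts squares.inspect
```

Closing the queue rather than killing the threads lets already scheduled jobs finish first, which matches the drain-then-stop order described for #shutdown.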

Class Method Details

.run(*workers, shutdown_signals: []) { ... } ⇒ Object

Run multiple workers and wait for them to be shut down.

This will not return until shutdown is complete (and all running activities in all workers finished) and will raise if any of the workers raises a fatal error.

Parameters:

  • workers (Array<Temporalio::Worker>)

    A list of the workers to be run.

  • shutdown_signals (Array<String>) (defaults to: [])

    A list of process signals for the workers to stop on. This argument cannot be combined with a block; passing both raises an ArgumentError.

Yields:

  • An optional block. All the workers are shut down once the block returns. Any error raised from the block is re-raised by this method.



# File 'lib/temporalio/worker.rb', line 25

def self.run(*workers, shutdown_signals: [], &block)
  unless shutdown_signals.empty?
    if block
      raise ArgumentError, 'Temporalio::Worker.run accepts :shutdown_signals or a block, but not both'
    end

    signal_queue = Queue.new

    shutdown_signals.each do |signal|
      Signal.trap(signal) { signal_queue.close }
    end

    block = -> { signal_queue.pop }
  end

  Runner.new(*workers).run(&block)
end
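
The shutdown_signals handling above relies on a small standard-library pattern: each trapped signal closes a Queue, and the block handed to the runner simply blocks on that queue until it is closed. The sketch below isolates the pattern; shutdown_block_for is a hypothetical helper written for this example, and a helper thread closes the queue to stand in for a delivered signal so the snippet can run without sending real signals.

```ruby
# Hypothetical helper isolating the signal-to-block pattern; not part of the SDK.
def shutdown_block_for(signals)
  signal_queue = Queue.new

  # Each trapped signal closes the queue, which wakes up anything blocked on #pop.
  signals.each do |signal|
    Signal.trap(signal) { signal_queue.close }
  end

  # The lambda blocks until the queue is closed; #pop then returns nil.
  [signal_queue, -> { signal_queue.pop }]
end

queue, block = shutdown_block_for(%w[INT TERM])

# Stand-in for a delivered signal: close the queue from another thread.
closer = Thread.new do
  sleep 0.05
  queue.close
end

result = block.call
closer.join
puts "unblocked with #{result.inspect}"
```

Because the runner shuts the workers down once its block returns, turning "a signal arrived" into "the block returned" is all the signal handler needs to do.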

Instance Method Details

#run { ... } ⇒ Object

Note:

A worker is only intended to be started once. Initialize a new worker should you need to run it again.

Run the worker and wait on it to be shut down.

This will not return until shutdown is complete (and all running activities have finished) and will raise if there is a fatal worker error. To run multiple workers, use the class method .run.

Yields:

  • An optional block. The worker shuts itself down once the block returns. You can use this to stop a worker after some time has passed, after your workflow has finished, or after any other arbitrary code has completed. Any error raised from the block is re-raised by this method.



# File 'lib/temporalio/worker.rb', line 111

def run(&block)
  Runner.new(self).run(&block)
end

#running? ⇒ Boolean

Whether the worker is running.

This is only true if the worker has been started and not yet shut down.

Returns:

  • (Boolean)


# File 'lib/temporalio/worker.rb', line 189

def running?
  @started && !@shutdown
end

#shutdown(exception = Temporalio::Error::WorkerShutdown.new('Manual shutdown')) ⇒ Object

Initiate a worker shutdown and wait until complete.

This can be called before the worker has even started and is safe for repeated invocations: if the worker is not running, the call returns immediately. Otherwise this method will not return until the worker has completed shutting down.

Parameters:

  • exception (Exception) (defaults to: Temporalio::Error::WorkerShutdown.new('Manual shutdown'))

    An exception to be raised from the #run or .run methods after the shutdown procedure has completed.



# File 'lib/temporalio/worker.rb', line 157

def shutdown(exception = Temporalio::Error::WorkerShutdown.new('Manual shutdown'))
  mutex.synchronize do
    return unless running?

    # First initiate Core shutdown, which will start dropping poll requests
    core_worker.initiate_shutdown
    # Then let the runner know we're shutting down, so it can stop other workers
    runner&.shutdown(exception)
    # Wait for workers to drain any outstanding tasks
    activity_worker&.drain
    workflow_worker&.drain
    # Stop the executor (at this point there should already be nothing in it)
    activity_executor.shutdown
    # Finalize the shutdown by stopping the Core
    core_worker.shutdown

    @shutdown = true
  end
end
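
The guard at the top of #shutdown is what makes repeated calls safe, and the mutex keeps two threads from running the teardown steps at the same time. The following is a minimal sketch of that behaviour with a stand-in class. SketchWorker and the symbols it records are hypothetical; the real method calls into the Core worker, the runner and the executor instead of appending to an array.

```ruby
# Hypothetical stand-in that records teardown steps instead of performing them.
class SketchWorker
  attr_reader :steps

  def initialize
    @mutex = Mutex.new
    @started = false
    @shutdown = false
    @steps = []
  end

  # A worker may only be started once.
  def start
    @mutex.synchronize do
      raise 'Worker is already started' if @started

      @started = true
    end
  end

  def running?
    @started && !@shutdown
  end

  # No-op unless the worker is running, so repeated calls are harmless.
  def shutdown
    @mutex.synchronize do
      return unless running?

      # Same order as the listing above: stop polling, drain, stop executor, stop Core.
      @steps << :stop_polling << :drain_tasks << :stop_executor << :stop_core
      @shutdown = true
    end
  end
end

worker = SketchWorker.new
worker.shutdown # not started yet: returns immediately
worker.start
worker.shutdown # performs the teardown steps once
worker.shutdown # already shut down: returns immediately
puts worker.steps.inspect
```

Note that a shutdown issued before start leaves both flags untouched, so in this sketch the worker can still be started afterwards.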

#start(runner = nil) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Note:

A worker is only intended to be started once. Initialize a new worker should you need to start it again.

Start the worker asynchronously in a shared runtime.

This is an internal method for advanced use cases, intended for those implementing their own worker runner.

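Parameters:

  • runner (Temporalio::Worker::Runner, nil) (defaults to: nil)

    A runner to notify when this worker shuts down, so that it can stop its other workers.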



# File 'lib/temporalio/worker.rb', line 126

def start(runner = nil)
  mutex.synchronize do
    raise 'Worker is already started' if started?

    @started = true
  end

  @runner = runner
  runtime.ensure_callback_loop

  runtime.reactor.async do |task|
    if activity_worker
      task.async do |task|
        activity_worker.run(task)
      rescue StandardError => e
        shutdown(e) # initiate shutdown because of a fatal error
      end
    end

    # TODO: Pending implementation
    task.async { |task| workflow_worker.run(task) } if workflow_worker
  end
end

#started? ⇒ Boolean

Whether the worker has been started.

Returns:

  • (Boolean)


# File 'lib/temporalio/worker.rb', line 180

def started?
  @started
end