Class: Temporalio::Client::WorkflowHandle

Inherits:
Object
  • Object
show all
Defined in:
lib/temporalio/client/workflow_handle.rb

Overview

Handle for interacting with a workflow. This is usually created via #start_workflow or #workflow_handle.

Instance Attribute Summary collapse

Instance Method Summary collapse

Instance Attribute Details

#first_execution_run_idString? (readonly)

Run ID used for some calls like #cancel and #terminate to ensure the cancel and terminate happen for a workflow ID on a chain started with this run ID.

This can be set when using Temporalio::Client#workflow_handle. When Temporalio::Client#start_workflow is called without a start signal, this is set to the resulting run.

This cannot be mutated. If a different first execution run ID is needed, Temporalio::Client#workflow_handle must be used instead.

Returns:

  • (String, nil)

    First execution run ID.



49
50
51
# File 'lib/temporalio/client/workflow_handle.rb', line 49

def first_execution_run_id
  @first_execution_run_id
end

#idString (readonly)

Returns ID for the workflow.

Returns:

  • (String)

    ID for the workflow.



17
18
19
# File 'lib/temporalio/client/workflow_handle.rb', line 17

def id
  @id
end

#result_run_idString? (readonly)

Run ID used for #result calls if present to ensure result is for a workflow starting from this run.

When this handle is created via Temporalio::Client#workflow_handle, this is the same as #run_id. When this handle is created via Temporalio::Client#start_workflow, this value will be the resulting run ID.

This cannot be mutated. If a different run ID is needed, Temporalio::Client#workflow_handle must be used instead.

Returns:

  • (String, nil)

    Result run ID.



37
38
39
# File 'lib/temporalio/client/workflow_handle.rb', line 37

def result_run_id
  @result_run_id
end

#run_idString? (readonly)

Run ID used for #signal, #query, and #start_update/#execute_update calls if present to ensure the signal/query/update happen on this exact run.

This is only created via Temporalio::Client#workflow_handle. Temporalio::Client#start_workflow will not set this value.

This cannot be mutated. If a different run ID is needed, Temporalio::Client#workflow_handle must be used instead.

Returns:

  • (String, nil)

    Run ID.



27
28
29
# File 'lib/temporalio/client/workflow_handle.rb', line 27

def run_id
  @run_id
end

Instance Method Details

#cancel(rpc_metadata: nil, rpc_timeout: nil) ⇒ Object

Note:

Handles created as a result of signal with start will cancel the latest workflow with the same workflow ID even if it is unrelated to the started workflow.

Cancel the workflow. This will issue a cancellation for #run_id if present. This call will make sure to use the run chain starting from #first_execution_run_id if present. To create handles with these values, use Temporalio::Client#workflow_handle.

Parameters:

  • rpc_metadata (Hash<String, String>, nil) (defaults to: nil)

    Headers to include on the RPC call.

  • rpc_timeout (Float, nil) (defaults to: nil)

    Number of seconds before timeout.

Raises:



402
403
404
405
406
407
408
409
410
411
412
413
# File 'lib/temporalio/client/workflow_handle.rb', line 402

def cancel(
  rpc_metadata: nil,
  rpc_timeout: nil
)
  @client._impl.cancel_workflow(Interceptor::CancelWorkflowInput.new(
                                  workflow_id: id,
                                  run_id:,
                                  first_execution_run_id:,
                                  rpc_metadata:,
                                  rpc_timeout:
                                ))
end

#describe(rpc_metadata: nil, rpc_timeout: nil) ⇒ WorkflowExecution::Description

Note:

Handles created as a result of Temporalio::Client#start_workflow will describe the latest workflow with the same workflow ID even if it is unrelated to the started workflow.

Get workflow details. This will get details for the #run_id if present. To use a different run ID, create a new handle via Temporalio::Client#workflow_handle.

Parameters:

  • rpc_metadata (Hash<String, String>, nil) (defaults to: nil)

    Headers to include on the RPC call.

  • rpc_timeout (Float, nil) (defaults to: nil)

    Number of seconds before timeout.

Returns:

Raises:



154
155
156
157
158
159
160
161
162
163
164
# File 'lib/temporalio/client/workflow_handle.rb', line 154

def describe(
  rpc_metadata: nil,
  rpc_timeout: nil
)
  @client._impl.describe_workflow(Interceptor::DescribeWorkflowInput.new(
                                    workflow_id: id,
                                    run_id:,
                                    rpc_metadata:,
                                    rpc_timeout:
                                  ))
end

#execute_update(update, *args, id: SecureRandom.uuid, rpc_metadata: nil, rpc_timeout: nil) ⇒ Object?

Note:

Handles created as a result of Temporalio::Client#start_workflow will send updates the latest workflow with the same workflow ID even if it is unrelated to the started workflow.

Note:

WARNING: This API is experimental.

Send an update request to the workflow and wait for it to complete. This will target the workflow with #run_id if present. To use a different run ID, create a new handle via Temporalio::Client#workflow_handle.

Parameters:

  • update (String)

    Update name.

  • args (Array<Object>)

    Update arguments.

  • id (String) (defaults to: SecureRandom.uuid)

    ID of the update.

  • rpc_metadata (Hash<String, String>, nil) (defaults to: nil)

    Headers to include on the RPC call.

  • rpc_timeout (Float, nil) (defaults to: nil)

    Number of seconds before timeout.

Returns:

  • (Object, nil)

    Update result.

Raises:



352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
# File 'lib/temporalio/client/workflow_handle.rb', line 352

def execute_update(
  update,
  *args,
  id: SecureRandom.uuid,
  rpc_metadata: nil,
  rpc_timeout: nil
)
  start_update(
    update,
    *args,
    wait_for_stage: WorkflowUpdateWaitStage::COMPLETED,
    id:,
    rpc_metadata:,
    rpc_timeout:
  ).result
end

#fetch_history(event_filter_type: Api::Enums::V1::HistoryEventFilterType::HISTORY_EVENT_FILTER_TYPE_ALL_EVENT, skip_archival: false, rpc_metadata: nil, rpc_timeout: nil) ⇒ WorkflowHistory

Get workflow history. This is a helper on top of #fetch_history_events.

Parameters:

  • event_filter_type (Api::Enums::V1::HistoryEventFilterType) (defaults to: Api::Enums::V1::HistoryEventFilterType::HISTORY_EVENT_FILTER_TYPE_ALL_EVENT)

    Types of events to fetch.

  • skip_archival (Boolean) (defaults to: false)

    Whether to skip archival.

  • rpc_metadata (Hash<String, String>, nil) (defaults to: nil)

    Headers to include on the RPC call.

  • rpc_timeout (Float, nil) (defaults to: nil)

    Number of seconds before timeout.

Returns:

Raises:



176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
# File 'lib/temporalio/client/workflow_handle.rb', line 176

def fetch_history(
  event_filter_type: Api::Enums::V1::HistoryEventFilterType::HISTORY_EVENT_FILTER_TYPE_ALL_EVENT,
  skip_archival: false,
  rpc_metadata: nil,
  rpc_timeout: nil
)
  WorkflowHistory.new(
    fetch_history_events(
      event_filter_type:,
      skip_archival:,
      rpc_metadata:,
      rpc_timeout:
    ).to_a
  )
end

#fetch_history_events(wait_new_event: false, event_filter_type: Api::Enums::V1::HistoryEventFilterType::HISTORY_EVENT_FILTER_TYPE_ALL_EVENT, skip_archival: false, specific_run_id: run_id, rpc_metadata: nil, rpc_timeout: nil) ⇒ Enumerator<Api::History::V1::HistoryEvent>

Fetch an enumerator of history events for this workflow. Internally this is done in paginated form, but it is presented as an enumerator.

Parameters:

  • wait_new_event (Boolean) (defaults to: false)

    If true, when the end of the current set of events is reached but the workflow is not complete, this will wait for the next event. If false, the enumerable completes at the end of current history.

  • event_filter_type (Api::Enums::V1::HistoryEventFilterType) (defaults to: Api::Enums::V1::HistoryEventFilterType::HISTORY_EVENT_FILTER_TYPE_ALL_EVENT)

    Types of events to fetch.

  • skip_archival (Boolean) (defaults to: false)

    Whether to skip archival.

  • specific_run_id (String, nil) (defaults to: run_id)

    Run ID to fetch events for. Default is the #run_id. Most users will not need to set this and instead use the one on the class.

  • rpc_metadata (Hash<String, String>, nil) (defaults to: nil)

    Headers to include on the RPC call.

  • rpc_timeout (Float, nil) (defaults to: nil)

    Number of seconds before timeout.

Returns:

Raises:



208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
# File 'lib/temporalio/client/workflow_handle.rb', line 208

def fetch_history_events(
  wait_new_event: false,
  event_filter_type: Api::Enums::V1::HistoryEventFilterType::HISTORY_EVENT_FILTER_TYPE_ALL_EVENT,
  skip_archival: false,
  specific_run_id: run_id,
  rpc_metadata: nil,
  rpc_timeout: nil
)
  @client._impl.fetch_workflow_history_events(Interceptor::FetchWorkflowHistoryEventsInput.new(
                                                workflow_id: id,
                                                run_id: specific_run_id,
                                                wait_new_event:,
                                                event_filter_type:,
                                                skip_archival:,
                                                rpc_metadata:,
                                                rpc_timeout:
                                              ))
end

#query(query, *args, reject_condition: @client.options.default_workflow_query_reject_condition, rpc_metadata: nil, rpc_timeout: nil) ⇒ Object?

Note:

Handles created as a result of Temporalio::Client#start_workflow will query the latest workflow with the same workflow ID even if it is unrelated to the started workflow.

Query the workflow. This will query for #run_id if present. To use a different run ID, create a new handle via Temporalio::Client#workflow_handle.

Parameters:

  • query (String)

    Query name.

  • args (Array<Object>)

    Query arguments.

  • reject_condition (WorkflowQueryRejectCondition, nil) (defaults to: @client.options.default_workflow_query_reject_condition)

    Condition for rejecting the query.

  • rpc_metadata (Hash<String, String>, nil) (defaults to: nil)

    Headers to include on the RPC call.

  • rpc_timeout (Float, nil) (defaults to: nil)

    Number of seconds before timeout.

Returns:

  • (Object, nil)

    Query result.

Raises:



273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
# File 'lib/temporalio/client/workflow_handle.rb', line 273

def query(
  query,
  *args,
  reject_condition: @client.options.default_workflow_query_reject_condition,
  rpc_metadata: nil,
  rpc_timeout: nil
)
  @client._impl.query_workflow(Interceptor::QueryWorkflowInput.new(
                                 workflow_id: id,
                                 run_id:,
                                 query:,
                                 args:,
                                 reject_condition:,
                                 headers: {},
                                 rpc_metadata:,
                                 rpc_timeout:
                               ))
end

#result(follow_runs: true, rpc_metadata: nil, rpc_timeout: nil) ⇒ Object

Wait for the result of the workflow.

This will use #result_run_id if present to base the result on. To use another run ID, a new handle must be created via Temporalio::Client#workflow_handle.

Parameters:

  • follow_runs (Boolean) (defaults to: true)

    If true, workflow runs will be continually fetched across retries and continue as new until the latest one is found. If false, the first result is used.

  • rpc_metadata (Hash<String, String>, nil) (defaults to: nil)

    Headers to include on the RPC call.

  • rpc_timeout (Float, nil) (defaults to: nil)

    Number of seconds before timeout.

Returns:

  • (Object)

    Result of the workflow after being converted by the data converter.

Raises:



75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
# File 'lib/temporalio/client/workflow_handle.rb', line 75

def result(
  follow_runs: true,
  rpc_metadata: nil,
  rpc_timeout: nil
)
  # Wait on the close event, following as needed
  hist_run_id = result_run_id
  loop do
    # Get close event
    event = fetch_history_events(
      wait_new_event: true,
      event_filter_type: Api::Enums::V1::HistoryEventFilterType::HISTORY_EVENT_FILTER_TYPE_CLOSE_EVENT,
      skip_archival: true,
      specific_run_id: hist_run_id,
      rpc_metadata:,
      rpc_timeout:
    ).next

    # Check each close type'
    case event.event_type
    when :EVENT_TYPE_WORKFLOW_EXECUTION_COMPLETED
      attrs = event.workflow_execution_completed_event_attributes
      hist_run_id = attrs.new_execution_run_id
      next if follow_runs && hist_run_id && !hist_run_id.empty?

      return @client.data_converter.from_payloads(attrs.result).first
    when :EVENT_TYPE_WORKFLOW_EXECUTION_FAILED
      attrs = event.workflow_execution_failed_event_attributes
      hist_run_id = attrs.new_execution_run_id
      next if follow_runs && hist_run_id && !hist_run_id.empty?

      raise Error::WorkflowFailedError.new, cause: @client.data_converter.from_failure(attrs.failure)
    when :EVENT_TYPE_WORKFLOW_EXECUTION_CANCELED
      attrs = event.workflow_execution_canceled_event_attributes
      raise Error::WorkflowFailedError.new, cause: Error::CanceledError.new(
        'Workflow execution canceled',
        details: @client.data_converter.from_payloads(attrs&.details)
      )
    when :EVENT_TYPE_WORKFLOW_EXECUTION_TERMINATED
      attrs = event.workflow_execution_terminated_event_attributes
      raise Error::WorkflowFailedError.new, cause: Error::TerminatedError.new(
        Internal::ProtoUtils.string_or(attrs.reason, 'Workflow execution terminated'),
        details: @client.data_converter.from_payloads(attrs&.details)
      )
    when :EVENT_TYPE_WORKFLOW_EXECUTION_TIMED_OUT
      attrs = event.workflow_execution_timed_out_event_attributes
      hist_run_id = attrs.new_execution_run_id
      next if follow_runs && hist_run_id && !hist_run_id.empty?

      raise Error::WorkflowFailedError.new, cause: Error::TimeoutError.new(
        'Workflow execution timed out',
        type: Api::Enums::V1::TimeoutType::TIMEOUT_TYPE_START_TO_CLOSE,
        last_heartbeat_details: []
      )
    when :EVENT_TYPE_WORKFLOW_EXECUTION_CONTINUED_AS_NEW
      attrs = event.workflow_execution_continued_as_new_event_attributes
      hist_run_id = attrs.new_execution_run_id
      next if follow_runs && hist_run_id && !hist_run_id.empty?

      # TODO: Use more specific error and decode failure
      raise Error::WorkflowContinuedAsNewError.new(new_run_id: attrs.new_execution_run_id)
    else
      raise Error, "Unknown close event type: #{event.event_type}"
    end
  end
end

#signal(signal, *args, rpc_metadata: nil, rpc_timeout: nil) ⇒ Object

Note:

Handles created as a result of Temporalio::Client#start_workflow will signal the latest workflow with the same workflow ID even if it is unrelated to the started workflow.

Send a signal to the workflow. This will signal for #run_id if present. To use a different run ID, create a new handle via Temporalio::Client#workflow_handle.

Parameters:

  • signal (String)

    Signal name.

  • args (Array<Object>)

    Signal arguments.

  • rpc_metadata (Hash<String, String>, nil) (defaults to: nil)

    Headers to include on the RPC call.

  • rpc_timeout (Float, nil) (defaults to: nil)

    Number of seconds before timeout.

Raises:



239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
# File 'lib/temporalio/client/workflow_handle.rb', line 239

def signal(
  signal,
  *args,
  rpc_metadata: nil,
  rpc_timeout: nil
)
  @client._impl.signal_workflow(Interceptor::SignalWorkflowInput.new(
                                  workflow_id: id,
                                  run_id:,
                                  signal:,
                                  args:,
                                  headers: {},
                                  rpc_metadata:,
                                  rpc_timeout:
                                ))
end

#start_update(update, *args, wait_for_stage:, id: SecureRandom.uuid, rpc_metadata: nil, rpc_timeout: nil) ⇒ WorkflowUpdateHandle

Note:

Handles created as a result of Temporalio::Client#start_workflow will send updates the latest workflow with the same workflow ID even if it is unrelated to the started workflow.

Note:

WARNING: This API is experimental.

Send an update request to the workflow and return a handle to it. This will target the workflow with #run_id if present. To use a different run ID, create a new handle via Temporalio::Client#workflow_handle.

Parameters:

  • update (String)

    Update name.

  • args (Array<Object>)

    Update arguments.

  • wait_for_stage (WorkflowUpdateWaitStage)

    Required stage to wait until returning. ADMITTED is not currently supported. See docs.temporal.io/workflows#update for more details.

  • id (String) (defaults to: SecureRandom.uuid)

    ID of the update.

  • rpc_metadata (Hash<String, String>, nil) (defaults to: nil)

    Headers to include on the RPC call.

  • rpc_timeout (Float, nil) (defaults to: nil)

    Number of seconds before timeout.

Returns:

Raises:



312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
# File 'lib/temporalio/client/workflow_handle.rb', line 312

def start_update(
  update,
  *args,
  wait_for_stage:,
  id: SecureRandom.uuid,
  rpc_metadata: nil,
  rpc_timeout: nil
)
  @client._impl.start_workflow_update(Interceptor::StartWorkflowUpdateInput.new(
                                        workflow_id: self.id,
                                        run_id:,
                                        update_id: id,
                                        update:,
                                        args:,
                                        wait_for_stage:,
                                        headers: {},
                                        rpc_metadata:,
                                        rpc_timeout:
                                      ))
end

#terminate(reason = nil, details: [], rpc_metadata: nil, rpc_timeout: nil) ⇒ Object

Note:

Handles created as a result of signal with start will terminate the latest workflow with the same workflow ID even if it is unrelated to the started workflow.

Terminate the workflow. This will issue a termination for #run_id if present. This call will make sure to use the run chain starting from #first_execution_run_id if present. To create handles with these values, use Temporalio::Client#workflow_handle.

Parameters:

  • reason (String, nil) (defaults to: nil)

    Reason for the termination.

  • details (Array<Object>) (defaults to: [])

    Details to store on the termination.

  • rpc_metadata (Hash<String, String>, nil) (defaults to: nil)

    Headers to include on the RPC call.

  • rpc_timeout (Float, nil) (defaults to: nil)

    Number of seconds before timeout.

Raises:



428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
# File 'lib/temporalio/client/workflow_handle.rb', line 428

def terminate(
  reason = nil,
  details: [],
  rpc_metadata: nil,
  rpc_timeout: nil
)
  @client._impl.terminate_workflow(Interceptor::TerminateWorkflowInput.new(
                                     workflow_id: id,
                                     run_id:,
                                     first_execution_run_id:,
                                     reason:,
                                     details:,
                                     rpc_metadata:,
                                     rpc_timeout:
                                   ))
end

#update_handle(id, specific_run_id: run_id) ⇒ WorkflowUpdateHandle

Note:

WARNING: This API is experimental.

Get a handle for an update. The handle can be used to wait on the update result.

Parameters:

  • id (String)

    ID of the update.

  • specific_run_id (String, nil) (defaults to: run_id)

    Workflow run ID to get update handle for. Default is the #run_id. Most users will not need to set this and instead use the one on the class.

Returns:



378
379
380
381
382
383
384
385
386
387
388
389
# File 'lib/temporalio/client/workflow_handle.rb', line 378

def update_handle(
  id,
  specific_run_id: run_id
)
  WorkflowUpdateHandle.new(
    client: @client,
    id:,
    workflow_id: self.id,
    workflow_run_id: specific_run_id,
    known_outcome: nil
  )
end