Class: Temporalio::Client::WorkflowHandle
- Inherits: Object
- Defined in: lib/temporalio/client/workflow_handle.rb
Overview
Handle for interacting with a workflow. This is usually created via Temporalio::Client#start_workflow or Temporalio::Client#workflow_handle.
Instance Attribute Summary
-
#first_execution_run_id ⇒ String?
readonly
Run ID used for some calls like #cancel and #terminate to ensure the cancel and terminate happen for a workflow ID on a chain started with this run ID.
-
#id ⇒ String
readonly
ID for the workflow.
-
#result_run_id ⇒ String?
readonly
Run ID used for #result calls if present to ensure result is for a workflow starting from this run.
-
#run_id ⇒ String?
readonly
Run ID used for #signal, #query, and #start_update/#execute_update calls if present to ensure the signal/query/update happen on this exact run.
Instance Method Summary
-
#cancel(rpc_metadata: nil, rpc_timeout: nil) ⇒ Object
Cancel the workflow.
-
#describe(rpc_metadata: nil, rpc_timeout: nil) ⇒ WorkflowExecution::Description
Get workflow details.
-
#execute_update(update, *args, id: SecureRandom.uuid, rpc_metadata: nil, rpc_timeout: nil) ⇒ Object?
Send an update request to the workflow and wait for it to complete.
-
#fetch_history(event_filter_type: Api::Enums::V1::HistoryEventFilterType::HISTORY_EVENT_FILTER_TYPE_ALL_EVENT, skip_archival: false, rpc_metadata: nil, rpc_timeout: nil) ⇒ WorkflowHistory
Get workflow history.
-
#fetch_history_events(wait_new_event: false, event_filter_type: Api::Enums::V1::HistoryEventFilterType::HISTORY_EVENT_FILTER_TYPE_ALL_EVENT, skip_archival: false, specific_run_id: run_id, rpc_metadata: nil, rpc_timeout: nil) ⇒ Enumerator<Api::History::V1::HistoryEvent>
Fetch an enumerator of history events for this workflow.
-
#query(query, *args, reject_condition: @client.options.default_workflow_query_reject_condition, rpc_metadata: nil, rpc_timeout: nil) ⇒ Object?
Query the workflow.
-
#result(follow_runs: true, rpc_metadata: nil, rpc_timeout: nil) ⇒ Object
Wait for the result of the workflow.
-
#signal(signal, *args, rpc_metadata: nil, rpc_timeout: nil) ⇒ Object
Send a signal to the workflow.
-
#start_update(update, *args, wait_for_stage:, id: SecureRandom.uuid, rpc_metadata: nil, rpc_timeout: nil) ⇒ WorkflowUpdateHandle
Send an update request to the workflow and return a handle to it.
-
#terminate(reason = nil, details: [], rpc_metadata: nil, rpc_timeout: nil) ⇒ Object
Terminate the workflow.
-
#update_handle(id, specific_run_id: run_id) ⇒ WorkflowUpdateHandle
Get a handle for an update.
Instance Attribute Details
#first_execution_run_id ⇒ String? (readonly)
Run ID used for some calls like #cancel and #terminate to ensure the cancel and terminate happen for a workflow ID on a chain started with this run ID.
This can be set when using Temporalio::Client#workflow_handle. When Temporalio::Client#start_workflow is called without a start signal, this is set to the resulting run.
This cannot be mutated. If a different first execution run ID is needed, Temporalio::Client#workflow_handle must be used instead.
# File 'lib/temporalio/client/workflow_handle.rb', line 49
def first_execution_run_id
  @first_execution_run_id
end
#id ⇒ String (readonly)
Returns ID for the workflow.
# File 'lib/temporalio/client/workflow_handle.rb', line 17
def id
  @id
end
#result_run_id ⇒ String? (readonly)
Run ID used for #result calls if present to ensure result is for a workflow starting from this run.
When this handle is created via Temporalio::Client#workflow_handle, this is the same as #run_id. When this handle is created via Temporalio::Client#start_workflow, this value will be the resulting run ID.
This cannot be mutated. If a different run ID is needed, Temporalio::Client#workflow_handle must be used instead.
# File 'lib/temporalio/client/workflow_handle.rb', line 37
def result_run_id
  @result_run_id
end
#run_id ⇒ String? (readonly)
Run ID used for #signal, #query, and #start_update/#execute_update calls if present to ensure the signal/query/update happen on this exact run.
This is only set when the handle is created via Temporalio::Client#workflow_handle. Temporalio::Client#start_workflow will not set this value.
This cannot be mutated. If a different run ID is needed, Temporalio::Client#workflow_handle must be used instead.
# File 'lib/temporalio/client/workflow_handle.rb', line 27
def run_id
  @run_id
end
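The three run ID attributes pin different groups of calls, which is easy to mix up. The following is a minimal sketch of that mapping, not part of the library: `FakeHandle` and `pinned_run_ids` are hypothetical names, and the struct only imitates the four readers documented above so the example runs without a Temporal server.

```ruby
# Hypothetical stand-in exposing the same four readers as WorkflowHandle.
FakeHandle = Struct.new(:id, :run_id, :result_run_id, :first_execution_run_id, keyword_init: true)

# Summarize which run ID each group of calls would be pinned to.
# A nil value means the call is not pinned to a specific run.
def pinned_run_ids(handle)
  {
    signal_query_update: handle.run_id,
    result: handle.result_run_id,
    cancel_terminate_chain: handle.first_execution_run_id
  }
end

# Shape of a handle as described for start_workflow without a start signal:
# run_id is unset, the other two hold the run that was started.
started = FakeHandle.new(
  id: 'order-42',
  run_id: nil,
  result_run_id: 'run-a',
  first_execution_run_id: 'run-a'
)
p pinned_run_ids(started)
```

With a handle shaped like this, signals, queries, and updates are not pinned to a run, while #result stays tied to the run that was started.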
Instance Method Details
#cancel(rpc_metadata: nil, rpc_timeout: nil) ⇒ Object
Handles created as a result of signal with start will cancel the latest workflow with the same workflow ID even if it is unrelated to the started workflow.
Cancel the workflow. This will issue a cancellation for #run_id if present. This call will make sure to use the run chain starting from #first_execution_run_id if present. To create handles with these values, use Temporalio::Client#workflow_handle.
# File 'lib/temporalio/client/workflow_handle.rb', line 402
def cancel(
  rpc_metadata: nil,
  rpc_timeout: nil
)
  @client._impl.cancel_workflow(Interceptor::CancelWorkflowInput.new(
                                  workflow_id: id,
                                  run_id:,
                                  first_execution_run_id:,
                                  rpc_metadata:,
                                  rpc_timeout:
                                ))
end
#describe(rpc_metadata: nil, rpc_timeout: nil) ⇒ WorkflowExecution::Description
Handles created as a result of Temporalio::Client#start_workflow will describe the latest workflow with the same workflow ID even if it is unrelated to the started workflow.
Get workflow details. This will get details for the #run_id if present. To use a different run ID, create a new handle via Temporalio::Client#workflow_handle.
# File 'lib/temporalio/client/workflow_handle.rb', line 154
def describe(
  rpc_metadata: nil,
  rpc_timeout: nil
)
  @client._impl.describe_workflow(Interceptor::DescribeWorkflowInput.new(
                                    workflow_id: id,
                                    run_id:,
                                    rpc_metadata:,
                                    rpc_timeout:
                                  ))
end
#execute_update(update, *args, id: SecureRandom.uuid, rpc_metadata: nil, rpc_timeout: nil) ⇒ Object?
Handles created as a result of Temporalio::Client#start_workflow will send updates to the latest workflow with the same workflow ID even if it is unrelated to the started workflow.
WARNING: This API is experimental.
Send an update request to the workflow and wait for it to complete. This will target the workflow with #run_id if present. To use a different run ID, create a new handle via Temporalio::Client#workflow_handle.
# File 'lib/temporalio/client/workflow_handle.rb', line 352
def execute_update(
  update,
  *args,
  id: SecureRandom.uuid,
  rpc_metadata: nil,
  rpc_timeout: nil
)
  start_update(
    update,
    *args,
    wait_for_stage: WorkflowUpdateWaitStage::COMPLETED,
    id:,
    rpc_metadata:,
    rpc_timeout:
  ).result
end
#fetch_history(event_filter_type: Api::Enums::V1::HistoryEventFilterType::HISTORY_EVENT_FILTER_TYPE_ALL_EVENT, skip_archival: false, rpc_metadata: nil, rpc_timeout: nil) ⇒ WorkflowHistory
Get workflow history. This is a helper on top of #fetch_history_events.
# File 'lib/temporalio/client/workflow_handle.rb', line 176
def fetch_history(
  event_filter_type: Api::Enums::V1::HistoryEventFilterType::HISTORY_EVENT_FILTER_TYPE_ALL_EVENT,
  skip_archival: false,
  rpc_metadata: nil,
  rpc_timeout: nil
)
  WorkflowHistory.new(
    fetch_history_events(
      event_filter_type:,
      skip_archival:,
      rpc_metadata:,
      rpc_timeout:
    ).to_a
  )
end
#fetch_history_events(wait_new_event: false, event_filter_type: Api::Enums::V1::HistoryEventFilterType::HISTORY_EVENT_FILTER_TYPE_ALL_EVENT, skip_archival: false, specific_run_id: run_id, rpc_metadata: nil, rpc_timeout: nil) ⇒ Enumerator<Api::History::V1::HistoryEvent>
Fetch an enumerator of history events for this workflow. Internally this is done in paginated form, but it is presented as an enumerator.
# File 'lib/temporalio/client/workflow_handle.rb', line 208
def fetch_history_events(
  wait_new_event: false,
  event_filter_type: Api::Enums::V1::HistoryEventFilterType::HISTORY_EVENT_FILTER_TYPE_ALL_EVENT,
  skip_archival: false,
  specific_run_id: run_id,
  rpc_metadata: nil,
  rpc_timeout: nil
)
  @client._impl.fetch_workflow_history_events(Interceptor::FetchWorkflowHistoryEventsInput.new(
                                                workflow_id: id,
                                                run_id: specific_run_id,
                                                wait_new_event:,
                                                event_filter_type:,
                                                skip_archival:,
                                                rpc_metadata:,
                                                rpc_timeout:
                                              ))
end
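To illustrate what "paginated internally, presented as an enumerator" can look like, here is a minimal sketch of the general technique. It is not the library's implementation: `PAGES`, `each_event`, and the page shape (`events`, `next_token`) are assumptions made up for this example.

```ruby
# Hypothetical in-memory "server": each page holds some events and the token
# of the next page (nil on the last page).
PAGES = {
  nil => { events: %w[started task_scheduled], next_token: 'p2' },
  'p2' => { events: %w[task_completed], next_token: 'p3' },
  'p3' => { events: %w[completed], next_token: nil }
}.freeze

# Wrap page-by-page fetching in an Enumerator. fetch_page is any callable that
# takes a page token and returns { events:, next_token: }.
def each_event(fetch_page)
  Enumerator.new do |yielder|
    token = nil
    loop do
      page = fetch_page.call(token)
      page[:events].each { |event| yielder << event }
      token = page[:next_token]
      break if token.nil?
    end
  end
end

calls = []
fetch_page = lambda do |token|
  calls << token
  PAGES.fetch(token)
end

events = each_event(fetch_page)
first = events.next             # pulls a single event
calls_after_first = calls.dup   # only the first page has been requested
all = each_event(fetch_page).to_a
```

Because pages are requested only as the consumer advances, taking a single event with `next` costs one page request, while `to_a` walks every page.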
#query(query, *args, reject_condition: @client.options.default_workflow_query_reject_condition, rpc_metadata: nil, rpc_timeout: nil) ⇒ Object?
Handles created as a result of Temporalio::Client#start_workflow will query the latest workflow with the same workflow ID even if it is unrelated to the started workflow.
Query the workflow. This will query for #run_id if present. To use a different run ID, create a new handle via Temporalio::Client#workflow_handle.
# File 'lib/temporalio/client/workflow_handle.rb', line 273
def query(
  query,
  *args,
  reject_condition: @client.options.default_workflow_query_reject_condition,
  rpc_metadata: nil,
  rpc_timeout: nil
)
  @client._impl.query_workflow(Interceptor::QueryWorkflowInput.new(
                                 workflow_id: id,
                                 run_id:,
                                 query:,
                                 args:,
                                 reject_condition:,
                                 headers: {},
                                 rpc_metadata:,
                                 rpc_timeout:
                               ))
end
#result(follow_runs: true, rpc_metadata: nil, rpc_timeout: nil) ⇒ Object
Wait for the result of the workflow.
This will use #result_run_id if present to base the result on. To use another run ID, a new handle must be created via Temporalio::Client#workflow_handle.
# File 'lib/temporalio/client/workflow_handle.rb', line 75
def result(
  follow_runs: true,
  rpc_metadata: nil,
  rpc_timeout: nil
)
  # Wait on the close event, following as needed
  hist_run_id = result_run_id
  loop do
    # Get close event
    event = fetch_history_events(
      wait_new_event: true,
      event_filter_type: Api::Enums::V1::HistoryEventFilterType::HISTORY_EVENT_FILTER_TYPE_CLOSE_EVENT,
      skip_archival: true,
      specific_run_id: hist_run_id,
      rpc_metadata:,
      rpc_timeout:
    ).next

    # Check each close type
    case event.event_type
    when :EVENT_TYPE_WORKFLOW_EXECUTION_COMPLETED
      attrs = event.workflow_execution_completed_event_attributes
      hist_run_id = attrs.new_execution_run_id
      next if follow_runs && hist_run_id && !hist_run_id.empty?

      return @client.data_converter.from_payloads(attrs.result).first
    when :EVENT_TYPE_WORKFLOW_EXECUTION_FAILED
      attrs = event.workflow_execution_failed_event_attributes
      hist_run_id = attrs.new_execution_run_id
      next if follow_runs && hist_run_id && !hist_run_id.empty?

      raise Error::WorkflowFailedError.new, cause: @client.data_converter.from_failure(attrs.failure)
    when :EVENT_TYPE_WORKFLOW_EXECUTION_CANCELED
      attrs = event.workflow_execution_canceled_event_attributes
      raise Error::WorkflowFailedError.new, cause: Error::CanceledError.new(
        'Workflow execution canceled',
        details: @client.data_converter.from_payloads(attrs&.details)
      )
    when :EVENT_TYPE_WORKFLOW_EXECUTION_TERMINATED
      attrs = event.workflow_execution_terminated_event_attributes
      raise Error::WorkflowFailedError.new, cause: Error::TerminatedError.new(
        Internal::ProtoUtils.string_or(attrs.reason, 'Workflow execution terminated'),
        details: @client.data_converter.from_payloads(attrs&.details)
      )
    when :EVENT_TYPE_WORKFLOW_EXECUTION_TIMED_OUT
      attrs = event.workflow_execution_timed_out_event_attributes
      hist_run_id = attrs.new_execution_run_id
      next if follow_runs && hist_run_id && !hist_run_id.empty?

      raise Error::WorkflowFailedError.new, cause: Error::TimeoutError.new(
        'Workflow execution timed out',
        type: Api::Enums::V1::TimeoutType::TIMEOUT_TYPE_START_TO_CLOSE,
        last_heartbeat_details: []
      )
    when :EVENT_TYPE_WORKFLOW_EXECUTION_CONTINUED_AS_NEW
      attrs = event.workflow_execution_continued_as_new_event_attributes
      hist_run_id = attrs.new_execution_run_id
      next if follow_runs && hist_run_id && !hist_run_id.empty?

      # TODO: Use more specific error and decode failure
      raise Error::WorkflowContinuedAsNewError.new(new_run_id: attrs.new_execution_run_id)
    else
      raise Error, "Unknown close event type: #{event.event_type}"
    end
  end
end
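The effect of follow_runs is easiest to see on a small model of the loop above. This is a simplified sketch under stated assumptions: `CLOSE_EVENTS` and `final_close_event` are hypothetical, the event hashes are made up, and the model returns the close event instead of decoding a result or raising as the real method does.

```ruby
# Hypothetical close events keyed by run ID. new_run_id is non-empty when the
# run handed off to a successor run.
CLOSE_EVENTS = {
  'run-1' => { type: :continued_as_new, new_run_id: 'run-2' },
  'run-2' => { type: :completed, new_run_id: '', result: 'done' }
}.freeze

# Read the close event of the current run and move on to the successor run
# while follow_runs allows it; otherwise stop at the current run.
def final_close_event(start_run_id, follow_runs: true)
  run_id = start_run_id
  loop do
    event = CLOSE_EVENTS.fetch(run_id)
    next_run = event[:new_run_id]
    if follow_runs && next_run && !next_run.empty?
      run_id = next_run
      next
    end
    return event.merge(run_id: run_id)
  end
end

followed = final_close_event('run-1')
not_followed = final_close_event('run-1', follow_runs: false)
```

In this model, following runs ends on the completed event of `run-2`, while `follow_runs: false` stops at the continued-as-new event of `run-1`, which is the case where the real method raises Error::WorkflowContinuedAsNewError.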
#signal(signal, *args, rpc_metadata: nil, rpc_timeout: nil) ⇒ Object
Handles created as a result of Temporalio::Client#start_workflow will signal the latest workflow with the same workflow ID even if it is unrelated to the started workflow.
Send a signal to the workflow. This will signal for #run_id if present. To use a different run ID, create a new handle via Temporalio::Client#workflow_handle.
# File 'lib/temporalio/client/workflow_handle.rb', line 239
def signal(
  signal,
  *args,
  rpc_metadata: nil,
  rpc_timeout: nil
)
  @client._impl.signal_workflow(Interceptor::SignalWorkflowInput.new(
                                  workflow_id: id,
                                  run_id:,
                                  signal:,
                                  args:,
                                  headers: {},
                                  rpc_metadata:,
                                  rpc_timeout:
                                ))
end
#start_update(update, *args, wait_for_stage:, id: SecureRandom.uuid, rpc_metadata: nil, rpc_timeout: nil) ⇒ WorkflowUpdateHandle
Handles created as a result of Temporalio::Client#start_workflow will send updates to the latest workflow with the same workflow ID even if it is unrelated to the started workflow.
WARNING: This API is experimental.
Send an update request to the workflow and return a handle to it. This will target the workflow with #run_id if present. To use a different run ID, create a new handle via Temporalio::Client#workflow_handle.
# File 'lib/temporalio/client/workflow_handle.rb', line 312
def start_update(
  update,
  *args,
  wait_for_stage:,
  id: SecureRandom.uuid,
  rpc_metadata: nil,
  rpc_timeout: nil
)
  @client._impl.start_workflow_update(Interceptor::StartWorkflowUpdateInput.new(
                                        workflow_id: self.id,
                                        run_id:,
                                        update_id: id,
                                        update:,
                                        args:,
                                        wait_for_stage:,
                                        headers: {},
                                        rpc_metadata:,
                                        rpc_timeout:
                                      ))
end
#terminate(reason = nil, details: [], rpc_metadata: nil, rpc_timeout: nil) ⇒ Object
Handles created as a result of signal with start will terminate the latest workflow with the same workflow ID even if it is unrelated to the started workflow.
Terminate the workflow. This will issue a termination for #run_id if present. This call will make sure to use the run chain starting from #first_execution_run_id if present. To create handles with these values, use Temporalio::Client#workflow_handle.
# File 'lib/temporalio/client/workflow_handle.rb', line 428
def terminate(
  reason = nil,
  details: [],
  rpc_metadata: nil,
  rpc_timeout: nil
)
  @client._impl.terminate_workflow(Interceptor::TerminateWorkflowInput.new(
                                     workflow_id: id,
                                     run_id:,
                                     first_execution_run_id:,
                                     reason:,
                                     details:,
                                     rpc_metadata:,
                                     rpc_timeout:
                                   ))
end
#update_handle(id, specific_run_id: run_id) ⇒ WorkflowUpdateHandle
WARNING: This API is experimental.
Get a handle for an update. The handle can be used to wait on the update result.
# File 'lib/temporalio/client/workflow_handle.rb', line 378
def update_handle(
  id,
  specific_run_id: run_id
)
  WorkflowUpdateHandle.new(
    client: @client,
    id:,
    workflow_id: self.id,
    workflow_run_id: specific_run_id,
    known_outcome: nil
  )
end
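One plausible use of #update_handle is re-attaching to an update by its ID and waiting for the outcome. The sketch below shows that call pattern only. `FakeUpdateHandle`, `FakeWorkflowHandle`, and `await_update` are hypothetical stand-ins so the example runs without a Temporal server; they imitate just #update_handle and the `result` call that #execute_update makes on the returned handle.

```ruby
# Hypothetical stand-in for the update handle: it only knows its IDs and a
# canned outcome.
FakeUpdateHandle = Struct.new(:id, :workflow_run_id, :outcome) do
  def result
    outcome
  end
end

# Hypothetical stand-in for WorkflowHandle, mirroring the update_handle
# signature shown above (specific_run_id defaults to run_id).
class FakeWorkflowHandle
  attr_reader :run_id

  def initialize(run_id:, outcomes:)
    @run_id = run_id
    @outcomes = outcomes
  end

  def update_handle(id, specific_run_id: run_id)
    FakeUpdateHandle.new(id, specific_run_id, @outcomes[id])
  end
end

# Re-attach to an update by ID and wait for its result.
def await_update(workflow_handle, update_id)
  workflow_handle.update_handle(update_id).result
end

handle = FakeWorkflowHandle.new(run_id: 'run-a', outcomes: { 'upd-1' => 'approved' })
outcome = await_update(handle, 'upd-1')
```

Passing `specific_run_id:` explicitly would target a different run than the one the workflow handle is pinned to.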