Class: Temporal::Client::GRPCClient

Inherits:
Object
  • Object
show all
Includes:
Temporal::Concerns::Payloads
Defined in:
lib/temporal/client/grpc_client.rb

Constant Summary collapse

# Maps the symbolic reuse policies accepted by #start_workflow_execution
# to the matching WorkflowIdReusePolicy enum values.
WORKFLOW_ID_REUSE_POLICY = {
  allow_failed: Temporal::Api::Enums::V1::WorkflowIdReusePolicy::WORKFLOW_ID_REUSE_POLICY_ALLOW_DUPLICATE_FAILED_ONLY,
  allow: Temporal::Api::Enums::V1::WorkflowIdReusePolicy::WORKFLOW_ID_REUSE_POLICY_ALLOW_DUPLICATE,
  reject: Temporal::Api::Enums::V1::WorkflowIdReusePolicy::WORKFLOW_ID_REUSE_POLICY_REJECT_DUPLICATE
}.freeze
# Maps the symbolic event_type accepted by #get_workflow_execution_history
# to the matching HistoryEventFilterType enum values.
HISTORY_EVENT_FILTER = {
  all: Temporal::Api::Enums::V1::HistoryEventFilterType::HISTORY_EVENT_FILTER_TYPE_ALL_EVENT,
  close: Temporal::Api::Enums::V1::HistoryEventFilterType::HISTORY_EVENT_FILTER_TYPE_CLOSE_EVENT
}.freeze
# Largest timeout, in seconds, that #get_workflow_execution_history accepts
# when it is asked to wait for a new event.
SERVER_MAX_GET_WORKFLOW_EXECUTION_HISTORY_POLL = 30

Instance Method Summary collapse

Methods included from Temporal::Concerns::Payloads

#from_details_payloads, #from_payload, #from_payloads, #from_result_payloads, #from_signal_payloads, #to_details_payloads, #to_payload, #to_payloads, #to_result_payloads, #to_signal_payloads

Constructor Details

#initialize(host, port, identity) ⇒ GRPCClient

Returns a new instance of GRPCClient.



26
27
28
29
30
31
32
# File 'lib/temporal/client/grpc_client.rb', line 26

# Records the server address and caller identity, and sets up the state
# used to coordinate polling.
#
# @param host host component, interpolated into the "host:port" address
# @param port port component, interpolated into the "host:port" address
# @param identity value kept in @identity for later requests
def initialize(host, port, identity)
  @identity = identity
  @url = "#{host}:#{port}"

  # Polling starts enabled, with no request in flight yet.
  @poll_mutex = Mutex.new
  @poll = true
  @poll_request = nil
end

Instance Method Details

#cancel_polling_request ⇒ Object



396
397
398
399
400
401
# File 'lib/temporal/client/grpc_client.rb', line 396

# Disables further polling and cancels the current poll request, if any.
# Both steps happen while holding the poll mutex.
#
# @return [Object, nil] the result of cancelling the request, or nil when
#   there is no request to cancel
def cancel_polling_request
  poll_mutex.synchronize do
    @poll = false
    in_flight = poll_request
    in_flight.cancel unless in_flight.nil?
  end
end

#count_workflow_executions ⇒ Object

Raises:

  • (NotImplementedError)


353
354
355
# File 'lib/temporal/client/grpc_client.rb', line 353

# Not implemented by this client.
#
# @raise [NotImplementedError] always
def count_workflow_executions
  raise NotImplementedError
end

#deprecate_namespace(name:) ⇒ Object



68
69
70
71
# File 'lib/temporal/client/grpc_client.rb', line 68

# Sends a DeprecateNamespaceRequest for the given namespace.
#
# @param name value sent as the request's +namespace+ field
# @return [Object] whatever +client.deprecate_namespace+ returns
def deprecate_namespace(name:)
  deprecation = Temporal::Api::WorkflowService::V1::DeprecateNamespaceRequest.new(
    namespace: name
  )

  client.deprecate_namespace(deprecation)
end

#describe_namespace(name:) ⇒ Object



48
49
50
51
# File 'lib/temporal/client/grpc_client.rb', line 48

# Sends a DescribeNamespaceRequest for the given namespace.
#
# @param name value sent as the request's +namespace+ field
# @return [Object] whatever +client.describe_namespace+ returns
def describe_namespace(name:)
  lookup = Temporal::Api::WorkflowService::V1::DescribeNamespaceRequest.new(
    namespace: name
  )

  client.describe_namespace(lookup)
end

#describe_task_queue(namespace:, task_queue:) ⇒ Object



384
385
386
387
388
389
390
391
392
393
394
# File 'lib/temporal/client/grpc_client.rb', line 384

# Sends a DescribeTaskQueueRequest for the workflow task queue with the
# given name, asking for the task queue status to be included.
#
# @param namespace value sent as the request's +namespace+ field
# @param task_queue name wrapped in a TaskQueue message
# @return [Object] whatever +client.describe_task_queue+ returns
def describe_task_queue(namespace:, task_queue:)
  request = Temporal::Api::WorkflowService::V1::DescribeTaskQueueRequest.new(
    namespace: namespace,
    task_queue: Temporal::Api::TaskQueue::V1::TaskQueue.new(
      name: task_queue
    ),
    # Generated enum modules expose values under their full proto names
    # (as with WORKFLOW_ID_REUSE_POLICY_* and HISTORY_EVENT_FILTER_TYPE_*),
    # so the short `TaskQueueType::Workflow` constant does not resolve.
    task_queue_type: Temporal::Api::Enums::V1::TaskQueueType::TASK_QUEUE_TYPE_WORKFLOW,
    include_task_queue_status: true
  )
  client.describe_task_queue(request)
end

#describe_workflow_execution(namespace:, workflow_id:, run_id:) ⇒ Object



373
374
375
376
377
378
379
380
381
382
# File 'lib/temporal/client/grpc_client.rb', line 373

# Sends a DescribeWorkflowExecutionRequest for one workflow run.
#
# @param namespace value sent as the request's +namespace+ field
# @param workflow_id workflow id of the execution to describe
# @param run_id run id of the execution to describe
# @return [Object] whatever +client.describe_workflow_execution+ returns
def describe_workflow_execution(namespace:, workflow_id:, run_id:)
  execution = Temporal::Api::Common::V1::WorkflowExecution.new(
    workflow_id: workflow_id,
    run_id: run_id
  )
  description_request = Temporal::Api::WorkflowService::V1::DescribeWorkflowExecutionRequest.new(
    namespace: namespace,
    execution: execution
  )

  client.describe_workflow_execution(description_request)
end

#get_search_attributes ⇒ Object

Raises:

  • (NotImplementedError)


357
358
359
# File 'lib/temporal/client/grpc_client.rb', line 357

# Not implemented by this client.
#
# @raise [NotImplementedError] always
def get_search_attributes
  raise NotImplementedError
end

#get_workflow_execution_history(namespace:, workflow_id:, run_id:, next_page_token: nil, wait_for_new_event: false, event_type: :all, timeout: nil) ⇒ Object



123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
# File 'lib/temporal/client/grpc_client.rb', line 123

# Sends a GetWorkflowExecutionHistoryRequest for one workflow run.
#
# @param namespace value sent as the request's +namespace+ field
# @param workflow_id workflow id of the execution
# @param run_id run id of the execution
# @param next_page_token token forwarded as +next_page_token+ (nil by default)
# @param wait_for_new_event forwarded as +wait_new_event+; when truthy a
#   timeout is mandatory
# @param event_type key looked up in HISTORY_EVENT_FILTER (defaults to :all)
# @param timeout seconds added to the current time to form the call deadline;
#   when nil the call is made with a nil deadline
# @return [Object] whatever +client.get_workflow_execution_history+ returns
# @raise [RuntimeError] if wait_for_new_event is truthy and timeout is nil
# @raise [ClientError] if wait_for_new_event is truthy and timeout exceeds
#   SERVER_MAX_GET_WORKFLOW_EXECUTION_HISTORY_POLL
def get_workflow_execution_history(
  namespace:,
  workflow_id:,
  run_id:,
  next_page_token: nil,
  wait_for_new_event: false,
  event_type: :all,
  timeout: nil
)
  if wait_for_new_event
    # This is an internal error.  Wrappers should enforce this.
    raise "You must specify a timeout when wait_for_new_event = true." if timeout.nil?

    if timeout > SERVER_MAX_GET_WORKFLOW_EXECUTION_HISTORY_POLL
      raise ClientError, "You may not specify a timeout of more than #{SERVER_MAX_GET_WORKFLOW_EXECUTION_HISTORY_POLL} seconds, got: #{timeout}."
    end
  end

  execution = Temporal::Api::Common::V1::WorkflowExecution.new(
    workflow_id: workflow_id,
    run_id: run_id
  )
  history_request = Temporal::Api::WorkflowService::V1::GetWorkflowExecutionHistoryRequest.new(
    namespace: namespace,
    execution: execution,
    next_page_token: next_page_token,
    wait_new_event: wait_for_new_event,
    history_event_filter_type: HISTORY_EVENT_FILTER[event_type]
  )

  # Stays nil when no timeout was given.
  deadline = Time.now + timeout if timeout
  client.get_workflow_execution_history(history_request, deadline: deadline)
end

#list_archived_workflow_executions ⇒ Object

Raises:

  • (NotImplementedError)


345
346
347
# File 'lib/temporal/client/grpc_client.rb', line 345

# Not implemented by this client.
#
# @raise [NotImplementedError] always
def list_archived_workflow_executions
  raise NotImplementedError
end

#list_closed_workflow_executions ⇒ Object

Raises:

  • (NotImplementedError)


337
338
339
# File 'lib/temporal/client/grpc_client.rb', line 337

# Not implemented by this client.
#
# @raise [NotImplementedError] always
def list_closed_workflow_executions
  raise NotImplementedError
end

#list_namespaces(page_size:) ⇒ Object



53
54
55
56
# File 'lib/temporal/client/grpc_client.rb', line 53

# Sends a ListNamespacesRequest asking for up to +page_size+ namespaces.
#
# @param page_size value sent as the request's +page_size+ field
# @return [Object] whatever +client.list_namespaces+ returns
def list_namespaces(page_size:)
  # Protobuf message initializers are keyed by the proto field name
  # (snake_case); the camelCase `pageSize` key is rejected as an unknown field.
  request = Temporal::Api::WorkflowService::V1::ListNamespacesRequest.new(page_size: page_size)
  client.list_namespaces(request)
end

#list_open_workflow_executions ⇒ Object

Raises:

  • (NotImplementedError)


333
334
335
# File 'lib/temporal/client/grpc_client.rb', line 333

# Not implemented by this client.
#
# @raise [NotImplementedError] always
def list_open_workflow_executions
  raise NotImplementedError
end

#list_workflow_executions ⇒ Object

Raises:

  • (NotImplementedError)


341
342
343
# File 'lib/temporal/client/grpc_client.rb', line 341

# Not implemented by this client.
#
# @raise [NotImplementedError] always
def list_workflow_executions
  raise NotImplementedError
end

#poll_activity_task_queue(namespace:, task_queue:) ⇒ Object



192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
# File 'lib/temporal/client/grpc_client.rb', line 192

# Polls the named task queue for an activity task.
#
# The poll operation is created and stored in @poll_request while holding
# the poll mutex, then executed outside of it.
#
# @param namespace value sent as the request's +namespace+ field
# @param task_queue name wrapped in a TaskQueue message
# @return [Object, nil] the result of executing the poll operation, or nil
#   when +can_poll?+ is false
def poll_activity_task_queue(namespace:, task_queue:)
  queue = Temporal::Api::TaskQueue::V1::TaskQueue.new(name: task_queue)
  poll = Temporal::Api::WorkflowService::V1::PollActivityTaskQueueRequest.new(
    identity: identity,
    namespace: namespace,
    task_queue: queue
  )

  poll_mutex.synchronize do
    # Leave the method without issuing a poll when polling is disabled.
    return unless can_poll?

    @poll_request = client.poll_activity_task_queue(poll, return_op: true)
  end

  poll_request.execute
end

#poll_workflow_task_queue(namespace:, task_queue:) ⇒ Object



156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
# File 'lib/temporal/client/grpc_client.rb', line 156

# Polls the named task queue for a workflow task.
#
# The poll operation is created and stored in @poll_request while holding
# the poll mutex, then executed outside of it.
#
# @param namespace value sent as the request's +namespace+ field
# @param task_queue name wrapped in a TaskQueue message
# @return [Object, nil] the result of executing the poll operation, or nil
#   when +can_poll?+ is false
def poll_workflow_task_queue(namespace:, task_queue:)
  queue = Temporal::Api::TaskQueue::V1::TaskQueue.new(name: task_queue)
  poll = Temporal::Api::WorkflowService::V1::PollWorkflowTaskQueueRequest.new(
    identity: identity,
    namespace: namespace,
    task_queue: queue
  )

  poll_mutex.synchronize do
    # Leave the method without issuing a poll when polling is disabled.
    return unless can_poll?

    @poll_request = client.poll_workflow_task_queue(poll, return_op: true)
  end

  poll_request.execute
end

#query_workflow ⇒ Object

Raises:

  • (NotImplementedError)


369
370
371
# File 'lib/temporal/client/grpc_client.rb', line 369

# Not implemented by this client.
#
# @raise [NotImplementedError] always
def query_workflow
  raise NotImplementedError
end

#record_activity_task_heartbeat(task_token:, details: nil) ⇒ Object



209
210
211
212
213
214
215
216
# File 'lib/temporal/client/grpc_client.rb', line 209

# Sends a RecordActivityTaskHeartbeatRequest for the activity task
# identified by +task_token+.
#
# @param task_token token forwarded as the request's +task_token+ field
# @param details value converted with +to_details_payloads+ (nil by default)
# @return [Object] whatever +client.record_activity_task_heartbeat+ returns
def record_activity_task_heartbeat(task_token:, details: nil)
  heartbeat = Temporal::Api::WorkflowService::V1::RecordActivityTaskHeartbeatRequest.new(
    task_token: task_token, details: to_details_payloads(details), identity: identity
  )

  client.record_activity_task_heartbeat(heartbeat)
end

#record_activity_task_heartbeat_by_id ⇒ Object

Raises:

  • (NotImplementedError)


218
219
220
# File 'lib/temporal/client/grpc_client.rb', line 218

# Not implemented by this client.
#
# @raise [NotImplementedError] always
def record_activity_task_heartbeat_by_id
  raise NotImplementedError
end

#register_namespace(name:, description: nil, global: false, retention_period: 10) ⇒ Object



34
35
36
37
38
39
40
41
42
43
44
45
46
# File 'lib/temporal/client/grpc_client.rb', line 34

# Sends a RegisterNamespaceRequest for a new namespace.
#
# @param name value sent as the request's +namespace+ field
# @param description forwarded as +description+ (nil by default)
# @param global forwarded as +is_global_namespace+ (false by default)
# @param retention_period number of days; converted to seconds for the
#   +workflow_execution_retention_period+ duration (10 by default)
# @return [Object] whatever +client.register_namespace+ returns
# @raise [Temporal::NamespaceAlreadyExistsFailure] when the call raises
#   GRPC::AlreadyExists; the original error details become the message
def register_namespace(name:, description: nil, global: false, retention_period: 10)
  # retention_period is in days; Duration takes seconds.
  retention = Google::Protobuf::Duration.new(
    seconds: retention_period * 24 * 60 * 60
  )
  registration = Temporal::Api::WorkflowService::V1::RegisterNamespaceRequest.new(
    namespace: name,
    description: description,
    is_global_namespace: global,
    workflow_execution_retention_period: retention
  )

  client.register_namespace(registration)
rescue GRPC::AlreadyExists => error
  raise Temporal::NamespaceAlreadyExistsFailure, error.details
end

#request_cancel_workflow_execution ⇒ Object

Raises:

  • (NotImplementedError)


277
278
279
# File 'lib/temporal/client/grpc_client.rb', line 277

# Not implemented by this client.
#
# @raise [NotImplementedError] always
def request_cancel_workflow_execution
  raise NotImplementedError
end

#reset_sticky_task_queue ⇒ Object

Raises:

  • (NotImplementedError)


365
366
367
# File 'lib/temporal/client/grpc_client.rb', line 365

# Not implemented by this client.
#
# @raise [NotImplementedError] always
def reset_sticky_task_queue
  raise NotImplementedError
end

#reset_workflow_execution(namespace:, workflow_id:, run_id:, reason:, workflow_task_event_id:) ⇒ Object



299
300
301
302
303
304
305
306
307
308
309
310
# File 'lib/temporal/client/grpc_client.rb', line 299

# Sends a ResetWorkflowExecutionRequest for one workflow run.
#
# @param namespace value sent as the request's +namespace+ field
# @param workflow_id workflow id of the execution to reset
# @param run_id run id of the execution to reset
# @param reason forwarded as the request's +reason+ field
# @param workflow_task_event_id forwarded as +workflow_task_finish_event_id+
# @return [Object] whatever +client.reset_workflow_execution+ returns
def reset_workflow_execution(namespace:, workflow_id:, run_id:, reason:, workflow_task_event_id:)
  execution = Temporal::Api::Common::V1::WorkflowExecution.new(
    workflow_id: workflow_id,
    run_id: run_id
  )
  reset_request = Temporal::Api::WorkflowService::V1::ResetWorkflowExecutionRequest.new(
    namespace: namespace,
    workflow_execution: execution,
    reason: reason,
    workflow_task_finish_event_id: workflow_task_event_id
  )

  client.reset_workflow_execution(reset_request)
end

#respond_activity_task_canceled(task_token:, details: nil) ⇒ Object



264
265
266
267
268
269
270
271
# File 'lib/temporal/client/grpc_client.rb', line 264

# Sends a RespondActivityTaskCanceledRequest for the activity task
# identified by +task_token+.
#
# @param task_token token forwarded as the request's +task_token+ field
# @param details value converted with +to_details_payloads+ (nil by default)
# @return [Object] whatever +client.respond_activity_task_canceled+ returns
def respond_activity_task_canceled(task_token:, details: nil)
  cancellation = Temporal::Api::WorkflowService::V1::RespondActivityTaskCanceledRequest.new(
    task_token: task_token, details: to_details_payloads(details), identity: identity
  )

  client.respond_activity_task_canceled(cancellation)
end

#respond_activity_task_canceled_by_id ⇒ Object

Raises:

  • (NotImplementedError)


273
274
275
# File 'lib/temporal/client/grpc_client.rb', line 273

# Not implemented by this client.
#
# @raise [NotImplementedError] always
def respond_activity_task_canceled_by_id
  raise NotImplementedError
end

#respond_activity_task_completed(task_token:, result:) ⇒ Object



222
223
224
225
226
227
228
229
# File 'lib/temporal/client/grpc_client.rb', line 222

# Sends a RespondActivityTaskCompletedRequest for the activity task
# identified by +task_token+.
#
# @param task_token token forwarded as the request's +task_token+ field
# @param result value converted with +to_result_payloads+
# @return [Object] whatever +client.respond_activity_task_completed+ returns
def respond_activity_task_completed(task_token:, result:)
  completion = Temporal::Api::WorkflowService::V1::RespondActivityTaskCompletedRequest.new(
    identity: identity, task_token: task_token, result: to_result_payloads(result)
  )

  client.respond_activity_task_completed(completion)
end

#respond_activity_task_completed_by_id(namespace:, activity_id:, workflow_id:, run_id:, result:) ⇒ Object



231
232
233
234
235
236
237
238
239
240
241
# File 'lib/temporal/client/grpc_client.rb', line 231

# Sends a RespondActivityTaskCompletedByIdRequest, addressing the activity
# by namespace, workflow id, run id and activity id instead of a task token.
#
# @param namespace value sent as the request's +namespace+ field
# @param activity_id forwarded as +activity_id+
# @param workflow_id forwarded as +workflow_id+
# @param run_id forwarded as +run_id+
# @param result value converted with +to_result_payloads+
# @return [Object] whatever +client.respond_activity_task_completed_by_id+ returns
def respond_activity_task_completed_by_id(namespace:, activity_id:, workflow_id:, run_id:, result:)
  completion = Temporal::Api::WorkflowService::V1::RespondActivityTaskCompletedByIdRequest.new(
    identity: identity,
    namespace: namespace,
    workflow_id: workflow_id,
    run_id: run_id,
    activity_id: activity_id,
    result: to_result_payloads(result)
  )

  client.respond_activity_task_completed_by_id(completion)
end

#respond_activity_task_failed(task_token:, exception:) ⇒ Object



243
244
245
246
247
248
249
250
# File 'lib/temporal/client/grpc_client.rb', line 243

# Sends a RespondActivityTaskFailedRequest for the activity task
# identified by +task_token+.
#
# @param task_token token forwarded as the request's +task_token+ field
# @param exception error serialized through +Serializer::Failure+
# @return [Object] whatever +client.respond_activity_task_failed+ returns
def respond_activity_task_failed(task_token:, exception:)
  failure_report = Temporal::Api::WorkflowService::V1::RespondActivityTaskFailedRequest.new(
    identity: identity,
    task_token: task_token,
    failure: Serializer::Failure.new(exception).to_proto
  )

  client.respond_activity_task_failed(failure_report)
end

#respond_activity_task_failed_by_id(namespace:, activity_id:, workflow_id:, run_id:, exception:) ⇒ Object



252
253
254
255
256
257
258
259
260
261
262
# File 'lib/temporal/client/grpc_client.rb', line 252

# Sends a RespondActivityTaskFailedByIdRequest, addressing the activity by
# namespace, workflow id, run id and activity id instead of a task token.
#
# @param namespace value sent as the request's +namespace+ field
# @param activity_id forwarded as +activity_id+
# @param workflow_id forwarded as +workflow_id+
# @param run_id forwarded as +run_id+
# @param exception error serialized through +Serializer::Failure+
# @return [Object] whatever +client.respond_activity_task_failed_by_id+ returns
def respond_activity_task_failed_by_id(namespace:, activity_id:, workflow_id:, run_id:, exception:)
  failure_report = Temporal::Api::WorkflowService::V1::RespondActivityTaskFailedByIdRequest.new(
    identity: identity,
    namespace: namespace,
    workflow_id: workflow_id,
    run_id: run_id,
    activity_id: activity_id,
    failure: Serializer::Failure.new(exception).to_proto
  )

  client.respond_activity_task_failed_by_id(failure_report)
end

#respond_query_task_completed ⇒ Object

Raises:

  • (NotImplementedError)


361
362
363
# File 'lib/temporal/client/grpc_client.rb', line 361

# Not implemented by this client.
#
# @raise [NotImplementedError] always
def respond_query_task_completed
  raise NotImplementedError
end

#respond_workflow_task_completed(task_token:, commands:) ⇒ Object



173
174
175
176
177
178
179
180
# File 'lib/temporal/client/grpc_client.rb', line 173

# Sends a RespondWorkflowTaskCompletedRequest carrying the serialized
# commands for the workflow task identified by +task_token+.
#
# @param task_token token forwarded as the request's +task_token+ field
# @param commands coerced with +Array()+; the second item of each entry is
#   passed to +Serializer.serialize+
# @return [Object] whatever +client.respond_workflow_task_completed+ returns
def respond_workflow_task_completed(task_token:, commands:)
  completion = Temporal::Api::WorkflowService::V1::RespondWorkflowTaskCompletedRequest.new(
    identity: identity,
    task_token: task_token,
    # Only the second item of each entry is serialized; the first is ignored.
    commands: Array(commands).map do |_first, command|
      Serializer.serialize(command)
    end
  )

  client.respond_workflow_task_completed(completion)
end

#respond_workflow_task_failed(task_token:, cause:, exception: nil) ⇒ Object



182
183
184
185
186
187
188
189
190
# File 'lib/temporal/client/grpc_client.rb', line 182

# Sends a RespondWorkflowTaskFailedRequest for the workflow task identified
# by +task_token+.
#
# @param task_token token forwarded as the request's +task_token+ field
# @param cause forwarded as the request's +cause+ field
# @param exception error serialized through +Serializer::Failure+ (nil by default)
# @return [Object] whatever +client.respond_workflow_task_failed+ returns
def respond_workflow_task_failed(task_token:, cause:, exception: nil)
  # NOTE(review): exception defaults to nil and is still handed to
  # Serializer::Failure — confirm that the serializer accepts nil.
  failure = Serializer::Failure.new(exception).to_proto
  failure_report = Temporal::Api::WorkflowService::V1::RespondWorkflowTaskFailedRequest.new(
    identity: identity,
    task_token: task_token,
    cause: cause,
    failure: failure
  )

  client.respond_workflow_task_failed(failure_report)
end

#scan_workflow_executions ⇒ Object

Raises:

  • (NotImplementedError)


349
350
351
# File 'lib/temporal/client/grpc_client.rb', line 349

# Not implemented by this client.
#
# @raise [NotImplementedError] always
def scan_workflow_executions
  raise NotImplementedError
end

#signal_with_start_workflow_execution ⇒ Object

Raises:

  • (NotImplementedError)


295
296
297
# File 'lib/temporal/client/grpc_client.rb', line 295

# Not implemented by this client.
#
# @raise [NotImplementedError] always
def signal_with_start_workflow_execution
  raise NotImplementedError
end

#signal_workflow_execution(namespace:, workflow_id:, run_id:, signal:, input: nil) ⇒ Object



281
282
283
284
285
286
287
288
289
290
291
292
293
# File 'lib/temporal/client/grpc_client.rb', line 281

# Sends a SignalWorkflowExecutionRequest to one workflow run.
#
# @param namespace value sent as the request's +namespace+ field
# @param workflow_id workflow id of the target execution
# @param run_id run id of the target execution
# @param signal forwarded as +signal_name+
# @param input value converted with +to_signal_payloads+ (nil by default)
# @return [Object] whatever +client.signal_workflow_execution+ returns
def signal_workflow_execution(namespace:, workflow_id:, run_id:, signal:, input: nil)
  target = Temporal::Api::Common::V1::WorkflowExecution.new(
    workflow_id: workflow_id,
    run_id: run_id
  )
  signal_request = Temporal::Api::WorkflowService::V1::SignalWorkflowExecutionRequest.new(
    namespace: namespace,
    workflow_execution: target,
    signal_name: signal,
    input: to_signal_payloads(input),
    identity: identity
  )

  client.signal_workflow_execution(signal_request)
end

#start_workflow_execution(namespace:, workflow_id:, workflow_name:, task_queue:, input: nil, execution_timeout:, run_timeout:, task_timeout:, workflow_id_reuse_policy: nil, headers: nil, cron_schedule: nil) ⇒ Object



73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
# File 'lib/temporal/client/grpc_client.rb', line 73

# Sends a StartWorkflowExecutionRequest with a freshly generated request id.
#
# @param namespace value sent as the request's +namespace+ field
# @param workflow_id forwarded as +workflow_id+
# @param workflow_name name wrapped in a WorkflowType message
# @param task_queue name wrapped in a TaskQueue message
# @param input value converted with +to_payloads+ (nil by default)
# @param execution_timeout forwarded as +workflow_execution_timeout+
# @param run_timeout forwarded as +workflow_run_timeout+
# @param task_timeout forwarded as +workflow_task_timeout+
# @param workflow_id_reuse_policy optional key of WORKFLOW_ID_REUSE_POLICY;
#   when falsy the request's reuse policy is left untouched
# @param headers forwarded as the +fields+ of a Header message (nil by default)
# @param cron_schedule forwarded as +cron_schedule+ (nil by default)
# @return [Object] whatever +client.start_workflow_execution+ returns
# @raise [Client::ArgumentError] when workflow_id_reuse_policy is given but
#   is not a key of WORKFLOW_ID_REUSE_POLICY
# @raise [Temporal::WorkflowExecutionAlreadyStartedFailure] when the call
#   raises GRPC::AlreadyExists
def start_workflow_execution(
  namespace:,
  workflow_id:,
  workflow_name:,
  task_queue:,
  input: nil,
  execution_timeout:,
  run_timeout:,
  task_timeout:,
  workflow_id_reuse_policy: nil,
  headers: nil,
  cron_schedule: nil
)
  start_request = Temporal::Api::WorkflowService::V1::StartWorkflowExecutionRequest.new(
    identity: identity,
    namespace: namespace,
    workflow_type: Temporal::Api::Common::V1::WorkflowType.new(name: workflow_name),
    workflow_id: workflow_id,
    task_queue: Temporal::Api::TaskQueue::V1::TaskQueue.new(name: task_queue),
    input: to_payloads(input),
    workflow_execution_timeout: execution_timeout,
    workflow_run_timeout: run_timeout,
    workflow_task_timeout: task_timeout,
    request_id: SecureRandom.uuid,
    header: Temporal::Api::Common::V1::Header.new(fields: headers),
    cron_schedule: cron_schedule
  )

  if workflow_id_reuse_policy
    # Unknown policy symbols are rejected instead of being sent as nil.
    start_request.workflow_id_reuse_policy =
      WORKFLOW_ID_REUSE_POLICY.fetch(workflow_id_reuse_policy) do
        raise Client::ArgumentError, 'Unknown workflow_id_reuse_policy specified'
      end
  end

  client.start_workflow_execution(start_request)
rescue GRPC::AlreadyExists => error
  # Feel like there should be cleaner way to do this...
  # The run id of the existing execution is scraped out of the error details.
  existing_run_id = error.details[/RunId: ([a-f0-9]+-[a-f0-9]+-[a-f0-9]+-[a-f0-9]+-[a-f0-9]+)/, 1]
  raise Temporal::WorkflowExecutionAlreadyStartedFailure.new(error.details, existing_run_id)
end

#terminate_workflow_execution(namespace:, workflow_id:, run_id:, reason: nil, details: nil) ⇒ Object



312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
# File 'lib/temporal/client/grpc_client.rb', line 312

# Sends a TerminateWorkflowExecutionRequest for one workflow run.
#
# @param namespace value sent as the request's +namespace+ field
# @param workflow_id workflow id of the execution to terminate
# @param run_id run id of the execution to terminate
# @param reason forwarded as +reason+ (nil by default)
# @param details value converted with +to_details_payloads+ (nil by default)
# @return [Object] whatever +client.terminate_workflow_execution+ returns
def terminate_workflow_execution(
  namespace:,
  workflow_id:,
  run_id:,
  reason: nil,
  details: nil
)
  termination = Temporal::Api::WorkflowService::V1::TerminateWorkflowExecutionRequest.new(
    identity: identity,
    namespace: namespace,
    workflow_execution: Temporal::Api::Common::V1::WorkflowExecution.new(
      workflow_id: workflow_id, run_id: run_id
    ),
    reason: reason,
    details: to_details_payloads(details)
  )

  client.terminate_workflow_execution(termination)
end

#update_namespace(name:, description:) ⇒ Object



58
59
60
61
62
63
64
65
66
# File 'lib/temporal/client/grpc_client.rb', line 58

# Sends an UpdateNamespaceRequest that changes a namespace's description.
#
# @param name value sent as the request's +namespace+ field
# @param description new description, wrapped in an UpdateNamespaceInfo message
# @return [Object] whatever +client.update_namespace+ returns
def update_namespace(name:, description:)
  request = Temporal::Api::WorkflowService::V1::UpdateNamespaceRequest.new(
    namespace: name,
    # UpdateNamespaceInfo is declared in the namespace/v1 proto package, so it
    # is generated under Namespace::V1, not alongside the request types.
    update_info: Temporal::Api::Namespace::V1::UpdateNamespaceInfo.new(
      description: description
    )
  )
  client.update_namespace(request)
end