Class: Temporal::Activity::TaskProcessor

Inherits:
Object
  • Object
show all
Includes:
Concerns::Payloads
Defined in:
lib/temporal/activity/task_processor.rb

Instance Method Summary

Methods included from Concerns::Payloads

#from_details_payloads, #from_payload, #from_payloads, #from_result_payloads, #from_signal_payloads, #to_details_payloads, #to_payload, #to_payloads, #to_result_payloads, #to_signal_payloads

Constructor Details

#initialize(task, namespace, activity_lookup, client, middleware_chain) ⇒ TaskProcessor

Returns a new instance of TaskProcessor.



13
14
15
16
17
18
19
20
21
22
# File 'lib/temporal/activity/task_processor.rb', line 13

# Prepares a processor for one activity task.
#
# @param task [Object] the activity task; must respond to +task_token+ and
#   +activity_type+ (whose result responds to +name+)
# @param namespace [Object] handed to +Metadata.generate+ together with the task
# @param activity_lookup [#find] queried with the activity name to obtain the
#   activity class
# @param client [Object] kept as-is in +@client+
# @param middleware_chain [Object] kept as-is in +@middleware_chain+
def initialize(task, namespace, activity_lookup, client, middleware_chain)
  # Collaborators that are stored untouched.
  @task, @namespace = task, namespace
  @client, @middleware_chain = client, middleware_chain

  # Values derived from the task itself.
  @metadata = Metadata.generate(Metadata::ACTIVITY_TYPE, task, namespace)
  @task_token = task.task_token
  @activity_name = task.activity_type.name

  # +activity_name+ is a bare call (a reader defined outside this method),
  # so @activity_name has to be assigned before this lookup runs.
  @activity_class = activity_lookup.find(activity_name)
end

Instance Method Details

#process ⇒ Object



24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
# File 'lib/temporal/activity/task_processor.rb', line 24

# Executes the activity task and reports its outcome.
#
# Steps, in order:
# 1. Logs the start and emits the +activity_task.queue_time+ metric.
# 2. Builds an +Activity::Context+ from the client and the task metadata.
# 3. Raises +ActivityNotRegistered+ when no activity class was found; the
#    error is caught by this method's own +rescue+ and reported as a failure.
# 4. Runs the activity inside the middleware chain.
# 5. Responds with completion unless the context reports +async?+.
#
# Any +StandardError+ or +ScriptError+ is passed to
# +Temporal::ErrorHandler.handle+ and then reported via +respond_failed+.
# The +activity_task.latency+ metric and the closing debug log are always
# emitted from the +ensure+ clause.
def process
  start_time = Time.now

  Temporal.logger.debug("Processing Activity task", metadata.to_h)
  Temporal.metrics.timing('activity_task.queue_time', queue_time_ms, activity: activity_name)

  # The context is created before the registration check.
  context = Activity::Context.new(client, metadata)

  unless activity_class
    raise ActivityNotRegistered, 'Activity is not registered with this worker'
  end

  result = middleware_chain.invoke(metadata) do
    activity_class.execute_in_context(context, from_payloads(task.input))
  end

  # Do not complete asynchronous activities, these should be completed manually
  respond_completed(result) unless context.async?
rescue StandardError, ScriptError => error
  Temporal::ErrorHandler.handle(error, metadata: metadata)

  respond_failed(error)
ensure
  # Runs on both the success and failure paths so latency is always recorded.
  time_diff_ms = ((Time.now - start_time) * 1000).round
  Temporal.metrics.timing('activity_task.latency', time_diff_ms, activity: activity_name)
  Temporal.logger.debug("Activity task processed", metadata.to_h.merge(execution_time: time_diff_ms))
end