Class: Temporal::Activity::TaskProcessor
- Inherits:
  - Object
- Inheritance hierarchy:
  - Object
  - Temporal::Activity::TaskProcessor
- Includes:
- Concerns::Payloads
- Defined in:
- lib/temporal/activity/task_processor.rb
Instance Method Summary
- #initialize(task, namespace, activity_lookup, client, middleware_chain) ⇒ TaskProcessor
  (constructor) A new instance of TaskProcessor.
- #process ⇒ Object
Methods included from Concerns::Payloads
#from_details_payloads, #from_payload, #from_payloads, #from_result_payloads, #from_signal_payloads, #to_details_payloads, #to_payload, #to_payloads, #to_result_payloads, #to_signal_payloads
Constructor Details
#initialize(task, namespace, activity_lookup, client, middleware_chain) ⇒ TaskProcessor
Returns a new instance of TaskProcessor.
13 14 15 16 17 18 19 20 21 22 |
# File 'lib/temporal/activity/task_processor.rb', line 13

# Builds a processor for a single activity task.
#
# @param task [Object] the activity task; must respond to +task_token+ and
#   +activity_type+ (whose +name+ is used as the activity name)
# @param namespace [Object] passed to Metadata.generate together with the task
# @param activity_lookup [#find] queried with the activity name to obtain the
#   activity class
# @param client [Object] stored as-is on the instance
# @param middleware_chain [Object] stored as-is on the instance
def initialize(task, namespace, activity_lookup, client, middleware_chain)
  # Plain collaborators, kept unchanged.
  @client = client
  @middleware_chain = middleware_chain
  @task = task
  @namespace = namespace

  # Values derived from the task.
  @metadata = Metadata.generate(Metadata::ACTIVITY_TYPE, task, namespace)
  @task_token = task.task_token
  @activity_name = task.activity_type.name

  # Resolved once here; the result of the lookup is stored whatever it is.
  @activity_class = activity_lookup.find(activity_name)
end
Instance Method Details
#process ⇒ Object
24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 |
# File 'lib/temporal/activity/task_processor.rb', line 24

# Runs the activity for this task and reports the outcome.
#
# Steps, in order:
# 1. logs the start and records the 'activity_task.queue_time' metric;
# 2. builds an Activity::Context from the client and the task metadata;
# 3. raises ActivityNotRegistered when no activity class is available;
# 4. executes the activity class inside the middleware chain, passing it the
#    context and the decoded task input;
# 5. calls respond_completed with the result, unless the context is async.
#
# Any StandardError or ScriptError raised along the way is handed to
# Temporal::ErrorHandler and then reported through respond_failed. The
# 'activity_task.latency' metric and the closing debug log are emitted in
# every case.
#
# @return [Object] the value of respond_completed or respond_failed; nil when
#   an async activity is left uncompleted
def process
  start_time = Time.now

  Temporal.logger.debug("Processing Activity task", metadata.to_h)
  Temporal.metrics.timing('activity_task.queue_time', queue_time_ms, activity: activity_name)

  context = Activity::Context.new(client, metadata)

  raise ActivityNotRegistered, 'Activity is not registered with this worker' unless activity_class

  result = middleware_chain.invoke(metadata) do
    activity_class.execute_in_context(context, from_payloads(task.input))
  end

  # Do not complete asynchronous activities, these should be completed manually
  respond_completed(result) unless context.async?
rescue StandardError, ScriptError => error
  Temporal::ErrorHandler.handle(error, metadata: metadata)

  respond_failed(error)
ensure
  # Runs on success and on failure, so latency is always recorded.
  time_diff_ms = ((Time.now - start_time) * 1000).round
  Temporal.metrics.timing('activity_task.latency', time_diff_ms, activity: activity_name)
  Temporal.logger.debug("Activity task processed", metadata.to_h.merge(execution_time: time_diff_ms))
end