Class: Temporal::Workflow::TaskProcessor
- Inherits:
-
Object
- Object
- Temporal::Workflow::TaskProcessor
- Defined in:
- lib/temporal/workflow/task_processor.rb
Constant Summary collapse
- MAX_FAILED_ATTEMPTS =
1
Instance Method Summary collapse
-
#initialize(task, namespace, workflow_lookup, client, middleware_chain) ⇒ TaskProcessor
constructor
A new instance of TaskProcessor.
- #process ⇒ Object
Constructor Details
#initialize(task, namespace, workflow_lookup, client, middleware_chain) ⇒ TaskProcessor
Returns a new instance of TaskProcessor.
12 13 14 15 16 17 18 19 20 21 |
# File 'lib/temporal/workflow/task_processor.rb', line 12 def initialize(task, namespace, workflow_lookup, client, middleware_chain) @task = task @namespace = namespace @metadata = Metadata.generate(Metadata::WORKFLOW_TASK_TYPE, task, namespace) @task_token = task.task_token @workflow_name = task.workflow_type.name @workflow_class = workflow_lookup.find(workflow_name) @client = client @middleware_chain = middleware_chain end |
Instance Method Details
#process ⇒ Object
23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 |
# File 'lib/temporal/workflow/task_processor.rb', line 23 def process start_time = Time.now Temporal.logger.debug("Processing Workflow task", .to_h) Temporal.metrics.timing('workflow_task.queue_time', queue_time_ms, workflow: workflow_name) if !workflow_class raise Temporal::WorkflowNotRegistered, 'Workflow is not registered with this worker' end history = fetch_full_history # TODO: For sticky workflows we need to cache the Executor instance executor = Workflow::Executor.new(workflow_class, history) commands = middleware_chain.invoke() do executor.run end complete_task(commands) rescue StandardError => error Temporal::ErrorHandler.handle(error, metadata: ) fail_task(error) ensure time_diff_ms = ((Time.now - start_time) * 1000).round Temporal.metrics.timing('workflow_task.latency', time_diff_ms, workflow: workflow_name) Temporal.logger.debug("Workflow task processed", .to_h.merge(execution_time: time_diff_ms)) end |