Class: Cadence::Workflow::DecisionTaskProcessor

Inherits:
Object
  • Object
show all
Defined in:
lib/cadence/workflow/decision_task_processor.rb

Constant Summary collapse

MAX_FAILED_ATTEMPTS =
50

Instance Method Summary collapse

Constructor Details

#initialize(task, domain, workflow_lookup, client, middleware_chain) ⇒ DecisionTaskProcessor

Returns a new instance of DecisionTaskProcessor.



11
12
13
14
15
16
17
18
19
# File 'lib/cadence/workflow/decision_task_processor.rb', line 11

# Builds a processor for a single decision task.
#
# @param task [Object] decision task; must respond to +taskToken+ and
#   +workflowType+ (whose result responds to +name+)
# @param domain [Object] stored as-is for later use
# @param workflow_lookup [#find] queried with the workflow name; the result
#   is stored as the workflow class
# @param client [Object] stored as-is for later use
# @param middleware_chain [Object] stored as-is for later use
def initialize(task, domain, workflow_lookup, client, middleware_chain)
  # Collaborators handed in by the caller, kept untouched.
  @task = task
  @domain = domain
  @client = client
  @middleware_chain = middleware_chain

  # Values read off the task itself.
  workflow_type = task.workflowType
  @task_token = task.taskToken
  @workflow_name = workflow_type.name

  # Goes through the workflow_name reader, which is defined outside this
  # method (presumably a plain reader for @workflow_name).
  @workflow_class = workflow_lookup.find(workflow_name)
end

Instance Method Details

#process ⇒ Object



21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
# File 'lib/cadence/workflow/decision_task_processor.rb', line 21

# Processes the decision task held by this instance.
#
# Steps, in order:
# 1. Logs the start and reports the 'decision_task.queue_time' metric.
# 2. Fails the task and returns early when no workflow class is available.
# 3. Fetches the full history, builds a Workflow::Executor from the workflow
#    class and that history, and runs it inside the middleware chain.
# 4. Completes the task with the decisions produced by the chain.
#
# Any StandardError raised along the way is turned into a failed task and its
# backtrace is logged at debug level. The 'decision_task.latency' metric is
# reported from the ensure clause, so it is emitted on every path.
def process
  start_time = Time.now

  Cadence.logger.info("Processing a decision task for #{workflow_name}")
  Cadence.metrics.timing('decision_task.queue_time', queue_time_ms, workflow: workflow_name)

  unless workflow_class
    fail_task('Workflow does not exist')
    return
  end

  history = fetch_full_history
  # TODO: For sticky workflows we need to cache the Executor instance
  executor = Workflow::Executor.new(workflow_class, history)
  # Metadata generated for this decision task; it is handed to the middleware
  # chain so the chain receives it alongside the block it wraps.
  metadata = Metadata.generate(Metadata::DECISION_TYPE, task, domain)

  decisions = middleware_chain.invoke(metadata) do
    executor.run
  end

  complete_task(decisions)
rescue StandardError => error
  fail_task(error.inspect)
  Cadence.logger.debug(error.backtrace.join("\n"))
ensure
  # Runs on success, early return and failure alike.
  time_diff_ms = ((Time.now - start_time) * 1000).round
  Cadence.metrics.timing('decision_task.latency', time_diff_ms, workflow: workflow_name)
  Cadence.logger.debug("Decision task processed in #{time_diff_ms}ms")
end