Class: Floe::Workflow::States::Task

Inherits:
Floe::Workflow::State show all
Includes:
InputOutputMixin, NonTerminalMixin, RetryCatchMixin
Defined in:
lib/floe/workflow/states/task.rb

Instance Attribute Summary collapse

Attributes inherited from Floe::Workflow::State

#comment, #name, #payload, #type

Instance Method Summary collapse

Methods included from RetryCatchMixin

#catch_error!, #fail_workflow!, #find_catcher, #find_retrier, #retry_state!

Methods included from NonTerminalMixin

#validate_state_next!

Methods included from InputOutputMixin

#process_input, #process_output

Methods inherited from Floe::Workflow::State

build!, #finished?, #long_name, #mark_error, #mark_finished, #mark_started, #ready?, #run_nonblock!, #short_name, #started?, #wait, #wait_until, #waiting?

Methods included from ValidationMixin

included, #invalid_field_error!, #missing_field_error!, #parser_error!, #runtime_field_error!, #workflow_state?, #wrap_parser_error

Constructor Details

#initialize(workflow, name, payload) ⇒ Task

Returns a new instance of Task.



15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
# File 'lib/floe/workflow/states/task.rb', line 15

def initialize(workflow, name, payload)
  super

  @heartbeat_seconds = payload["HeartbeatSeconds"]
  @next              = payload["Next"]
  @end               = !!payload["End"]
  @resource          = payload["Resource"]

  missing_field_error!("Resource") unless @resource.kind_of?(String)
  @runner = wrap_parser_error("Resource", @resource) { Floe::Runner.for_resource(@resource) }

  @timeout_seconds   = payload["TimeoutSeconds"]
  @retry             = payload["Retry"].to_a.map.with_index { |retrier, i| Retrier.new(workflow, name + ["Retry", i.to_s], retrier) }
  @catch             = payload["Catch"].to_a.map.with_index { |catcher, i| Catcher.new(workflow, name + ["Catch", i.to_s], catcher) }
  @input_path        = Path.new(payload.fetch("InputPath", "$"))
  @output_path       = Path.new(payload.fetch("OutputPath", "$"))
  @result_path       = ReferencePath.new(payload.fetch("ResultPath", "$"))
  @parameters        = PayloadTemplate.new(payload["Parameters"])     if payload["Parameters"]
  @result_selector   = PayloadTemplate.new(payload["ResultSelector"]) if payload["ResultSelector"]
  @credentials       = PayloadTemplate.new(payload["Credentials"])    if payload["Credentials"]

  validate_state!(workflow)
end

Instance Attribute Details

#catchObject (readonly)

Returns the value of attribute catch.



11
12
13
# File 'lib/floe/workflow/states/task.rb', line 11

def catch
  @catch
end

#credentialsObject (readonly)

Returns the value of attribute credentials.



11
12
13
# File 'lib/floe/workflow/states/task.rb', line 11

def credentials
  @credentials
end

#endObject (readonly)

Returns the value of attribute end.



11
12
13
# File 'lib/floe/workflow/states/task.rb', line 11

def end
  @end
end

#heartbeat_secondsObject (readonly)

Returns the value of attribute heartbeat_seconds.



11
12
13
# File 'lib/floe/workflow/states/task.rb', line 11

def heartbeat_seconds
  @heartbeat_seconds
end

#input_pathObject (readonly)

Returns the value of attribute input_path.



11
12
13
# File 'lib/floe/workflow/states/task.rb', line 11

def input_path
  @input_path
end

#nextObject (readonly)

Returns the value of attribute next.



11
12
13
# File 'lib/floe/workflow/states/task.rb', line 11

def next
  @next
end

#output_pathObject (readonly)

Returns the value of attribute output_path.



11
12
13
# File 'lib/floe/workflow/states/task.rb', line 11

def output_path
  @output_path
end

#parametersObject (readonly)

Returns the value of attribute parameters.



11
12
13
# File 'lib/floe/workflow/states/task.rb', line 11

def parameters
  @parameters
end

#resourceObject (readonly)

Returns the value of attribute resource.



11
12
13
# File 'lib/floe/workflow/states/task.rb', line 11

def resource
  @resource
end

#result_pathObject (readonly)

Returns the value of attribute result_path.



11
12
13
# File 'lib/floe/workflow/states/task.rb', line 11

def result_path
  @result_path
end

#result_selectorObject (readonly)

Returns the value of attribute result_selector.



11
12
13
# File 'lib/floe/workflow/states/task.rb', line 11

def result_selector
  @result_selector
end

#retryObject (readonly)

Returns the value of attribute retry.



11
12
13
# File 'lib/floe/workflow/states/task.rb', line 11

def retry
  @retry
end

#timeout_secondsObject (readonly)

Returns the value of attribute timeout_seconds.



11
12
13
# File 'lib/floe/workflow/states/task.rb', line 11

def timeout_seconds
  @timeout_seconds
end

Instance Method Details

#end?Boolean

Returns:

  • (Boolean)


71
72
73
# File 'lib/floe/workflow/states/task.rb', line 71

def end?
  @end
end

#finish(context) ⇒ Object



48
49
50
51
52
53
54
55
56
57
58
59
60
61
# File 'lib/floe/workflow/states/task.rb', line 48

def finish(context)
  output = runner.output(context.state["RunnerContext"])

  if success?(context)
    output = parse_output(output)
    context.output = process_output(context, output)
  else
    error = parse_error(output)
    retry_state!(context, error) || catch_error!(context, error) || fail_workflow!(context, error)
  end
  super
ensure
  runner.cleanup(context.state["RunnerContext"])
end

#running?(context) ⇒ Boolean

Returns:

  • (Boolean)


63
64
65
66
67
68
69
# File 'lib/floe/workflow/states/task.rb', line 63

def running?(context)
  return true  if waiting?(context)
  return false if finished?(context)

  runner.status!(context.state["RunnerContext"])
  runner.running?(context.state["RunnerContext"])
end

#start(context) ⇒ Object



39
40
41
42
43
44
45
46
# File 'lib/floe/workflow/states/task.rb', line 39

def start(context)
  super

  input          = process_input(context)
  runner_context = runner.run_async!(resource, input, credentials&.value({}, context.credentials), context)

  context.state["RunnerContext"] = runner_context
end