Class: Floe::Workflow::State

Inherits:
Object
  • Object
show all
Includes:
ValidationMixin
Defined in:
lib/floe/workflow/state.rb

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Methods included from ValidationMixin

included, #invalid_field_error!, #missing_field_error!, #parser_error!, #runtime_field_error!, #workflow_state?, #wrap_parser_error

Constructor Details

#initialize(_workflow, name, payload) ⇒ State

Returns a new instance of State.



26
27
28
29
30
31
# File 'lib/floe/workflow/state.rb', line 26

def initialize(_workflow, name, payload)
  @name     = name
  @payload  = payload
  @type     = payload["Type"]
  @comment  = payload["Comment"]
end

Instance Attribute Details

#commentObject (readonly)

Returns the value of attribute comment.



24
25
26
# File 'lib/floe/workflow/state.rb', line 24

def comment
  @comment
end

#nameObject (readonly)

Returns the value of attribute name.



24
25
26
# File 'lib/floe/workflow/state.rb', line 24

def name
  @name
end

#payloadObject (readonly)

Returns the value of attribute payload.



24
25
26
# File 'lib/floe/workflow/state.rb', line 24

def payload
  @payload
end

#typeObject (readonly)

Returns the value of attribute type.



24
25
26
# File 'lib/floe/workflow/state.rb', line 24

def type
  @type
end

Class Method Details

.build!(workflow, name, payload) ⇒ Object



9
10
11
12
13
14
15
16
17
18
19
20
21
# File 'lib/floe/workflow/state.rb', line 9

def build!(workflow, name, payload)
  state_type = payload["Type"]
  missing_field_error!(name, "Type") if payload["Type"].nil?
  invalid_field_error!(name[0..-2], "Name", name.last, "must be less than or equal to 80 characters") if name.last.length > 80

  begin
    klass = Floe::Workflow::States.const_get(state_type)
  rescue NameError
    invalid_field_error!(name, "Type", state_type, "is not valid")
  end

  klass.new(workflow, name, payload)
end

Instance Method Details

#finish(context) ⇒ Object



62
63
64
# File 'lib/floe/workflow/state.rb', line 62

def finish(context)
  mark_finished(context)
end

#finished?(context) ⇒ Boolean

Returns:

  • (Boolean)


66
67
68
# File 'lib/floe/workflow/state.rb', line 66

def finished?(context)
  context.state_finished?
end

#long_nameObject



117
118
119
# File 'lib/floe/workflow/state.rb', line 117

def long_name
  "#{type}:#{short_name}"
end

#mark_error(context, exception) ⇒ Object



89
90
91
92
93
94
95
# File 'lib/floe/workflow/state.rb', line 89

def mark_error(context, exception)
  # InputPath or OutputPath were bad.
  context.next_state = nil
  context.output     = {"Error" => exception.floe_error, "Cause" => exception.message}
  # Since finish threw an exception, super was never called. Calling that now.
  mark_finished(context)
end

#mark_finished(context) ⇒ Object



76
77
78
79
80
81
82
83
84
85
86
87
# File 'lib/floe/workflow/state.rb', line 76

def mark_finished(context)
  finished_time = Time.now.utc
  entered_time  = Time.parse(context.state["EnteredTime"])

  context.state["FinishedTime"] ||= finished_time.iso8601
  context.state["Duration"]       = finished_time - entered_time

  level = context.failed? ? :error : :info
  context.logger.public_send(level, "Running state: [#{long_name}] with input [#{context.json_input}]...Complete #{context.next_state ? "- next state [#{context.next_state}]" : "workflow -"} output: [#{context.json_output}]")

  0
end

#mark_started(context) ⇒ Object



70
71
72
73
74
# File 'lib/floe/workflow/state.rb', line 70

def mark_started(context)
  context.state["EnteredTime"] = Time.now.utc.iso8601

  context.logger.info("Running state: [#{long_name}] with input [#{context.json_input}]...")
end

#ready?(context) ⇒ Boolean

Returns:

  • (Boolean)


97
98
99
# File 'lib/floe/workflow/state.rb', line 97

def ready?(context)
  !started?(context) || !running?(context)
end

#run_nonblock!(context) ⇒ Object

Returns for incomplete Errno::EAGAIN, for completed 0.

Returns:

  • for incomplete Errno::EAGAIN, for completed 0



45
46
47
48
49
50
51
52
# File 'lib/floe/workflow/state.rb', line 45

def run_nonblock!(context)
  start(context) unless context.state_started?
  return Errno::EAGAIN unless ready?(context)

  finish(context)
rescue Floe::ExecutionError => e
  mark_error(context, e)
end

#running?(context) ⇒ Boolean

Returns:

  • (Boolean)

Raises:

  • (NotImplementedError)


101
102
103
# File 'lib/floe/workflow/state.rb', line 101

def running?(context)
  raise NotImplementedError, "Must be implemented in a subclass"
end

#short_nameObject



113
114
115
# File 'lib/floe/workflow/state.rb', line 113

def short_name
  name.last
end

#start(context) ⇒ Object



54
55
56
# File 'lib/floe/workflow/state.rb', line 54

def start(context)
  mark_started(context)
end

#started?(context) ⇒ Boolean

Returns:

  • (Boolean)


58
59
60
# File 'lib/floe/workflow/state.rb', line 58

def started?(context)
  context.state_started?
end

#wait(context, timeout: nil) ⇒ Object



33
34
35
36
37
38
39
40
41
42
# File 'lib/floe/workflow/state.rb', line 33

def wait(context, timeout: nil)
  start = Time.now.utc

  loop do
    return 0             if ready?(context)
    return Errno::EAGAIN if timeout && (timeout.zero? || Time.now.utc - start > timeout)

    sleep(1)
  end
end

#wait_until(context) ⇒ Object



109
110
111
# File 'lib/floe/workflow/state.rb', line 109

def wait_until(context)
  context.state["WaitUntil"] && Time.parse(context.state["WaitUntil"])
end

#waiting?(context) ⇒ Boolean

Returns:

  • (Boolean)


105
106
107
# File 'lib/floe/workflow/state.rb', line 105

def waiting?(context)
  context.state["WaitUntil"] && Time.now.utc <= Time.parse(context.state["WaitUntil"])
end