Class: Floe::Workflow

Inherits:
WorkflowBase
Includes:
Logging
Defined in:
lib/floe/workflow.rb,
lib/floe/workflow/path.rb,
lib/floe/workflow/state.rb,
lib/floe/workflow/branch.rb,
lib/floe/workflow/catcher.rb,
lib/floe/workflow/context.rb,
lib/floe/workflow/retrier.rb,
lib/floe/workflow/states/map.rb,
lib/floe/workflow/choice_rule.rb,
lib/floe/workflow/states/fail.rb,
lib/floe/workflow/states/pass.rb,
lib/floe/workflow/states/task.rb,
lib/floe/workflow/states/wait.rb,
lib/floe/workflow/states/choice.rb,
lib/floe/workflow/choice_rule/or.rb,
lib/floe/workflow/item_processor.rb,
lib/floe/workflow/reference_path.rb,
lib/floe/workflow/states/succeed.rb,
lib/floe/workflow/choice_rule/and.rb,
lib/floe/workflow/choice_rule/not.rb,
lib/floe/workflow/states/parallel.rb,
lib/floe/workflow/choice_rule/data.rb,
lib/floe/workflow/payload_template.rb,
lib/floe/workflow/intrinsic_function.rb,
lib/floe/workflow/error_matcher_mixin.rb,
lib/floe/workflow/states/retry_catch_mixin.rb,
lib/floe/workflow/intrinsic_function/parser.rb,
lib/floe/workflow/states/input_output_mixin.rb,
lib/floe/workflow/states/non_terminal_mixin.rb,
lib/floe/workflow/states/child_workflow_mixin.rb,
lib/floe/workflow/intrinsic_function/transformer.rb

Defined Under Namespace

Modules: ErrorMatcherMixin, States Classes: Branch, Catcher, ChoiceRule, Context, IntrinsicFunction, ItemProcessor, Path, PayloadTemplate, ReferencePath, Retrier, State

Instance Attribute Summary collapse

Attributes inherited from WorkflowBase

#name, #payload, #start_at, #states, #states_by_name

Class Method Summary collapse

Instance Method Summary collapse

Methods included from Logging

included, #logger, #logger=

Methods inherited from WorkflowBase

#run

Methods included from ValidationMixin

included, #invalid_field_error!, #missing_field_error!, #parser_error!, #runtime_field_error!, #workflow_state?, #wrap_parser_error

Constructor Details

#initialize(payload, context = nil, credentials = nil, name = nil) ⇒ Workflow

Returns a new instance of Workflow.



82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
# File 'lib/floe/workflow.rb', line 82

# Builds a workflow from its definition payload.
#
# @param payload [Hash, String] workflow definition; a String is parsed as JSON
# @param context [Context, Object, nil] used as-is when it is already a
#   Context, otherwise handed to Context.new
# @param credentials [Hash, String, nil] optional credentials (a String is
#   parsed as JSON); kept for backwards compatibility
# @param name [String, nil] workflow name, forwarded to the superclass
# @raise [Floe::Error] Floe errors raised during construction propagate unchanged
# @raise [Floe::InvalidWorkflowError] wraps the message of any other StandardError
def initialize(payload, context = nil, credentials = nil, name = nil)
  payload     = JSON.parse(payload)     if payload.kind_of?(String)
  credentials = JSON.parse(credentials) if credentials.kind_of?(String)

  workflow_context = context.kind_of?(Context) ? context : Context.new(context)

  # backwards compatibility
  # caller should really put credentials into context and not pass that variable
  if credentials
    workflow_context.credentials = credentials
  end

  @context = workflow_context
  @comment = payload["Comment"]

  super(payload, name)
rescue => err
  # Floe's own errors already carry the right type; everything else is
  # reported as an invalid workflow definition.
  raise if err.kind_of?(Floe::Error)

  raise Floe::InvalidWorkflowError, err.message
end

Instance Attribute Details

#comment ⇒ Object (readonly)

Returns the value of attribute comment.



80
81
82
# File 'lib/floe/workflow.rb', line 80

# Reader for the workflow comment, which the constructor takes from the
# payload's "Comment" key.
def comment
  @comment
end

#context ⇒ Object (readonly)

Returns the value of attribute context.



80
81
82
# File 'lib/floe/workflow.rb', line 80

# Reader for the Context this workflow runs with: either the Context passed
# to the constructor or the one the constructor built via Context.new.
def context
  @context
end

Class Method Details

.load(path_or_io, context = nil, credentials = {}, name = nil) ⇒ Object



11
12
13
14
15
16
17
# File 'lib/floe/workflow.rb', line 11

# Reads a workflow definition from a file path or an IO-like object and
# builds a new workflow from it.
#
# @param path_or_io [String, #read] anything responding to #read is read
#   directly; otherwise the value is treated as a file path
# @param context [Context, Object, nil] forwarded to the constructor
# @param credentials [Hash, String, nil] forwarded to the constructor
# @param name [String, nil] workflow name; defaults to "stream" for IO-like
#   input, or to the file name up to its first "." for a path
# @return [Object] whatever .new returns for the loaded payload
def load(path_or_io, context = nil, credentials = {}, name = nil)
  readable = path_or_io.respond_to?(:read)
  payload  = readable ? path_or_io.read : File.read(path_or_io)

  # default the name if it is a filename and none was passed in
  name ||=
    if readable
      "stream"
    else
      file_name = path_or_io.split("/").last
      file_name.split(".").first
    end

  new(payload, context, credentials, name)
end

.wait(workflows, timeout: nil, &block) ⇒ Object



19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
# File 'lib/floe/workflow.rb', line 19

# Waits until the given workflows can make progress, have all ended, or the
# timeout elapses.
#
# A background thread forwards events from the "docker" runner into a queue.
# The main loop sleeps on that queue and is also woken when the earliest
# workflow wait_until time or the overall deadline is reached.
#
# @param workflows [Workflow, Array<Workflow>] a single workflow is wrapped
#   in an array
# @param timeout [Numeric, nil] seconds to wait (fractions allowed); nil sets
#   no deadline, zero or a negative value performs a single pass
# @yield [workflow] each workflow that is ready to be stepped; without a
#   block the method returns as soon as any workflow is ready
# @return [Array<Workflow>] the workflows that were ready on the final pass
def wait(workflows, timeout: nil, &block)
  workflows = [workflows] if workflows.kind_of?(self)

  # Only a strictly positive timeout produces a deadline. The value is
  # compared as-is (not via to_i) so that fractional timeouts such as 0.5
  # are not truncated to "no deadline".
  run_until   = Time.now.utc + timeout if timeout && timeout > 0
  ready       = []
  queue       = Queue.new
  wait_thread = Thread.new do
    loop do
      Runner.for_resource("docker").wait do |event, runner_context|
        queue.push([event, runner_context])
      end
    end
  end

  loop do
    ready = workflows.select(&:step_nonblock_ready?)
    break if block.nil? && !ready.empty?

    ready.each(&block)

    # Break if all workflows are completed or we've exceeded the
    # requested timeout. A timeout without a deadline (zero or negative)
    # means "one pass only".
    break if workflows.all?(&:end?)
    break if timeout && (run_until.nil? || Time.now.utc > run_until)

    # Find the earliest time that we should wakeup if no container events
    # are caught, either a workflow in a Wait or Retry state or we've
    # exceeded the requested timeout
    wait_until = workflows.map(&:wait_until)
                          .unshift(run_until)
                          .compact
                          .min

    # If a workflow is in a waiting state wakeup the main thread when
    # it will be done sleeping
    if wait_until
      sleep_thread = Thread.new do
        sleep_duration = wait_until - Time.now.utc
        sleep sleep_duration if sleep_duration > 0
        # A nil entry is the "timer fired" marker for the loop below
        queue.push(nil)
      end
    end

    loop do
      # Block until an event is raised
      event, data = queue.pop
      break if event.nil?

      # break out of the loop if the event is for one of our workflows
      break if queue.empty? || workflows.detect { |wf| wf.execution_id == data["execution_id"] }
    end
  ensure
    # The timer thread is per-iteration; never let it outlive the pass
    sleep_thread&.kill
  end

  ready
ensure
  wait_thread&.kill
end

Instance Method Details

#credentials ⇒ Object

Kept for backwards compatibility. Callers should read credentials directly from the context.



171
172
173
# File 'lib/floe/workflow.rb', line 171

# Backwards-compatible shortcut for the credentials held by the context.
# Callers should prefer reading them from the context directly.
#
# @return [Object] whatever the context's #credentials returns
def credentials
  context.credentials
end

#current_state ⇒ Object

NOTE: The context is expected to have been initialized (via start_workflow) before this is called.



166
167
168
# File 'lib/floe/workflow.rb', line 166

# Looks up the state the context currently points at.
#
# NOTE: the context is expected to have been initialized (via
# start_workflow) before this is called.
#
# @return [Object, nil] the entry of states_by_name for the context's
#   state name
def current_state
  state_name = context.state_name
  states_by_name[state_name]
end

#end? ⇒ Boolean

Returns:

  • (Boolean)


147
148
149
# File 'lib/floe/workflow.rb', line 147

# Whether the workflow has finished, as reported by the context's #ended?.
def end?
  context.ended?
end

#execution_id ⇒ Object



175
176
177
# File 'lib/floe/workflow.rb', line 175

# Identifier of this execution, read from the "Id" entry of the context's
# execution data (start_workflow assigns one when none is present).
#
# @return [Object, nil] the stored id, or nil when none has been set
def execution_id
  execution = context.execution
  execution["Id"]
end

#output ⇒ Object



143
144
145
# File 'lib/floe/workflow.rb', line 143

# The workflow's output, available only once the workflow has ended.
#
# @return [Object, nil] the context's json_output after the workflow has
#   ended, otherwise nil
def output
  end? ? context.json_output : nil
end

#run_nonblock ⇒ Object



101
102
103
104
105
# File 'lib/floe/workflow.rb', line 101

# Starts the workflow if needed and keeps stepping it for as long as each
# step completes immediately and the workflow has not ended.
#
# @return [self]
def run_nonblock
  start_workflow

  loop do
    # Compare with == 0 rather than #zero?: step_nonblock may return a
    # non-numeric value (the Errno::EPERM class) for an ended workflow.
    break unless step_nonblock == 0
    break if end?
  end

  self
end

#start_workflow ⇒ Object

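Sets up a workflow for execution: initializes the first state and the execution metadata. Does nothing if the workflow has already been started.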



152
153
154
155
156
157
158
159
160
161
162
163
# File 'lib/floe/workflow.rb', line 152

# Prepares the context for the first step: points it at the start state,
# copies the execution input into the state, and stamps the execution with
# an id (kept if one is already present) and a UTC start time.
#
# @return [self, nil] self after initializing; nil when the context already
#   has a state name (the workflow was started before)
def start_workflow
  return if context.state_name

  state = context.state
  state["Name"]  = start_at
  state["Input"] = context.execution["Input"].dup
  state["Guid"]  = SecureRandom.uuid

  execution = context.execution
  execution["Id"]      ||= SecureRandom.uuid
  execution["StartTime"] = Time.now.utc.iso8601

  self
end

#status ⇒ Object



139
140
141
# File 'lib/floe/workflow.rb', line 139

# Current workflow status, as reported by the context's #status.
def status
  context.status
end

#step_nonblock ⇒ Object

NOTE: If running manually, make sure to call start_workflow at startup



108
109
110
111
112
113
114
115
116
117
118
119
# File 'lib/floe/workflow.rb', line 108

# Runs the current state once without blocking.
#
# NOTE: If running manually, make sure to call start_workflow at startup
#
# @return [Object] Errno::EPERM (the class itself) when the workflow has
#   already ended; otherwise the result of the state's run_nonblock!, where
#   0 means the step completed and the workflow advanced or ended
def step_nonblock
  return Errno::EPERM if end?

  status = current_state.run_nonblock!(context)

  if status == 0
    # The step completed: record it, then move on or finish.
    context.state_history << context.state
    if context.next_state
      step!
    else
      end_workflow!
    end
  end

  status
end

#step_nonblock_ready? ⇒ Boolean

if this hasn’t started (and we have no current_state yet), assume it is ready

Returns:

  • (Boolean)


127
128
129
# File 'lib/floe/workflow.rb', line 127

# Whether the workflow can be stepped right now.
#
# @return [Boolean] true when the workflow has not started yet (there is no
#   current state to ask); otherwise the current state's ready? answer
def step_nonblock_ready?
  return true unless context.started?

  current_state.ready?(context)
end

#step_nonblock_wait(timeout: nil) ⇒ Object

Waits on the current state. If this hasn’t started (and we have no current_state yet), there is nothing to wait on and 0 is returned immediately.



122
123
124
# File 'lib/floe/workflow.rb', line 122

# Waits on the current state.
#
# @param timeout [Object, nil] passed through to the state's wait
# @return [Object] 0 when the workflow has not started yet (nothing to wait
#   on); otherwise whatever the current state's wait returns
def step_nonblock_wait(timeout: nil)
  return 0 unless context.started?

  current_state.wait(context, :timeout => timeout)
end

#wait_until ⇒ Object



135
136
137
# File 'lib/floe/workflow.rb', line 135

# Time until which the current state is waiting.
#
# Like step_nonblock_ready? and step_nonblock_wait, this tolerates a
# workflow that has not been started: there is no current state yet, so
# there is nothing to wait for.
#
# @return [Object, nil] the current state's wait_until value, or nil when
#   the workflow has not started
def wait_until
  return unless context.started?

  current_state.wait_until(context)
end

#waiting? ⇒ Boolean

Returns:

  • (Boolean)


131
132
133
# File 'lib/floe/workflow.rb', line 131

# Whether the current state is waiting.
#
# Like step_nonblock_ready? and step_nonblock_wait, this tolerates a
# workflow that has not been started: with no current state yet, it is
# not waiting on anything.
#
# @return [Boolean] false when the workflow has not started; otherwise the
#   current state's waiting? answer
def waiting?
  return false unless context.started?

  current_state.waiting?(context)
end