Class: Dynflow::ExecutionPlan
- Inherits:
-
Serializable
- Object
- Serializable
- Dynflow::ExecutionPlan
- Includes:
- Algebrick::TypeCheck, Stateful
- Defined in:
- lib/dynflow/execution_plan.rb
Overview
TODO extract planning logic to an extra class ExecutionPlanner
Defined Under Namespace
Modules: Steps Classes: DependencyGraph, OutputReference
Instance Attribute Summary collapse
-
#ended_at ⇒ Object
readonly
Returns the value of attribute ended_at.
-
#execution_time ⇒ Object
readonly
Returns the value of attribute execution_time.
-
#finalize_flow ⇒ Object
readonly
Returns the value of attribute finalize_flow.
-
#id ⇒ Object
readonly
Returns the value of attribute id.
-
#real_time ⇒ Object
readonly
Returns the value of attribute real_time.
-
#root_plan_step ⇒ Object
readonly
Returns the value of attribute root_plan_step.
-
#run_flow ⇒ Object
readonly
Returns the value of attribute run_flow.
-
#started_at ⇒ Object
readonly
Returns the value of attribute started_at.
-
#steps ⇒ Object
readonly
Returns the value of attribute steps.
-
#world ⇒ Object
readonly
Returns the value of attribute world.
Attributes included from Stateful
Class Method Summary collapse
Instance Method Summary collapse
-
#actions ⇒ Array<Action>
Actions in Present phase.
- #add_finalize_step(action) ⇒ Object
- #add_plan_step(action_class, planned_by = nil) ⇒ Object
- #add_run_step(action) ⇒ Object
- #compute_execution_time ⇒ Object
- #current_run_flow ⇒ Object private
- #entry_action ⇒ Object
- #error? ⇒ Boolean
- #errors ⇒ Object
- #failed_steps ⇒ Object
- #generate_action_id ⇒ Object
- #generate_step_id ⇒ Object
-
#initialize(world, id = SecureRandom.uuid, state = :pending, root_plan_step = nil, run_flow = Flows::Concurrence.new([]), finalize_flow = Flows::Sequence.new([]), steps = {}, started_at = nil, ended_at = nil, execution_time = nil, real_time = 0.0) ⇒ ExecutionPlan
constructor
All parameters with default values are part of the private API.
- #logger ⇒ Object
- #plan(*args) ⇒ Object
- #prepare(action_class) ⇒ Object
-
#progress ⇒ 0..1
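Overall progress of the plan as a number between 0 and 1: the weighted share of the run and finalize steps that is already done (1 when there is nothing to weigh).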
- #rescue_from_error ⇒ Object
- #rescue_plan_id ⇒ Object
- #rescue_strategy ⇒ Object
- #result ⇒ Object
- #save ⇒ Object
- #skip(step) ⇒ Object
- #steps_in_state(*states) ⇒ Object
-
#steps_to_skip(step) ⇒ Array<Steps::Abstract>
Returns all the steps that have to be skipped in order to skip the given step: the step itself and all the steps that depend on it (even transitively). FIND: maybe move this to persistence so that the adapter can do it more efficiently?
-
#switch_flow(new_flow, &block) ⇒ Object
private
Switches the flow type (Sequence, Concurrence) to be used within the block.
- #to_hash ⇒ Object
- #update_state(state) ⇒ Object
- #with_planning_scope(&block) ⇒ Object private
Methods included from Stateful
included, #set_state, #state_transitions, #states
Methods inherited from Serializable
Constructor Details
#initialize(world, id = SecureRandom.uuid, state = :pending, root_plan_step = nil, run_flow = Flows::Concurrence.new([]), finalize_flow = Flows::Sequence.new([]), steps = {}, started_at = nil, ended_at = nil, execution_time = nil, real_time = 0.0) ⇒ ExecutionPlan
All parameters with default values are part of the private API.
32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 |
# File 'lib/dynflow/execution_plan.rb', line 32
#
# Builds an execution plan and type-checks its arguments with +Type!+.
# All parameters with default values are part of the private API.
#
# @param world [World] world the plan belongs to
# @param id [String] identifier of the plan (a random UUID by default)
# @param state [Symbol] initial state, assigned through the +state=+ writer
# @param root_plan_step [Object, nil] root plan step (stored without a type check)
# @param run_flow [Flows::Abstract] flow of the run phase
# @param finalize_flow [Flows::Abstract] flow of the finalize phase
# @param steps [Hash{Integer => Steps::Abstract}] steps keyed by their id
# @param started_at [Time, nil]
# @param ended_at [Time, nil]
# @param execution_time [Numeric, nil]
# @param real_time [Numeric]
def initialize(world,
               id = SecureRandom.uuid,
               state = :pending,
               root_plan_step = nil,
               run_flow = Flows::Concurrence.new([]),
               finalize_flow = Flows::Sequence.new([]),
               steps = {},
               started_at = nil,
               ended_at = nil,
               execution_time = nil,
               real_time = 0.0)
  @id = Type! id, String
  @world = Type! world, World
  self.state = state
  @run_flow = Type! run_flow, Flows::Abstract
  @finalize_flow = Type! finalize_flow, Flows::Abstract
  @root_plan_step = root_plan_step
  @started_at = Type! started_at, Time, NilClass
  @ended_at = Type! ended_at, Time, NilClass
  @execution_time = Type! execution_time, Numeric, NilClass
  @real_time = Type! real_time, Numeric

  # Validate every entry. `each` is used instead of `all?` because the block
  # is run for its side effect only; `all?` would silently stop validating at
  # the first falsy block value.
  steps.each do |k, v|
    Type! k, Integer
    Type! v, Steps::Abstract
  end
  @steps = steps
end
Instance Attribute Details
#ended_at ⇒ Object (readonly)
Returns the value of attribute ended_at.
15 16 17 |
# File 'lib/dynflow/execution_plan.rb', line 15 def ended_at @ended_at end |
#execution_time ⇒ Object (readonly)
Returns the value of attribute execution_time.
15 16 17 |
# File 'lib/dynflow/execution_plan.rb', line 15 def execution_time @execution_time end |
#finalize_flow ⇒ Object (readonly)
Returns the value of attribute finalize_flow.
15 16 17 |
# File 'lib/dynflow/execution_plan.rb', line 15 def finalize_flow @finalize_flow end |
#id ⇒ Object (readonly)
Returns the value of attribute id.
15 16 17 |
# File 'lib/dynflow/execution_plan.rb', line 15 def id @id end |
#real_time ⇒ Object (readonly)
Returns the value of attribute real_time.
15 16 17 |
# File 'lib/dynflow/execution_plan.rb', line 15 def real_time @real_time end |
#root_plan_step ⇒ Object (readonly)
Returns the value of attribute root_plan_step.
15 16 17 |
# File 'lib/dynflow/execution_plan.rb', line 15 def root_plan_step @root_plan_step end |
#run_flow ⇒ Object (readonly)
Returns the value of attribute run_flow.
15 16 17 |
# File 'lib/dynflow/execution_plan.rb', line 15 def run_flow @run_flow end |
#started_at ⇒ Object (readonly)
Returns the value of attribute started_at.
15 16 17 |
# File 'lib/dynflow/execution_plan.rb', line 15 def started_at @started_at end |
#steps ⇒ Object (readonly)
Returns the value of attribute steps.
15 16 17 |
# File 'lib/dynflow/execution_plan.rb', line 15 def steps @steps end |
#world ⇒ Object (readonly)
Returns the value of attribute world.
15 16 17 |
# File 'lib/dynflow/execution_plan.rb', line 15 def world @world end |
Class Method Details
.new_from_hash(hash, world) ⇒ Object
269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 |
# File 'lib/dynflow/execution_plan.rb', line 269
#
# Rebuilds an execution plan from its hash form (see #to_hash).
#
# @param hash [Hash] serialized plan; read keys are :id, :step_ids, :state,
#   :root_plan_step_id, :run_flow, :finalize_flow, :started_at, :ended_at,
#   :execution_time and :real_time
# @param world [World] world passed on to the steps and to the new plan
# @return [ExecutionPlan]
def self.new_from_hash(hash, world)
  check_class_matching hash

  plan_id = hash[:id]
  loaded_steps = steps_from_hash(hash[:step_ids], plan_id, world)
  root_step = loaded_steps[hash[:root_plan_step_id]]
  run_flow = Flows::Abstract.from_hash(hash[:run_flow])
  finalize_flow = Flows::Abstract.from_hash(hash[:finalize_flow])
  started_at = string_to_time(hash[:started_at])
  ended_at = string_to_time(hash[:ended_at])
  # NOTE: to_f turns a missing (nil) time into 0.0.
  execution_time = hash[:execution_time].to_f
  real_time = hash[:real_time].to_f

  new(world, plan_id, hash[:state], root_step, run_flow, finalize_flow,
      loaded_steps, started_at, ended_at, execution_time, real_time)
end
.state_transitions ⇒ Object
22 23 24 25 26 27 28 29 |
# File 'lib/dynflow/execution_plan.rb', line 22
#
# Allowed state transitions of an execution plan.
#
# @return [Hash{Symbol => Array<Symbol>}] each state mapped to the states it
#   may be followed by; the hash is built once and memoized
def self.state_transitions
  return @state_transitions if @state_transitions

  @state_transitions = {
    pending: [:planning],
    planning: [:planned, :stopped],
    planned: [:running],
    running: [:paused, :stopped],
    paused: [:running],
    stopped: []
  }
end
.states ⇒ Object
18 19 20 |
# File 'lib/dynflow/execution_plan.rb', line 18
#
# @return [Array<Symbol>] all the states an execution plan can be in; the
#   array is built once and memoized
def self.states
  return @states if @states

  @states = [:pending, :planning, :planned, :running, :paused, :stopped]
end
Instance Method Details
#actions ⇒ Array<Action>
Returns actions in Present phase.
309 310 311 312 313 |
# File 'lib/dynflow/execution_plan.rb', line 309
#
# @return [Array<Action>] actions in Present phase: the entry action followed
#   by everything it reports through all_planned_actions (memoized)
def actions
  return @actions if @actions

  root_action = entry_action
  @actions = [root_action] + root_action.all_planned_actions
end
#add_finalize_step(action) ⇒ Object
243 244 245 246 247 248 |
# File 'lib/dynflow/execution_plan.rb', line 243
#
# Adds a finalize step for the action and appends it to the finalize flow.
#
# @param action [Action] action to finalize; its class, id and
#   finalize_progress_weight are read
# @return [Steps::FinalizeStep] the step returned by add_step
def add_finalize_step(action)
  step = add_step(Steps::FinalizeStep, action.class, action.id)
  step.progress_weight = action.finalize_progress_weight
  finalize_flow << Flows::Atom.new(step.id)
  step
end
#add_plan_step(action_class, planned_by = nil) ⇒ Object
229 230 231 232 233 |
# File 'lib/dynflow/execution_plan.rb', line 229
#
# Adds a plan step for a freshly generated action id and initializes its
# action.
#
# @param action_class [Class] class of the action to plan
# @param planned_by [Action, nil] planning action, if any; its plan_step_id
#   is handed to add_step
# @return [Steps::PlanStep] the step returned by add_step
def add_plan_step(action_class, planned_by = nil)
  action_id = generate_action_id
  planner_step_id = planned_by && planned_by.plan_step_id
  step = add_step(Steps::PlanStep, action_class, action_id, planner_step_id)
  step.initialize_action
  step
end
#add_run_step(action) ⇒ Object
235 236 237 238 239 240 241 |
# File 'lib/dynflow/execution_plan.rb', line 235
#
# Adds a run step for the action, registers its dependencies in the
# dependency graph and puts it into the current run flow.
#
# @param action [Action] action to run; its class, id and
#   run_progress_weight are read
# @return [Steps::RunStep] the step returned by add_step
def add_run_step(action)
  step = add_step(Steps::RunStep, action.class, action.id)
  step.progress_weight = action.run_progress_weight
  @dependency_graph.add_dependencies(step, action)
  current_run_flow.add_and_resolve(@dependency_graph, Flows::Atom.new(step.id))
  step
end
#compute_execution_time ⇒ Object
286 287 288 289 290 |
# File 'lib/dynflow/execution_plan.rb', line 286
#
# @return [Numeric] sum of execution_time over all steps; a step without an
#   execution time counts as 0
def compute_execution_time
  total = 0
  steps.each_value do |step|
    total += (step.execution_time || 0)
  end
  total
end
#current_run_flow ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
205 206 207 |
# File 'lib/dynflow/execution_plan.rb', line 205 def current_run_flow @run_flow_stack.last end |
#entry_action ⇒ Object
304 305 306 |
# File 'lib/dynflow/execution_plan.rb', line 304 def entry_action @entry_action ||= root_plan_step.action(self) end |
#error? ⇒ Boolean
96 97 98 |
# File 'lib/dynflow/execution_plan.rb', line 96 def error? result == :error end |
#errors ⇒ Object
100 101 102 |
# File 'lib/dynflow/execution_plan.rb', line 100 def errors steps.values.map(&:error).compact end |
#failed_steps ⇒ Object
118 119 120 |
# File 'lib/dynflow/execution_plan.rb', line 118 def failed_steps steps_in_state(:error) end |
#generate_action_id ⇒ Object
134 135 136 137 |
# File 'lib/dynflow/execution_plan.rb', line 134
#
# @return [Integer] the next action id; the counter lives in
#   @last_action_id and the first id handed out is 1
def generate_action_id
  @last_action_id = (@last_action_id || 0) + 1
end
#generate_step_id ⇒ Object
139 140 141 142 |
# File 'lib/dynflow/execution_plan.rb', line 139
#
# @return [Integer] the next step id; the counter lives in @last_step_id
#   and the first id handed out is 1
def generate_step_id
  @last_step_id = (@last_step_id || 0) + 1
end
#logger ⇒ Object
62 63 64 |
# File 'lib/dynflow/execution_plan.rb', line 62 def logger @world.logger end |
#plan(*args) ⇒ Object
150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 |
# File 'lib/dynflow/execution_plan.rb', line 150 def plan(*args) update_state(:planning) world.transaction_adapter.transaction do world.middleware.execute(:plan_phase, root_plan_step.action_class) do with_planning_scope do root_plan_step.execute(self, nil, false, *args) if @dependency_graph.unresolved? raise "Some dependencies were not resolved: #{@dependency_graph.inspect}" end end end if @run_flow.size == 1 @run_flow = @run_flow.sub_flows.first end world.transaction_adapter.rollback if error? end steps.values.each(&:save) update_state(error? ? :stopped : :planned) end |
#prepare(action_class) ⇒ Object
144 145 146 147 148 |
# File 'lib/dynflow/execution_plan.rb', line 144 def prepare(action_class) save @root_plan_step = add_plan_step(action_class) @root_plan_step.save end |
#progress ⇒ 0..1
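Returns the overall progress of the plan as a number between 0 and 1: the weighted share of the run and finalize steps that is already done (1 when there is nothing to weigh).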
294 295 296 297 298 299 300 301 302 |
# File 'lib/dynflow/execution_plan.rb', line 294
#
# Progress of the plan computed from the steps of the run and finalize
# flows, each step contributing progress_done scaled by its progress_weight.
#
# @return [0..1] done weight divided by total weight; 1 when the total
#   weight is not positive
def progress
  done = 0.0
  total = 0
  (run_flow.all_step_ids + finalize_flow.all_step_ids).each do |step_id|
    step = steps[step_id]
    done += step.progress_done * step.progress_weight
    total += step.progress_weight
  end
  total > 0 ? done / total : 1
end
#rescue_from_error ⇒ Object
126 127 128 129 130 131 132 |
# File 'lib/dynflow/execution_plan.rb', line 126
#
# Executes the plan that rescues this one from its error.
#
# @return [Object] whatever @world.execute returns for the rescue plan id
# @raise [Errors::RescueError] when rescue_plan_id gives no plan id
def rescue_from_error
  plan_id = rescue_plan_id
  raise Errors::RescueError, 'Unable to rescue from the error' unless plan_id

  @world.execute(plan_id)
end
#rescue_plan_id ⇒ Object
108 109 110 111 112 113 114 115 116 |
# File 'lib/dynflow/execution_plan.rb', line 108
#
# Decides which plan should be executed to rescue from an error, based on
# the rescue strategy.
#
# @return [String, nil] nil for the Pause strategy (and for any strategy not
#   handled here); for the Skip strategy every failed step is skipped first
#   and the id of this plan is returned
def rescue_plan_id
  strategy = rescue_strategy
  case strategy
  when Action::Rescue::Pause then nil
  when Action::Rescue::Skip
    failed_steps.each { |failed| skip(failed) }
    id
  end
end
#rescue_strategy ⇒ Object
104 105 106 |
# File 'lib/dynflow/execution_plan.rb', line 104 def rescue_strategy Type! entry_action.rescue_strategy, Action::Rescue::Strategy end |
#result ⇒ Object
83 84 85 86 87 88 89 90 91 92 93 94 |
# File 'lib/dynflow/execution_plan.rb', line 83
#
# Overall result derived from the states of all steps.
#
# @return [Symbol] :error when any step is in :error; otherwise :warning
#   when any step is :skipping or :skipped; otherwise :success when every
#   step is :success (also for a plan without steps); otherwise :pending
def result
  step_states = steps.values.map(&:state)

  if step_states.include?(:error)
    :error
  elsif step_states.any? { |state| state == :skipping || state == :skipped }
    :warning
  elsif step_states.all? { |state| state == :success }
    :success
  else
    :pending
  end
end
#save ⇒ Object
265 266 267 |
# File 'lib/dynflow/execution_plan.rb', line 265 def save persistence.save_execution_plan(self) end |
#skip(step) ⇒ Object
173 174 175 176 177 |
# File 'lib/dynflow/execution_plan.rb', line 173
#
# Marks the step, and every step steps_to_skip reports for it, to be skipped
# and saves the plan.
#
# @param step [Steps::Abstract] step to skip
# @return [Array<Steps::Abstract>] the steps that were marked
def skip(step)
  marked = steps_to_skip(step)
  marked.each(&:mark_to_skip)
  save
  marked
end
#steps_in_state(*states) ⇒ Object
122 123 124 |
# File 'lib/dynflow/execution_plan.rb', line 122
#
# @param states [Array<Symbol>] states to look for
# @return [Array<Steps::Abstract>] the steps whose state is one of the given
#   states
def steps_in_state(*states)
  steps.values.select { |candidate| states.include?(candidate.state) }
end
#steps_to_skip(step) ⇒ Array<Steps::Abstract>
Returns all the steps that have to be skipped in order to skip the given step: the step itself and all the steps that depend on it (even transitively). FIND: maybe move this to persistence so that the adapter can do it more efficiently?
183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 |
# File 'lib/dynflow/execution_plan.rb', line 183
#
# Collects all the steps that have to be skipped in order to skip the given
# step: the step itself, every step depending on it (even transitively) and,
# for a run step, the finalize step of its action.
# FIND: maybe move this to persistence so that the adapter can do it more
# efficiently?
#
# @param step [Steps::Abstract] step to skip
# @return [Array<Steps::Abstract>] steps to skip, without duplicates
def steps_to_skip(step)
  # Plan steps are never considered dependent; for the others the action is
  # loaded to see whether it requires the step being skipped.
  dependents = steps.values.select do |candidate|
    next if candidate.is_a? Steps::PlanStep
    persistence.load_action(candidate).required_step_ids.include?(step.id)
  end

  collected = dependents.flat_map { |dependent| steps_to_skip(dependent) }
  collected << step

  if step.is_a? Steps::RunStep
    finalize_step_id = persistence.load_action(step).finalize_step_id
    collected << steps[finalize_step_id] if finalize_step_id
  end

  collected.uniq
end
#switch_flow(new_flow, &block) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Switches the flow type (Sequence, Concurrence) to be used within the block.
221 222 223 224 225 226 227 |
# File 'lib/dynflow/execution_plan.rb', line 221
#
# Switches the flow type (Sequence, Concurrence) to be used within the
# block.
#
# @api private
# @param new_flow [Flows::Abstract] flow that is current while the block runs
# @return [Object] the value of the block
def switch_flow(new_flow, &block)
  @run_flow_stack.push(new_flow)
  block.call
ensure
  # Runs even when the block raises: the flow is taken off the stack and
  # added to the flow below it, if there is one.
  @run_flow_stack.pop
  parent_flow = current_run_flow
  parent_flow.add_and_resolve(@dependency_graph, new_flow) if parent_flow
end
#to_hash ⇒ Object
250 251 252 253 254 255 256 257 258 259 260 261 262 263 |
# File 'lib/dynflow/execution_plan.rb', line 250
#
# Serializes the plan through recursive_to_hash.
#
# @return [Object] the result of recursive_to_hash for the id, class name,
#   state, result, root plan step id (nil without a root plan step), both
#   flows, step ids, start and end times as strings, execution time and
#   real time
def to_hash
  root_step = root_plan_step
  root_step_id = root_step && root_step.id
  step_ids = steps.keys
  started = time_to_str(started_at)
  ended = time_to_str(ended_at)

  recursive_to_hash id: id,
                    class: self.class.to_s,
                    state: state,
                    result: result,
                    root_plan_step_id: root_step_id,
                    run_flow: run_flow,
                    finalize_flow: finalize_flow,
                    step_ids: step_ids,
                    started_at: started,
                    ended_at: ended,
                    execution_time: execution_time,
                    real_time: real_time
end
#update_state(state) ⇒ Object
66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 |
# File 'lib/dynflow/execution_plan.rb', line 66
#
# Moves the plan to the given state, keeps the timing attributes up to date,
# logs the transition and saves the plan.
#
# * :planning - @started_at is set to the current time
# * :stopped  - @ended_at is set to the current time, @real_time to the time
#   elapsed since @started_at and @execution_time to the sum of the step
#   execution times
#
# @param state [Symbol] the new state, assigned through the +state=+ writer
def update_state(state)
  original = self.state
  case self.state = state
  when :planning
    @started_at = Time.now
  when :stopped
    @ended_at = Time.now
    # @started_at may be nil (the constructor accepts nil, e.g. for a plan
    # that never went through :planning in this object); subtracting nil
    # from a Time raises, so @real_time is left untouched in that case.
    @real_time = @ended_at - @started_at unless @started_at.nil?
    @execution_time = compute_execution_time
  end
  logger.debug format('%13s %s %9s >> %9s', 'ExecutionPlan', id, original, state)
  self.save
end
#with_planning_scope(&block) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
210 211 212 213 214 215 216 217 |
# File 'lib/dynflow/execution_plan.rb', line 210 def with_planning_scope(&block) @run_flow_stack = [] @dependency_graph = DependencyGraph.new switch_flow(run_flow, &block) ensure @run_flow_stack = nil @dependency_graph = nil end |