Class: Dynflow::Executors::Parallel::ExecutionPlanManager
- Inherits:
-
Object
- Object
- Dynflow::Executors::Parallel::ExecutionPlanManager
- Includes:
- Algebrick::Matching, Algebrick::TypeCheck
- Defined in:
- lib/dynflow/executors/parallel/execution_plan_manager.rb
Instance Attribute Summary collapse
-
#execution_plan ⇒ Object
readonly
Returns the value of attribute execution_plan.
-
#future ⇒ Object
readonly
Returns the value of attribute future.
Instance Method Summary collapse
- #done? ⇒ Boolean
- #event(event) ⇒ Object
-
#initialize(world, execution_plan, future) ⇒ ExecutionPlanManager
constructor
A new instance of ExecutionPlanManager.
- #prepare_next_step(step) ⇒ Object
- #start ⇒ Object
- #terminate ⇒ Object
-
#what_is_next(work) ⇒ Array<Work>
Of Work items to continue with.
Constructor Details
#initialize(world, execution_plan, future) ⇒ ExecutionPlanManager
Returns a new instance of ExecutionPlanManager.
10 11 12 13 14 15 16 17 18 19 20 |
# File 'lib/dynflow/executors/parallel/execution_plan_manager.rb', line 10 def initialize(world, execution_plan, future) @world = Type! world, World @execution_plan = Type! execution_plan, ExecutionPlan @future = Type! future, Future @running_steps_manager = RunningStepsManager.new(world) unless [:planned, :paused].include? execution_plan.state raise "execution_plan is not in pending or paused state, it's #{execution_plan.state}" end execution_plan.update_state(:running) end |
Instance Attribute Details
#execution_plan ⇒ Object (readonly)
Returns the value of attribute execution_plan.
8 9 10 |
# File 'lib/dynflow/executors/parallel/execution_plan_manager.rb', line 8 def execution_plan @execution_plan end |
#future ⇒ Object (readonly)
Returns the value of attribute future.
8 9 10 |
# File 'lib/dynflow/executors/parallel/execution_plan_manager.rb', line 8 def future @future end |
Instance Method Details
#done? ⇒ Boolean
70 71 72 |
# File 'lib/dynflow/executors/parallel/execution_plan_manager.rb', line 70 def done? (!@run_manager || @run_manager.done?) && (!@finalize_manager || @finalize_manager.done?) end |
#event(event) ⇒ Object
64 65 66 67 68 |
# File 'lib/dynflow/executors/parallel/execution_plan_manager.rb', line 64 def event(event) Type! event, Parallel::Event raise unless event.execution_plan_id == @execution_plan.id @running_steps_manager.event(event) end |
#prepare_next_step(step) ⇒ Object
27 28 29 30 31 |
# File 'lib/dynflow/executors/parallel/execution_plan_manager.rb', line 27 def prepare_next_step(step) Work::Step[step, execution_plan.id].tap do |work| @running_steps_manager.add(step, work) end end |
#start ⇒ Object
22 23 24 25 |
# File 'lib/dynflow/executors/parallel/execution_plan_manager.rb', line 22 def start raise "The future was already set" if @future.ready? start_run or start_finalize or finish end |
#terminate ⇒ Object
74 75 76 77 78 79 |
# File 'lib/dynflow/executors/parallel/execution_plan_manager.rb', line 74 def terminate @running_steps_manager.terminate unless @execution_plan.state == :paused @execution_plan.update_state(:paused) end end |
#what_is_next(work) ⇒ Array<Work>
Returns of Work items to continue with.
34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 |
# File 'lib/dynflow/executors/parallel/execution_plan_manager.rb', line 34 def what_is_next(work) Type! work, Work compute_next_from_step =-> step do raise unless @run_manager raise if @run_manager.done? next_steps = @run_manager.what_is_next(step) if @run_manager.done? start_finalize or finish else next_steps.map { |s| prepare_next_step(s) } end end match(work, (on Work::Step.(step: ~any) | Work::Event.(step: ~any) do |step| execution_plan.steps[step.id] = step suspended, work = @running_steps_manager.done(step) unless suspended work = compute_next_from_step.call step end work end), (on Work::Finalize do raise unless @finalize_manager finish end)) end |