Class: Dynflow::Executors::Parallel::ExecutionPlanManager

Inherits:
Object
  • Object
show all
Includes:
Algebrick::Matching, Algebrick::TypeCheck
Defined in:
lib/dynflow/executors/parallel/execution_plan_manager.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(world, execution_plan, future) ⇒ ExecutionPlanManager

Returns a new instance of ExecutionPlanManager.



10
11
12
13
14
15
16
17
18
19
20
# File 'lib/dynflow/executors/parallel/execution_plan_manager.rb', line 10

# Sets up the manager for one execution plan and switches the plan to :running.
#
# @param world [World] type-checked, stored, and passed to the new RunningStepsManager
# @param execution_plan [ExecutionPlan] the plan to manage; its state must be
#   :planned or :paused
# @param future [Future] type-checked and stored in @future
# @raise [RuntimeError] when the plan's state is neither :planned nor :paused
def initialize(world, execution_plan, future)
  @world                 = Type! world, World
  @execution_plan        = Type! execution_plan, ExecutionPlan
  @future                = Type! future, Future
  @running_steps_manager = RunningStepsManager.new(world)

  unless [:planned, :paused].include? execution_plan.state
    # The message names exactly the states accepted by the check above.
    raise "execution_plan is not in planned or paused state, it's #{execution_plan.state}"
  end
  execution_plan.update_state(:running)
end

Instance Attribute Details

#execution_plan ⇒ Object (readonly)

Returns the value of attribute execution_plan.



8
9
10
# File 'lib/dynflow/executors/parallel/execution_plan_manager.rb', line 8

# Read-only accessor for the managed execution plan.
#
# @return [Object] the value held in @execution_plan (assigned in #initialize)
def execution_plan
  @execution_plan
end

#future ⇒ Object (readonly)

Returns the value of attribute future.



8
9
10
# File 'lib/dynflow/executors/parallel/execution_plan_manager.rb', line 8

# Read-only accessor for the future given to the constructor.
#
# @return [Object] the value held in @future (assigned in #initialize)
def future
  @future
end

Instance Method Details

#done? ⇒ Boolean

Returns:

  • (Boolean)


70
71
72
# File 'lib/dynflow/executors/parallel/execution_plan_manager.rb', line 70

# Tells whether neither phase has outstanding work. A manager that has not
# been created (nil) counts as done.
#
# @return [Boolean]
def done?
  # An existing run manager that is still busy settles the answer.
  return false if @run_manager && !@run_manager.done?

  !@finalize_manager || @finalize_manager.done?
end

#event(event) ⇒ Object



64
65
66
67
68
# File 'lib/dynflow/executors/parallel/execution_plan_manager.rb', line 64

# Forwards an event to the running steps manager after checking that it
# belongs to the execution plan handled by this manager.
#
# @param event [Parallel::Event] the event to deliver
# @return [Object] whatever @running_steps_manager.event returns
# @raise [RuntimeError] when the event's execution_plan_id differs from the
#   id of the managed execution plan
def event(event)
  Type! event, Parallel::Event
  unless event.execution_plan_id == @execution_plan.id
    # An explicit message: a bare `raise` would carry no details (and would
    # re-raise any exception that happens to be active at the time).
    raise "event for execution plan #{event.execution_plan_id} cannot be handled " \
          "by the manager of execution plan #{@execution_plan.id}"
  end
  @running_steps_manager.event(event)
end

#prepare_next_step(step) ⇒ Object



27
28
29
30
31
# File 'lib/dynflow/executors/parallel/execution_plan_manager.rb', line 27

# Builds the work item for +step+ and registers the pair with the running
# steps manager.
#
# @param step [Object] the step to be run next
# @return [Object] the Work::Step value built for the step
def prepare_next_step(step)
  step_work = Work::Step[step, execution_plan.id]
  @running_steps_manager.add(step, step_work)
  step_work
end

#start ⇒ Object



22
23
24
25
# File 'lib/dynflow/executors/parallel/execution_plan_manager.rb', line 22

# Begins processing of the plan: tries start_run, then start_finalize, then
# finish, stopping at the first call that returns a truthy value.
#
# @return [Object] the first truthy result, or the result of finish
# @raise [RuntimeError] when the future is already ready
def start
  if @future.ready?
    raise "The future was already set"
  end

  start_run || start_finalize || finish
end

#terminate ⇒ Object



74
75
76
77
78
79
# File 'lib/dynflow/executors/parallel/execution_plan_manager.rb', line 74

# Terminates the running steps manager and moves the execution plan to
# :paused unless it is already in that state.
#
# @return [Object, nil] the result of update_state, or nil when the plan
#   was already paused
def terminate
  @running_steps_manager.terminate
  return if @execution_plan.state == :paused

  @execution_plan.update_state(:paused)
end

#what_is_next(work) ⇒ Array<Work>

Returns the Work items to continue with.

Returns:

  • (Array<Work>)

    the Work items to continue with



34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
# File 'lib/dynflow/executors/parallel/execution_plan_manager.rb', line 34

# Records a finished work item and works out what should be executed next.
#
# @param work [Work] the finished work item; Work::Step, Work::Event and
#   Work::Finalize are the cases matched below
# @return [Array<Work>] the Work items to continue with
# @raise [RuntimeError] when a step finishes while there is no run manager or
#   the run manager is already done, or when a finalize work finishes while
#   there is no finalize manager
def what_is_next(work)
  Type! work, Work

  # Asks the run manager which steps follow +step+. When that answer leaves
  # the run manager done, finalizing is started instead, falling back to
  # finish when start_finalize returns a falsy value.
  compute_next_from_step = lambda do |step|
    # Explicit messages: a bare `raise` carries no details and would re-raise
    # any exception that happens to be active.
    raise 'a step finished but there is no run manager' unless @run_manager
    raise 'a step finished but the run manager is already done' if @run_manager.done?

    next_steps = @run_manager.what_is_next(step)
    if @run_manager.done?
      start_finalize or finish
    else
      next_steps.map { |s| prepare_next_step(s) }
    end
  end

  match(work,
        (on Work::Step.(step: ~any) | Work::Event.(step: ~any) do |step|
           # Store the latest version of the step on the plan.
           execution_plan.steps[step.id] = step
           suspended, work = @running_steps_manager.done(step)
           # When the step is not suspended, the follow-up work comes from
           # the run manager instead of the running steps manager.
           unless suspended
             work = compute_next_from_step.call step
           end
           work
         end),
        (on Work::Finalize do
           raise 'finalize work finished but there is no finalize manager' unless @finalize_manager
           finish
         end))
end