Class: Dynflow::Executors::Parallel::RunningStepsManager

Inherits:
Object
  • Object
show all
Includes:
Algebrick::TypeCheck
Defined in:
lib/dynflow/executors/parallel/running_steps_manager.rb

Overview

Handles the events generated while running actions, makes sure the events are sent to the action only when in suspended state

Instance Method Summary collapse

Constructor Details

#initialize(world) ⇒ RunningStepsManager

Returns a new instance of RunningStepsManager.



10
11
12
13
14
# File 'lib/dynflow/executors/parallel/running_steps_manager.rb', line 10

def initialize(world)
  @world         = Type! world, World
  @running_steps = {}
  @events        = WorkQueue.new(Integer, Work)
end

Instance Method Details

#add(step, work) ⇒ Object



25
26
27
28
29
30
31
# File 'lib/dynflow/executors/parallel/running_steps_manager.rb', line 25

def add(step, work)
  Type! step, ExecutionPlan::Steps::RunStep
  @running_steps[step.id] = step
  # we make sure not to run any event when the step is still being executed
  @events.push(step.id, work)
  self
end

#done(step) ⇒ Object



34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
# File 'lib/dynflow/executors/parallel/running_steps_manager.rb', line 34

def done(step)
  Type! step, ExecutionPlan::Steps::RunStep
  @events.shift(step.id).tap do |work|
    work.event.result.resolve true if Work::Event === work
  end

  if step.state == :suspended
    return true, @events.first(step.id)
  else
    while (event = @events.shift(step.id))
      message = "step #{step.execution_plan_id}:#{step.id} dropping event #{event.event}"
      @world.logger.warn message
      event.event.result.fail UnprocessableEvent.new(message).
                                  tap { |e| e.set_backtrace(caller) }
    end
    raise 'assert' unless @events.empty?(step.id)
    @running_steps.delete(step.id)
    return false, nil
  end
end

#event(event) ⇒ Object



56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
# File 'lib/dynflow/executors/parallel/running_steps_manager.rb', line 56

def event(event)
  Type! event, Parallel::Event

  step = @running_steps[event.step_id]
  unless step
    event.result.fail UnprocessableEvent.new(
                          'step is not suspended, it cannot process events')
    return nil
  end

  can_run_event = @events.empty?(step.id)
  work          = Work::Event[step, event.execution_plan_id, event]
  @events.push(step.id, work)
  work if can_run_event
end

#terminateObject



16
17
18
19
20
21
22
23
# File 'lib/dynflow/executors/parallel/running_steps_manager.rb', line 16

def terminate
  pending_work = @events.clear.values.flatten
  pending_work.each do |w|
    if Work::Event === w
      w.event.result.fail UnprocessableEvent.new("dropping due to termination")
    end
  end
end