Class: Dynflow::Executors::Parallel::RunningStepsManager
- Inherits:
-
Object
- Object
- Dynflow::Executors::Parallel::RunningStepsManager
- Includes:
- Algebrick::TypeCheck
- Defined in:
- lib/dynflow/executors/parallel/running_steps_manager.rb
Overview
Handles the events generated while running actions, makes sure the events are sent to the action only when in suspended state
Instance Method Summary collapse
- #add(step, work) ⇒ Object
- #done(step) ⇒ Object
- #event(event) ⇒ Object
-
#initialize(world) ⇒ RunningStepsManager
constructor
A new instance of RunningStepsManager.
- #terminate ⇒ Object
Constructor Details
#initialize(world) ⇒ RunningStepsManager
Returns a new instance of RunningStepsManager.
10 11 12 13 14 |
# File 'lib/dynflow/executors/parallel/running_steps_manager.rb', line 10 def initialize(world) @world = Type! world, World @running_steps = {} @events = WorkQueue.new(Integer, Work) end |
Instance Method Details
#add(step, work) ⇒ Object
25 26 27 28 29 30 31 |
# File 'lib/dynflow/executors/parallel/running_steps_manager.rb', line 25 def add(step, work) Type! step, ExecutionPlan::Steps::RunStep @running_steps[step.id] = step # we make sure not to run any event when the step is still being executed @events.push(step.id, work) self end |
#done(step) ⇒ Object
34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 |
# File 'lib/dynflow/executors/parallel/running_steps_manager.rb', line 34 def done(step) Type! step, ExecutionPlan::Steps::RunStep @events.shift(step.id).tap do |work| work.event.result.resolve true if Work::Event === work end if step.state == :suspended return true, @events.first(step.id) else while (event = @events.shift(step.id)) = "step #{step.execution_plan_id}:#{step.id} dropping event #{event.event}" @world.logger.warn event.event.result.fail UnprocessableEvent.new(). tap { |e| e.set_backtrace(caller) } end raise 'assert' unless @events.empty?(step.id) @running_steps.delete(step.id) return false, nil end end |
#event(event) ⇒ Object
56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 |
# File 'lib/dynflow/executors/parallel/running_steps_manager.rb', line 56 def event(event) Type! event, Parallel::Event step = @running_steps[event.step_id] unless step event.result.fail UnprocessableEvent.new( 'step is not suspended, it cannot process events') return nil end can_run_event = @events.empty?(step.id) work = Work::Event[step, event.execution_plan_id, event] @events.push(step.id, work) work if can_run_event end |
#terminate ⇒ Object
16 17 18 19 20 21 22 23 |
# File 'lib/dynflow/executors/parallel/running_steps_manager.rb', line 16 def terminate pending_work = @events.clear.values.flatten pending_work.each do |w| if Work::Event === w w.event.result.fail UnprocessableEvent.new("dropping due to termination") end end end |