Class: WorkflowRb::WorkflowExecutor
- Inherits:
-
Object
- Object
- WorkflowRb::WorkflowExecutor
- Defined in:
- lib/workflow_rb/services/workflow_executor.rb
Instance Method Summary collapse
- #execute(workflow) ⇒ Object
-
#initialize(registry, persistence, host, logger) ⇒ WorkflowExecutor
constructor
A new instance of WorkflowExecutor.
Constructor Details
#initialize(registry, persistence, host, logger) ⇒ WorkflowExecutor
Returns a new instance of WorkflowExecutor.
14 15 16 17 18 19 |
# File 'lib/workflow_rb/services/workflow_executor.rb', line 14 def initialize(registry, persistence, host, logger) @registry = registry @persistence = persistence @host = host @logger = logger end |
Instance Method Details
#execute(workflow) ⇒ Object
21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 |
# File 'lib/workflow_rb/services/workflow_executor.rb', line 21 def execute(workflow) @logger.debug("Executing workflow #{workflow.id}") exe_pointers = workflow.execution_pointers.select { |x| x.active } definition = @registry.get_definition(workflow.definition_id, workflow.version) if not definition raise Exception "Workflow definition #{workflow.definition_id}" end exe_pointers.each do |pointer| step = definition.steps.select { |x| x.id == pointer.step_id }.first if not step raise Exception "Step #{pointer.step_id} not found in definition" end if (step.kind_of?(SubscriptionStep)) and (not pointer.event_published) pointer.event_name = step.event_name pointer.event_key = step.event_key pointer.active = false @host.subscribe_event(workflow.id, step.id, step.event_name, step.event_key) next end if not pointer.start_time pointer.start_time = Time.new end execution_context = StepExecutionContext.new execution_context.persistence_data = pointer.persistence_data execution_context.workflow = workflow execution_context.step = step if step.body.kind_of?(Proc) body_class = Class.new(StepBody) do def initialize(body) @body = body end def run(context) @body.call(context) end end body_obj = body_class.new(step.body) else if step.body <= StepBody body_obj = step.body.new end end if not body_obj raise "Cannot construct step body #{step.body}" end step.inputs.each do |input| io_value = input.value.call(workflow.data) body_obj.send("#{input.property}=", io_value) end if (body_obj.kind_of?(SubscriptionStepBody)) and (pointer.event_published) body_obj.event_data = pointer.event_data end result = body_obj.run(execution_context) if (result.proceed) step.outputs.each do |output| io_value = output.value.call(body_obj) workflow.data.send("#{output.property}=", io_value) end pointer.active = false pointer.end_time = Time.new fork_counter = 1 pointer.path_terminator = true step.outcomes.select {|x| x.value == result.outcome_value}.each do |outcome| new_pointer = ExecutionPointer.new new_pointer.active = true new_pointer.step_id = outcome.next_step new_pointer.concurrent_fork = fork_counter * pointer.concurrent_fork workflow.execution_pointers << new_pointer pointer.path_terminator = false fork_counter += 1 end else pointer.persistence_data = result.persistence_data pointer.sleep_until = result.sleep_until end end determine_next_execution(workflow) @persistence.persist_workflow(workflow) end |