Class: Dynflow::World
- Inherits:
-
Object
- Object
- Dynflow::World
- Includes:
- Algebrick::TypeCheck
- Defined in:
- lib/dynflow/world.rb
Direct Known Subclasses
Defined Under Namespace
Modules: TriggerResult, Triggered
Instance Attribute Summary collapse
-
#action_classes ⇒ Object
readonly
Returns the value of attribute action_classes.
-
#auto_rescue ⇒ Object
readonly
Returns the value of attribute auto_rescue.
-
#executor ⇒ Object
readonly
Returns the value of attribute executor.
-
#logger_adapter ⇒ Object
readonly
Returns the value of attribute logger_adapter.
-
#middleware ⇒ Object
readonly
Returns the value of attribute middleware.
-
#options ⇒ Object
readonly
Returns the value of attribute options.
-
#persistence ⇒ Object
readonly
Returns the value of attribute persistence.
-
#subscription_index ⇒ Object
readonly
Returns the value of attribute subscription_index.
-
#transaction_adapter ⇒ Object
readonly
Returns the value of attribute transaction_adapter.
Instance Method Summary collapse
- #action_logger ⇒ Object
- #clock ⇒ Object
-
#consistency_check ⇒ Object
Detects execution plans that are marked as running but no executor handles them (probably result of non-standard executor termination).
- #default_options ⇒ Object
- #event(execution_plan_id, step_id, event, future = Future.new) ⇒ Object
-
#execute(execution_plan_id, finished = Future.new) ⇒ Future
raises when ExecutionPlan is not accepted for execution.
-
#execute_planned_execution_plans ⇒ Object
should be called after World is initialized, SimpleWorld does it automatically.
-
#initialize(options_hash = {}) ⇒ World
constructor
A new instance of World.
- #logger ⇒ Object
- #plan(action_class, *args) ⇒ Object
-
#reload! ⇒ Object
Reloads the action classes; intended only for development.
- #subscribed_actions(action_class) ⇒ Object
- #terminate(future = Future.new) ⇒ Object
-
#trigger(action_class, *args) ⇒ TriggerResult
blocks until action_class is planned.
Constructor Details
#initialize(options_hash = {}) ⇒ World
Returns a new instance of World.
8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 |
# File 'lib/dynflow/world.rb', line 8

# Builds a World from +options_hash+ merged over #default_options.
#
# The logger adapter, transaction adapter, persistence adapter and executor
# are each checked with Type! against their respective Abstract class.
#
# @param options_hash [Hash] option overrides; the keys read here are
#   :logger_adapter, :transaction_adapter, :persistence_adapter, :executor,
#   :action_classes, :auto_rescue and :exit_on_terminate
def initialize(options_hash = {})
  @options             = default_options.merge options_hash
  # NOTE(review): option_val is not visible here; presumably it resolves a
  # key from @options (and evaluates lambda values such as :executor) - confirm.
  @logger_adapter      = Type! option_val(:logger_adapter), LoggerAdapters::Abstract
  @transaction_adapter = Type! option_val(:transaction_adapter), TransactionAdapters::Abstract
  persistence_adapter  = Type! option_val(:persistence_adapter), PersistenceAdapters::Abstract
  @persistence         = Persistence.new(self, persistence_adapter)
  @executor            = Type! option_val(:executor), Executors::Abstract
  @action_classes      = option_val(:action_classes)
  @auto_rescue         = option_val(:auto_rescue)
  @exit_on_terminate   = option_val(:exit_on_terminate)
  @middleware          = Middleware::World.new
  calculate_subscription_index

  # Block until the executor signals that it has finished initializing.
  executor.initialized.wait
  @termination_barrier = Mutex.new
  @clock_barrier       = Mutex.new

  transaction_adapter.check self
end
Instance Attribute Details
#action_classes ⇒ Object (readonly)
Returns the value of attribute action_classes.
5 6 7 |
# File 'lib/dynflow/world.rb', line 5

# Returns the value of attribute action_classes.
#
# @return [Object] whatever is currently stored in @action_classes
def action_classes
  @action_classes
end
#auto_rescue ⇒ Object (readonly)
Returns the value of attribute auto_rescue.
5 6 7 |
# File 'lib/dynflow/world.rb', line 5

# Returns the value of attribute auto_rescue.
#
# @return [Object] whatever is currently stored in @auto_rescue
def auto_rescue
  @auto_rescue
end
#executor ⇒ Object (readonly)
Returns the value of attribute executor.
5 6 7 |
# File 'lib/dynflow/world.rb', line 5

# Returns the value of attribute executor.
#
# @return [Object] whatever is currently stored in @executor
def executor
  @executor
end
#logger_adapter ⇒ Object (readonly)
Returns the value of attribute logger_adapter.
5 6 7 |
# File 'lib/dynflow/world.rb', line 5

# Returns the value of attribute logger_adapter.
#
# @return [Object] whatever is currently stored in @logger_adapter
def logger_adapter
  @logger_adapter
end
#middleware ⇒ Object (readonly)
Returns the value of attribute middleware.
5 6 7 |
# File 'lib/dynflow/world.rb', line 5

# Returns the value of attribute middleware.
#
# @return [Object] whatever is currently stored in @middleware
def middleware
  @middleware
end
#options ⇒ Object (readonly)
Returns the value of attribute options.
5 6 7 |
# File 'lib/dynflow/world.rb', line 5

# Returns the value of attribute options.
#
# @return [Object] whatever is currently stored in @options
def options
  @options
end
#persistence ⇒ Object (readonly)
Returns the value of attribute persistence.
5 6 7 |
# File 'lib/dynflow/world.rb', line 5

# Returns the value of attribute persistence.
#
# @return [Object] whatever is currently stored in @persistence
def persistence
  @persistence
end
#subscription_index ⇒ Object (readonly)
Returns the value of attribute subscription_index.
5 6 7 |
# File 'lib/dynflow/world.rb', line 5

# Returns the value of attribute subscription_index.
#
# @return [Object] whatever is currently stored in @subscription_index
def subscription_index
  @subscription_index
end
#transaction_adapter ⇒ Object (readonly)
Returns the value of attribute transaction_adapter.
5 6 7 |
# File 'lib/dynflow/world.rb', line 5

# Returns the value of attribute transaction_adapter.
#
# @return [Object] whatever is currently stored in @transaction_adapter
def transaction_adapter
  @transaction_adapter
end
Instance Method Details
#action_logger ⇒ Object
45 46 47 |
# File 'lib/dynflow/world.rb', line 45

# Delegates to the logger adapter and returns its action_logger.
#
# @return [Object] the value of logger_adapter.action_logger
def action_logger
  logger_adapter.action_logger
end
#clock ⇒ Object
37 38 39 |
# File 'lib/dynflow/world.rb', line 37

# Returns the world's Clock, creating it with the world logger on first use.
# Creation happens inside @clock_barrier so the memoized instance is assigned
# while the mutex is held.
#
# @return [Clock] the memoized clock instance
def clock
  @clock_barrier.synchronize do
    @clock ||= Clock.new(logger)
  end
end
#consistency_check ⇒ Object
Detects execution plans that are marked as running but no executor handles them (probably result of non-standard executor termination)
The current implementation expects no execution_plan being actually run by the executor.
TODO: persist the running executors in the system, so that we can detect the orphaned execution plans. The register should be manageable from the console, so that the administrator can unregister dead executors when needed. After an executor is unregistered, the consistency check should be performed to fix the orphaned plans as well.
152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 |
# File 'lib/dynflow/world.rb', line 152

# Looks up execution plans persisted in the 'planning' or 'running' state and
# repairs them: a plan found in :planning is moved to :stopped, a plan found in
# :running is moved to :paused, and every step that is :suspended or :running
# is saved in the :error state with an "Abnormal termination" error attached.
#
# Logs 'Clean start.' at info level when no such plan exists; otherwise logs a
# table of the affected plans at error level before changing them.
def consistency_check
  abnormal_execution_plans =
    self.persistence.find_execution_plans filters: { 'state' => %w(planning running) }

  if abnormal_execution_plans.empty?
    logger.info 'Clean start.'
  else
    format_str = '%36s %10s %10s'
    message    = ['Abnormal execution plans, process was probably killed.',
                  'Following ExecutionPlans will be set to paused, ',
                  'it should be fixed manually by administrator.',
                  format(format_str, 'ExecutionPlan', 'state', 'result'),
                  *abnormal_execution_plans.map { |ep| format(format_str, ep.id, ep.state, ep.result) }]

    logger.error message.join("\n")

    abnormal_execution_plans.each do |ep|
      new_state = case ep.state
                  when :planning
                    :stopped
                  when :running
                    :paused
                  else
                    # Only the two states above were requested from persistence.
                    raise
                  end
      ep.update_state new_state

      ep.steps.values.each do |step|
        if [:suspended, :running].include?(step.state)
          # The message is built before the state is overwritten below, so it
          # records the state the step was found in.
          step.error = ExecutionPlan::Steps::Error.new("Abnormal termination (previous state: #{step.state})")
          step.state = :error
          step.save
        end
      end
    end
  end
end
#default_options ⇒ Object
28 29 30 31 32 33 34 35 |
# File 'lib/dynflow/world.rb', line 28

# Memoized hash of option defaults.
#
# The :executor entry is a lambda taking the world; when called it builds an
# Executors::Parallel using options[:pool_size].
# NOTE(review): presumably the lambda is invoked by option_val once @options
# is populated - confirm against option_val.
#
# @return [Hash] defaults for :action_classes, :logger_adapter, :executor,
#   :exit_on_terminate and :auto_rescue
def default_options
  @default_options ||= {
    action_classes:    Action.all_children,
    logger_adapter:    LoggerAdapters::Simple.new,
    executor:          ->(world) { Executors::Parallel.new(world, options[:pool_size]) },
    exit_on_terminate: true,
    auto_rescue:       true
  }
end
#event(execution_plan_id, step_id, event, future = Future.new) ⇒ Object
109 110 111 |
# File 'lib/dynflow/world.rb', line 109

# Forwards an event for a step of an execution plan to the executor.
#
# @param execution_plan_id [Object] passed through to the executor
# @param step_id [Object] passed through to the executor
# @param event [Object] passed through to the executor
# @param future [Future] passed through to the executor; a new Future when omitted
# @return [Object] whatever executor.event returns
def event(execution_plan_id, step_id, event, future = Future.new)
  executor.event(execution_plan_id, step_id, event, future)
end
#execute(execution_plan_id, finished = Future.new) ⇒ Future
raises when ExecutionPlan is not accepted for execution
122 123 124 |
# File 'lib/dynflow/world.rb', line 122

# Hands the execution plan over to the executor.
#
# @param execution_plan_id [Object] passed through to the executor
# @param finished [Future] passed through to the executor; a new Future when omitted
# @return [Future] per the method's documented signature; this is whatever
#   executor.execute returns
# @raise when the ExecutionPlan is not accepted for execution (raised by the
#   executor; nothing is rescued here)
def execute(execution_plan_id, finished = Future.new)
  executor.execute(execution_plan_id, finished)
end
#execute_planned_execution_plans ⇒ Object
should be called after World is initialized, SimpleWorld does it automatically
190 191 192 193 194 |
# File 'lib/dynflow/world.rb', line 190

# Finds every persisted execution plan in the 'planned' state and calls
# #execute with its id.
#
# Should be called after World is initialized; SimpleWorld does it
# automatically.
def execute_planned_execution_plans
  planned_execution_plans =
    self.persistence.find_execution_plans filters: { 'state' => %w(planned) }
  planned_execution_plans.each do |ep|
    execute ep.id
  end
end
#logger ⇒ Object
41 42 43 |
# File 'lib/dynflow/world.rb', line 41

# Delegates to the logger adapter and returns its dynflow_logger.
#
# @return [Object] the value of logger_adapter.dynflow_logger
def logger
  logger_adapter.dynflow_logger
end
#plan(action_class, *args) ⇒ Object
113 114 115 116 117 118 |
# File 'lib/dynflow/world.rb', line 113

# Creates an ExecutionPlan bound to this world, prepares it for
# +action_class+ and plans it with +args+.
#
# @param action_class [Object] passed to ExecutionPlan#prepare
# @param args [Array] forwarded to ExecutionPlan#plan
# @return [ExecutionPlan] the plan itself (not the result of #plan)
def plan(action_class, *args)
  execution_plan = ExecutionPlan.new(self)
  execution_plan.prepare(action_class)
  execution_plan.plan(*args)
  execution_plan
end
#reload! ⇒ Object
Reloads the action classes; intended only for development.
54 55 56 57 58 |
# File 'lib/dynflow/world.rb', line 54

# Re-resolves every entry of @action_classes from its name (in place), clears
# the middleware cache and rebuilds the subscription index. Intended only for
# development.
#
# NOTE(review): String#constantize is not core Ruby; presumably provided by
# ActiveSupport - confirm it is loaded wherever this is called.
def reload!
  @action_classes.map! do |klass|
    klass.to_s.constantize
  end
  middleware.clear_cache!
  calculate_subscription_index
end
#subscribed_actions(action_class) ⇒ Object
49 50 51 |
# File 'lib/dynflow/world.rb', line 49

# Looks up +action_class+ in the subscription index.
#
# @param action_class [Object] key into @subscription_index
# @return [Object] the stored entry, or a new empty Array when the key is absent
def subscribed_actions(action_class)
  @subscription_index.fetch(action_class, [])
end
#terminate(future = Future.new) ⇒ Object
126 127 128 129 130 131 132 133 134 135 136 137 138 139 |
# File 'lib/dynflow/world.rb', line 126

# Starts termination of the executor and then the clock, and joins both
# termination futures into +future+.
#
# The shutdown sequence is started only by the first call: it is guarded by
# @executor_terminated being nil, checked while holding @termination_barrier.
# Later calls only join the already-created futures into their own +future+.
#
# When @exit_on_terminate is set, Kernel.exit is chained onto the +future+
# given to that first call.
#
# @param future [Future] joined with the executor and clock futures
# @return [Object] the result of Future.join
def terminate(future = Future.new)
  @termination_barrier.synchronize do
    if @executor_terminated.nil?
      @executor_terminated = Future.new
      @clock_terminated    = Future.new
      # The clock is asked to terminate only after the executor's future fires.
      executor
        .terminate(@executor_terminated)
        .do_then { clock.ask(MicroActor::Terminate, @clock_terminated) }
      future.do_then { Kernel.exit } if @exit_on_terminate
    end
  end
  Future.join([@executor_terminated, @clock_terminated], future)
end
#trigger(action_class, *args) ⇒ TriggerResult
blocks until action_class is planned
94 95 96 97 98 99 100 101 102 103 104 105 106 107 |
# File 'lib/dynflow/world.rb', line 94

# Plans +action_class+ and, if planning succeeded, hands the plan to #execute.
# Blocks until action_class is planned.
#
# @param action_class [Object] forwarded to #plan
# @param args [Array] forwarded to #plan
# @return [TriggerResult] Triggered[id, future] when execution was accepted,
#   ExecutionFailed[id, exception] when #execute raised a StandardError, or
#   PlaningFailed[id, exception] (first planning error) when the plan did not
#   reach the :planned state
def trigger(action_class, *args)
  execution_plan = plan(action_class, *args)

  unless execution_plan.state == :planned
    return PlaningFailed[execution_plan.id, execution_plan.errors.first.exception]
  end

  begin
    Triggered[execution_plan.id, execute(execution_plan.id)]
  rescue => exception
    ExecutionFailed[execution_plan.id, exception]
  end
end