Class: Roby::Control
- Includes:
- DRbUndumped, Log::ControlHooks, Singleton
- Defined in:
- lib/roby/control.rb,
lib/roby.rb,
lib/roby/distributed/peer.rb
Overview
This singleton class is the central object: it handles the event loop, event propagation and exception propagation.
Constant Summary collapse
- SLEEP_MIN_TIME =
Do not sleep or call Thread#pass if there is less that this much time left in the cycle
0.01
- THREAD_PRIORITY =
The priority of the control thread
10
Constants included from Log::ControlHooks
Class Attribute Summary collapse
-
.at_cycle_end_handlers ⇒ Object
readonly
A set of blocks that are called at each cycle end.
-
.event_processing ⇒ Object
readonly
List of procs which are called at each event cycle.
-
.mutex ⇒ Object
readonly
Returns the value of attribute mutex.
-
.process_every ⇒ Object
readonly
A set of blocks which are called every cycle.
-
.process_once ⇒ Object
readonly
A list of blocks to be called at the beginning of the next event loop.
-
.structure_checks ⇒ Object
readonly
List of procs to be called for task structure checking.
-
.waiting_threads ⇒ Object
readonly
A list of threads which are currently waitiing for the control thread (see for instance Roby.execute).
Instance Attribute Summary collapse
-
#abort_on_application_exception ⇒ Object
If true, abort if an application exception is found.
-
#abort_on_exception ⇒ Object
If true, abort if an unhandled exception is found.
-
#abort_on_framework_exception ⇒ Object
If true, abort if a framework exception is found.
-
#cycle_index ⇒ Object
readonly
The number of this cycle since the beginning.
-
#cycle_length ⇒ Object
readonly
The cycle length in seconds.
-
#cycle_start ⇒ Object
readonly
The starting point of this cycle.
-
#last_stop_count ⇒ Object
readonly
Returns the value of attribute last_stop_count.
-
#plan ⇒ Object
readonly
The plan being executed.
-
#planners ⇒ Object
readonly
A set of planners declared in this application.
-
#remaining_cycle_time ⇒ Object
readonly
Returns the value of attribute remaining_cycle_time.
-
#thread ⇒ Object
Returns the value of attribute thread.
Class Method Summary collapse
-
.at_cycle_end(&block) ⇒ Object
Call
blockat the end of the execution cycle. -
.call_every ⇒ Object
:nodoc:.
-
.call_once ⇒ Object
Calls all pending procs in
process_once. -
.each_cycle(&block) ⇒ Object
Call
blockat each cycle. -
.every(duration, &block) ⇒ Object
Call
blockeverydurationseconds. -
.fatal_exception(error, tasks) ⇒ Object
Hook called when a set of tasks is being killed because of an exception.
- .finalizers ⇒ Object
-
.handled_exception(e, task) ⇒ Object
Hook called when an exception
ehas been handled bytask. -
.once(&block) ⇒ Object
Call block once before event processing.
- .remove_periodic_handler(id) ⇒ Object
-
.synchronize ⇒ Object
Implements a recursive behaviour on Control.mutex.
- .taken_mutex? ⇒ Boolean
Instance Method Summary collapse
- #add_expected_duration(stats, name, duration) ⇒ Object
- #add_timepoint(stats, name) ⇒ Object
- #clear ⇒ Object
-
#cycle_end(stats) ⇒ Object
Called at each cycle end.
- #event_loop ⇒ Object
-
#forced_exit? ⇒ Boolean
True if the control thread is currently quitting.
-
#initialize ⇒ Control
constructor
A new instance of Control.
-
#join ⇒ Object
If the event thread has been started in its own thread, wait for it to terminate.
-
#process_events(stats = {}) ⇒ Object
Process the pending events.
-
#quit ⇒ Object
Make control quit.
-
#quitting? ⇒ Boolean
True if the control thread is currently quitting.
-
#reraise(exceptions) ⇒ Object
Abort the control loop because of
exceptions. -
#run(options = {}) ⇒ Object
Main event loop.
- #running? ⇒ Boolean
-
#structure_checking ⇒ Object
Perform the structure checking step by calling the procs registered in Control::structure_checks.
-
#wait_one_cycle ⇒ Object
Blocks until at least once execution cycle has been done.
Constructor Details
#initialize ⇒ Control
254 255 256 257 258 259 260 261 262 263 264 265 266 |
# File 'lib/roby/control.rb', line 254 def initialize super @quit = 0 @thread = nil @cycle_index = 0 @cycle_start = Time.now @cycle_length = 0 @planners = [] @last_stop_count = 0 @plan = Plan.new Roby.instance_variable_set(:@plan, @plan) plan.extend Roby::Propagation::ExecutablePlanChanged end |
Class Attribute Details
.at_cycle_end_handlers ⇒ Object (readonly)
A set of blocks that are called at each cycle end
406 407 408 |
# File 'lib/roby/control.rb', line 406 def at_cycle_end_handlers @at_cycle_end_handlers end |
.event_processing ⇒ Object (readonly)
List of procs which are called at each event cycle
239 240 241 |
# File 'lib/roby/control.rb', line 239 def event_processing @event_processing end |
.mutex ⇒ Object (readonly)
Returns the value of attribute mutex.
200 201 202 |
# File 'lib/roby/control.rb', line 200 def mutex @mutex end |
.process_every ⇒ Object (readonly)
A set of blocks which are called every cycle
414 415 416 |
# File 'lib/roby/control.rb', line 414 def process_every @process_every end |
.process_once ⇒ Object (readonly)
A list of blocks to be called at the beginning of the next event loop
386 387 388 |
# File 'lib/roby/control.rb', line 386 def process_once @process_once end |
.structure_checks ⇒ Object (readonly)
List of procs to be called for task structure checking
The blocks return a set of exceptions or nil. The exception must respond to #task or #generator to know from which task the problem comes.
246 247 248 |
# File 'lib/roby/control.rb', line 246 def structure_checks @structure_checks end |
.waiting_threads ⇒ Object (readonly)
A list of threads which are currently waitiing for the control thread (see for instance Roby.execute)
Control#run will raise ControlQuitError on this threads if they are still waiting while the control is quitting
384 385 386 |
# File 'lib/roby/control.rb', line 384 def waiting_threads @waiting_threads end |
Instance Attribute Details
#abort_on_application_exception ⇒ Object
If true, abort if an application exception is found
231 232 233 |
# File 'lib/roby/control.rb', line 231 def abort_on_application_exception @abort_on_application_exception end |
#abort_on_exception ⇒ Object
If true, abort if an unhandled exception is found
229 230 231 |
# File 'lib/roby/control.rb', line 229 def abort_on_exception @abort_on_exception end |
#abort_on_framework_exception ⇒ Object
If true, abort if a framework exception is found
233 234 235 |
# File 'lib/roby/control.rb', line 233 def abort_on_framework_exception @abort_on_framework_exception end |
#cycle_index ⇒ Object (readonly)
The number of this cycle since the beginning
463 464 465 |
# File 'lib/roby/control.rb', line 463 def cycle_index @cycle_index end |
#cycle_length ⇒ Object (readonly)
The cycle length in seconds
457 458 459 |
# File 'lib/roby/control.rb', line 457 def cycle_length @cycle_length end |
#cycle_start ⇒ Object (readonly)
The starting point of this cycle
460 461 462 |
# File 'lib/roby/control.rb', line 460 def cycle_start @cycle_start end |
#last_stop_count ⇒ Object (readonly)
Returns the value of attribute last_stop_count.
517 518 519 |
# File 'lib/roby/control.rb', line 517 def last_stop_count @last_stop_count end |
#plan ⇒ Object (readonly)
The plan being executed
250 251 252 |
# File 'lib/roby/control.rb', line 250 def plan @plan end |
#planners ⇒ Object (readonly)
A set of planners declared in this application
252 253 254 |
# File 'lib/roby/control.rb', line 252 def planners @planners end |
#remaining_cycle_time ⇒ Object (readonly)
Returns the value of attribute remaining_cycle_time.
556 557 558 |
# File 'lib/roby/control.rb', line 556 def remaining_cycle_time @remaining_cycle_time end |
#thread ⇒ Object
Returns the value of attribute thread.
453 454 455 |
# File 'lib/roby/control.rb', line 453 def thread @thread end |
Class Method Details
.at_cycle_end(&block) ⇒ Object
Call block at the end of the execution cycle
409 410 411 |
# File 'lib/roby/control.rb', line 409 def at_cycle_end(&block) Control.at_cycle_end_handlers << block end |
.call_every ⇒ Object
:nodoc:
432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 |
# File 'lib/roby/control.rb', line 432 def call_every # :nodoc: now = Roby.control.cycle_start length = Roby.control.cycle_length process_every.map! do |block, last_call, duration| begin # Check if the nearest timepoint is the beginning of # this cycle or of the next cycle if !last_call || (duration - (now - last_call)) < length / 2 block.call last_call = now end rescue Exception => e Propagation.add_framework_error(e, "#call_every, in #{block}") end [block, last_call, duration] end end |
.call_once ⇒ Object
Calls all pending procs in process_once
388 389 390 391 392 393 394 395 396 397 |
# File 'lib/roby/control.rb', line 388 def call_once # :nodoc: while !process_once.empty? p = process_once.pop begin p.call rescue Exception => e Propagation.add_framework_error(e, "call once in #{p}") end end end |
.each_cycle(&block) ⇒ Object
Call block at each cycle
403 |
# File 'lib/roby/control.rb', line 403 def each_cycle(&block); Control.event_processing << block end |
.every(duration, &block) ⇒ Object
Call block every duration seconds. Note that duration is round up to the cycle size (time between calls is *at least* duration)
418 419 420 421 422 423 424 |
# File 'lib/roby/control.rb', line 418 def every(duration, &block) Control.once do block.call process_every << [block, Roby.control.cycle_start, duration] end block.object_id end |
.fatal_exception(error, tasks) ⇒ Object
Hook called when a set of tasks is being killed because of an exception
721 722 723 724 725 726 |
# File 'lib/roby/control.rb', line 721 def self.fatal_exception(error, tasks) super if defined? super Roby.format_exception(error.exception).each do |line| Roby.warn line end end |
.finalizers ⇒ Object
676 |
# File 'lib/roby/control.rb', line 676 def self.finalizers; @finalizers end |
.handled_exception(e, task) ⇒ Object
Hook called when an exception e has been handled by task
728 |
# File 'lib/roby/control.rb', line 728 def self.handled_exception(e, task); super if defined? super end |
.once(&block) ⇒ Object
Call block once before event processing
401 |
# File 'lib/roby/control.rb', line 401 def once(&block); process_once.push block end |
.remove_periodic_handler(id) ⇒ Object
426 427 428 429 430 |
# File 'lib/roby/control.rb', line 426 def remove_periodic_handler(id) Roby.execute do process_every.delete_if { |spec| spec[0].object_id == id } end end |
.synchronize ⇒ Object
Implements a recursive behaviour on Control.mutex
205 206 207 208 209 210 211 212 213 214 215 216 217 218 |
# File 'lib/roby/control.rb', line 205 def synchronize if Thread.current[:control_mutex_locked] yield else mutex.lock begin Thread.current[:control_mutex_locked] = true yield ensure Thread.current[:control_mutex_locked] = false mutex.unlock end end end |
.taken_mutex? ⇒ Boolean
202 |
# File 'lib/roby/control.rb', line 202 def taken_mutex?; Thread.current[:control_mutex_locked] end |
Instance Method Details
#add_expected_duration(stats, name, duration) ⇒ Object
561 562 563 |
# File 'lib/roby/control.rb', line 561 def add_expected_duration(stats, name, duration) stats[name] = Time.now + duration - cycle_start end |
#add_timepoint(stats, name) ⇒ Object
557 558 559 560 |
# File 'lib/roby/control.rb', line 557 def add_timepoint(stats, name) stats[:end] = stats[name] = Time.now - cycle_start @remaining_cycle_time = cycle_length - stats[:end] end |
#clear ⇒ Object
518 519 520 521 522 523 524 525 526 527 528 529 530 531 532 533 534 535 536 537 538 539 540 541 542 543 544 545 546 547 548 549 550 551 552 553 554 |
# File 'lib/roby/control.rb', line 518 def clear Control.synchronize do plan.missions.dup.each { |t| plan.discard(t) } plan.keepalive.dup.each { |t| plan.auto(t) } plan.force_gc.merge( plan.known_tasks ) quaranteened_subplan = plan.useful_task_component(nil, ValueSet.new, plan.gc_quarantine.dup) remaining = plan.known_tasks - quaranteened_subplan if remaining.empty? # Have to call #garbage_collect one more to make # sure that unneeded events are removed as well plan.garbage_collect # Done cleaning the tasks, clear the remains plan.transactions.each do |trsc| trsc.discard_transaction if trsc.self_owned? end plan.clear return end if last_stop_count != remaining.size if last_stop_count == 0 Roby.info "control quitting. Waiting for #{remaining.size} tasks to finish (#{plan.size} tasks still in plan)" Roby.debug " " + remaining.to_a.join("\n ") else Roby.info "waiting for #{remaining.size} tasks to finish (#{plan.size} tasks still in plan)" Roby.debug " #{remaining.to_a.join("\n ")}" end if plan.gc_quarantine.size != 0 Roby.info "#{plan.gc_quarantine.size} tasks in quarantine" end @last_stop_count = remaining.size end remaining end end |
#cycle_end(stats) ⇒ Object
Called at each cycle end
686 687 688 689 690 691 692 693 694 695 696 |
# File 'lib/roby/control.rb', line 686 def cycle_end(stats) super if defined? super Control.at_cycle_end_handlers.each do |handler| begin handler.call rescue Exception => e Propagation.add_framework_error(e, "during cycle end handler #{handler}") end end end |
#event_loop ⇒ Object
565 566 567 568 569 570 571 572 573 574 575 576 577 578 579 580 581 582 583 584 585 586 587 588 589 590 591 592 593 594 595 596 597 598 599 600 601 602 603 604 605 606 607 608 609 610 611 612 613 614 615 616 617 618 619 620 621 622 623 624 625 626 627 628 629 630 631 632 633 634 635 636 637 638 639 640 641 642 643 644 645 646 647 648 649 650 651 652 653 654 655 656 657 658 659 660 661 662 663 664 665 666 667 668 669 670 671 672 673 |
# File 'lib/roby/control.rb', line 565 def event_loop @last_stop_count = 0 @cycle_start = Time.now @cycle_index = 0 gc_enable_has_argument = begin GC.enable(true) true rescue; false end stats = Hash.new if ObjectSpace.respond_to?(:live_objects) stats[:live_objects] = ObjectSpace.live_objects end GC.start if gc_enable_has_argument already_disabled_gc = GC.disable end loop do begin if quitting? thread.priority = 0 begin return if forced_exit? || !clear rescue Exception => e Roby.warn "Control failed to clean up" Roby.format_exception(e).each do |line| Roby.warn line end return end end while Time.now > cycle_start + cycle_length @cycle_start += cycle_length @cycle_index += 1 end stats[:start] = [cycle_start.tv_sec, cycle_start.tv_usec] stats[:cycle_index] = cycle_index Control.synchronize { process_events(stats) } # Record the statistics about object allocation *before* running the Ruby # GC. It is also updated at if ObjectSpace.respond_to?(:live_objects) live_objects_before_gc = ObjectSpace.live_objects end # If the ruby interpreter we run on offers a true/false argument to # GC.enable, we disabled the GC and just run GC.enable(true) to make # it run immediately if needed. Then, we re-disable it just after. if gc_enable_has_argument && remaining_cycle_time > SLEEP_MIN_TIME GC.enable(true) GC.disable end add_timepoint(stats, :ruby_gc) if ObjectSpace.respond_to?(:live_objects) live_objects_after_gc = ObjectSpace.live_objects end # Sleep if there is enough time for it if remaining_cycle_time > SLEEP_MIN_TIME add_expected_duration(stats, :sleep, remaining_cycle_time) sleep(remaining_cycle_time) end add_timepoint(stats, :sleep) # Add some statistics and call cycle_end if defined? Roby::Log stats[:log_queue_size] = Roby::Log.logged_events.size end stats[:plan_task_count] = plan.known_tasks.size stats[:plan_event_count] = plan.free_events.size process_time = Process.times stats[:cpu_time] = (process_time.utime + process_time.stime) * 1000 if ObjectSpace.respond_to?(:live_objects) live_objects = ObjectSpace.live_objects stats[:object_allocation] = live_objects - stats[:live_objects] - (live_objects_after_gc - live_objects_before_gc) stats[:live_objects] = live_objects end cycle_end(stats) stats = Hash.new stats[:live_objects] = live_objects @cycle_start += cycle_length @cycle_index += 1 rescue Exception => e Roby.warn "Control quitting because of unhandled exception" Roby.format_exception(e).each do |line| Roby.warn line end quit end end ensure GC.enable if !already_disabled_gc if !plan.known_tasks.empty? Roby.warn "the following tasks are still present in the plan:" plan.known_tasks.each do |t| Roby.warn " #{t}" end end end |
#forced_exit? ⇒ Boolean
True if the control thread is currently quitting
681 |
# File 'lib/roby/control.rb', line 681 def forced_exit?; @quit > 1 end |
#join ⇒ Object
If the event thread has been started in its own thread, wait for it to terminate
700 701 702 703 704 705 706 707 708 709 710 711 712 713 714 715 716 |
# File 'lib/roby/control.rb', line 700 def join thread.join if thread rescue Interrupt Roby::Control.synchronize do return unless thread Roby.logger.level = Logger::INFO Roby.warn "received interruption request" quit if @quit > 2 thread.raise Interrupt, "interrupting control thread at user request" end end retry end |
#process_events(stats = {}) ⇒ Object
Process the pending events. The time at each event loop step is saved into stats.
305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 |
# File 'lib/roby/control.rb', line 305 def process_events(stats = {}) Thread.current[:application_exceptions] = [] add_timepoint(stats, :real_start) # Gather new events and propagate them events_errors = Propagation.propagate_events(Control.event_processing) add_timepoint(stats, :events) # HACK: events_errors is sometime nil here. It shouldn't events_errors ||= [] # Generate exceptions from task structure structure_errors = structure_checking add_timepoint(stats, :structure_check) # Propagate the errors. Note that the plan repairs are taken into # account in Propagation.propagate_exceptions drectly. We keep # event and structure errors separate since in the first case there # is not two-stage handling (all errors that have not been handled # are fatal), and in the second case we call #structure_checking # again to get the remaining errors events_errors = Propagation.propagate_exceptions(events_errors) Propagation.propagate_exceptions(structure_errors) add_timepoint(stats, :exception_propagation) # Get the remaining problems in the plan structure, and act on it fatal_structure_errors = Propagation.remove_inhibited_exceptions(structure_checking) fatal_errors = fatal_structure_errors.to_a + events_errors kill_tasks = fatal_errors.inject(ValueSet.new) do |kill_tasks, (error, tasks)| tasks ||= [*error.task] for parent in [*tasks] new_tasks = parent.reverse_generated_subgraph(TaskStructure::Hierarchy) - plan.force_gc if !new_tasks.empty? Control.fatal_exception(error, new_tasks) end kill_tasks.merge(new_tasks) end kill_tasks end add_timepoint(stats, :exceptions_fatal) plan.garbage_collect(kill_tasks) add_timepoint(stats, :garbage_collect) application_errors = Thread.current[:application_exceptions] Thread.current[:application_exceptions] = nil for error, origin in application_errors Propagation.add_framework_error(error, origin) end add_timepoint(stats, :application_errors) if abort_on_exception && !quitting? && !fatal_errors.empty? reraise(fatal_errors.map { |e, _| e }) end ensure Thread.current[:application_exceptions] = nil end |
#quit ⇒ Object
Make control quit
683 |
# File 'lib/roby/control.rb', line 683 def quit; @quit += 1 end |
#quitting? ⇒ Boolean
True if the control thread is currently quitting
679 |
# File 'lib/roby/control.rb', line 679 def quitting?; @quit > 0 end |
#reraise(exceptions) ⇒ Object
Abort the control loop because of exceptions
291 292 293 294 295 296 297 298 299 300 301 |
# File 'lib/roby/control.rb', line 291 def reraise(exceptions) if exceptions.size == 1 e = exceptions.first if e.kind_of?(ExecutionException) e = e.exception end raise e, e., e.backtrace else raise Aborting.new(exceptions) end end |
#run(options = {}) ⇒ Object
Main event loop. Valid options are
- cycle
-
the cycle duration in seconds (default: 0.1)
- drb
-
address of the DRuby server if one should be started (default: nil)
- detach
-
if true, start in its own thread (default: false)
469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 485 486 487 488 489 490 491 492 493 494 495 496 497 498 499 500 501 502 503 504 505 506 507 508 509 510 511 512 513 514 515 |
# File 'lib/roby/control.rb', line 469 def run( = {}) if running? raise "there is already a control running in thread #{@thread}" end = , :cycle => 0.1, :detach => false @quit = 0 if ![:detach] @thread = Thread.current @thread.priority = THREAD_PRIORITY end if [:detach] # Start the control thread and wait for @thread to be set Roby.condition_variable(true) do |cv, mt| mt.synchronize do Thread.new do run(.merge(:detach => false)) do mt.synchronize { cv.signal } end end cv.wait(mt) end end return end yield if block_given? @cycle_length = [:cycle] event_loop ensure if Thread.current == self.thread Roby::Control.synchronize do # reset the options only if we are in the control thread @thread = nil Control.waiting_threads.each do |th| th.raise ControlQuitError end Control.finalizers.each { |blk| blk.call rescue nil } @quit = 0 end end end |
#running? ⇒ Boolean
454 |
# File 'lib/roby/control.rb', line 454 def running?; !!@thread end |
#structure_checking ⇒ Object
Perform the structure checking step by calling the procs registered in Control::structure_checks. These procs are supposed to return a collection of exception objects, or nil if no error has been found
271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 |
# File 'lib/roby/control.rb', line 271 def structure_checking # Do structure checking and gather the raised exceptions exceptions = {} for prc in Control.structure_checks begin new_exceptions = prc.call(plan) rescue Exception => e Propagation.add_framework_error(e, 'structure checking') end next unless new_exceptions [*new_exceptions].each do |e, tasks| e = Propagation.to_execution_exception(e) exceptions[e] = tasks end end exceptions end |
#wait_one_cycle ⇒ Object
Blocks until at least once execution cycle has been done
366 367 368 369 370 371 372 |
# File 'lib/roby/control.rb', line 366 def wait_one_cycle current_cycle = Roby.execute { Roby.control.cycle_index } while current_cycle == Roby.execute { Roby.control.cycle_index } raise ControlQuitError if !Roby.control.running? sleep(Roby.control.cycle_length) end end |