Class: Roby::Control

Inherits:
Object show all
Includes:
DRbUndumped, Log::ControlHooks, Singleton
Defined in:
lib/roby/control.rb,
lib/roby.rb,
lib/roby/distributed/peer.rb

Overview

This singleton class is the central object: it handles the event loop, event propagation and exception propagation.

Constant Summary collapse

SLEEP_MIN_TIME =

Do not sleep or call Thread#pass if there is less that this much time left in the cycle

0.01
THREAD_PRIORITY =

The priority of the control thread

10

Constants included from Log::ControlHooks

Log::ControlHooks::HOOKS

Class Attribute Summary collapse

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initializeControl



254
255
256
257
258
259
260
261
262
263
264
265
266
# File 'lib/roby/control.rb', line 254

def initialize
    super
    @quit        = 0
    @thread      = nil
    @cycle_index = 0
    @cycle_start = Time.now
    @cycle_length = 0
    @planners    = []
    @last_stop_count = 0
    @plan        = Plan.new
    Roby.instance_variable_set(:@plan, @plan)
    plan.extend Roby::Propagation::ExecutablePlanChanged
end

Class Attribute Details

.at_cycle_end_handlersObject (readonly)

A set of blocks that are called at each cycle end



406
407
408
# File 'lib/roby/control.rb', line 406

def at_cycle_end_handlers
  @at_cycle_end_handlers
end

.event_processingObject (readonly)

List of procs which are called at each event cycle



239
240
241
# File 'lib/roby/control.rb', line 239

def event_processing
  @event_processing
end

.mutexObject (readonly)

Returns the value of attribute mutex.



200
201
202
# File 'lib/roby/control.rb', line 200

def mutex
  @mutex
end

.process_everyObject (readonly)

A set of blocks which are called every cycle



414
415
416
# File 'lib/roby/control.rb', line 414

def process_every
  @process_every
end

.process_onceObject (readonly)

A list of blocks to be called at the beginning of the next event loop



386
387
388
# File 'lib/roby/control.rb', line 386

def process_once
  @process_once
end

.structure_checksObject (readonly)

List of procs to be called for task structure checking

The blocks return a set of exceptions or nil. The exception must respond to #task or #generator to know from which task the problem comes.



246
247
248
# File 'lib/roby/control.rb', line 246

def structure_checks
  @structure_checks
end

.waiting_threadsObject (readonly)

A list of threads which are currently waitiing for the control thread (see for instance Roby.execute)

Control#run will raise ControlQuitError on this threads if they are still waiting while the control is quitting



384
385
386
# File 'lib/roby/control.rb', line 384

def waiting_threads
  @waiting_threads
end

Instance Attribute Details

#abort_on_application_exceptionObject

If true, abort if an application exception is found



231
232
233
# File 'lib/roby/control.rb', line 231

def abort_on_application_exception
  @abort_on_application_exception
end

#abort_on_exceptionObject

If true, abort if an unhandled exception is found



229
230
231
# File 'lib/roby/control.rb', line 229

def abort_on_exception
  @abort_on_exception
end

#abort_on_framework_exceptionObject

If true, abort if a framework exception is found



233
234
235
# File 'lib/roby/control.rb', line 233

def abort_on_framework_exception
  @abort_on_framework_exception
end

#cycle_indexObject (readonly)

The number of this cycle since the beginning



463
464
465
# File 'lib/roby/control.rb', line 463

def cycle_index
  @cycle_index
end

#cycle_lengthObject (readonly)

The cycle length in seconds



457
458
459
# File 'lib/roby/control.rb', line 457

def cycle_length
  @cycle_length
end

#cycle_startObject (readonly)

The starting point of this cycle



460
461
462
# File 'lib/roby/control.rb', line 460

def cycle_start
  @cycle_start
end

#last_stop_countObject (readonly)

Returns the value of attribute last_stop_count.



517
518
519
# File 'lib/roby/control.rb', line 517

def last_stop_count
  @last_stop_count
end

#planObject (readonly)

The plan being executed



250
251
252
# File 'lib/roby/control.rb', line 250

def plan
  @plan
end

#plannersObject (readonly)

A set of planners declared in this application



252
253
254
# File 'lib/roby/control.rb', line 252

def planners
  @planners
end

#remaining_cycle_timeObject (readonly)

Returns the value of attribute remaining_cycle_time.



556
557
558
# File 'lib/roby/control.rb', line 556

def remaining_cycle_time
  @remaining_cycle_time
end

#threadObject

Returns the value of attribute thread.



453
454
455
# File 'lib/roby/control.rb', line 453

def thread
  @thread
end

Class Method Details

.at_cycle_end(&block) ⇒ Object

Call block at the end of the execution cycle



409
410
411
# File 'lib/roby/control.rb', line 409

def at_cycle_end(&block)
    Control.at_cycle_end_handlers << block
end

.call_everyObject

:nodoc:



432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
# File 'lib/roby/control.rb', line 432

def call_every # :nodoc:
    now        = Roby.control.cycle_start
    length     = Roby.control.cycle_length
    process_every.map! do |block, last_call, duration|
 begin
      # Check if the nearest timepoint is the beginning of
      # this cycle or of the next cycle
      if !last_call || (duration - (now - last_call)) < length / 2
  block.call
  last_call = now
      end
 rescue Exception => e
      Propagation.add_framework_error(e, "#call_every, in #{block}")
 end
 [block, last_call, duration]
    end
end

.call_onceObject

Calls all pending procs in process_once



388
389
390
391
392
393
394
395
396
397
# File 'lib/roby/control.rb', line 388

def call_once # :nodoc:
    while !process_once.empty?
 p = process_once.pop
 begin
      p.call
 rescue Exception => e
      Propagation.add_framework_error(e, "call once in #{p}")
 end
    end
end

.each_cycle(&block) ⇒ Object

Call block at each cycle



403
# File 'lib/roby/control.rb', line 403

def each_cycle(&block); Control.event_processing << block end

.every(duration, &block) ⇒ Object

Call block every duration seconds. Note that duration is round up to the cycle size (time between calls is *at least* duration)



418
419
420
421
422
423
424
# File 'lib/roby/control.rb', line 418

def every(duration, &block)
    Control.once do
 block.call
 process_every << [block, Roby.control.cycle_start, duration]
    end
    block.object_id
end

.fatal_exception(error, tasks) ⇒ Object

Hook called when a set of tasks is being killed because of an exception



721
722
723
724
725
726
# File 'lib/roby/control.rb', line 721

def self.fatal_exception(error, tasks)
    super if defined? super
           Roby.format_exception(error.exception).each do |line|
               Roby.warn line
           end
end

.finalizersObject



676
# File 'lib/roby/control.rb', line 676

def self.finalizers; @finalizers end

.handled_exception(e, task) ⇒ Object

Hook called when an exception e has been handled by task



728
# File 'lib/roby/control.rb', line 728

def self.handled_exception(e, task); super if defined? super end

.once(&block) ⇒ Object

Call block once before event processing



401
# File 'lib/roby/control.rb', line 401

def once(&block); process_once.push block end

.remove_periodic_handler(id) ⇒ Object



426
427
428
429
430
# File 'lib/roby/control.rb', line 426

def remove_periodic_handler(id)
    Roby.execute do
 process_every.delete_if { |spec| spec[0].object_id == id }
    end
end

.synchronizeObject

Implements a recursive behaviour on Control.mutex



205
206
207
208
209
210
211
212
213
214
215
216
217
218
# File 'lib/roby/control.rb', line 205

def synchronize
    if Thread.current[:control_mutex_locked]
 yield
    else
 mutex.lock
 begin
      Thread.current[:control_mutex_locked] = true
      yield
 ensure
      Thread.current[:control_mutex_locked] = false
      mutex.unlock
 end
    end
end

.taken_mutex?Boolean



202
# File 'lib/roby/control.rb', line 202

def taken_mutex?; Thread.current[:control_mutex_locked] end

Instance Method Details

#add_expected_duration(stats, name, duration) ⇒ Object



561
562
563
# File 'lib/roby/control.rb', line 561

def add_expected_duration(stats, name, duration)
    stats[name] = Time.now + duration - cycle_start
end

#add_timepoint(stats, name) ⇒ Object



557
558
559
560
# File 'lib/roby/control.rb', line 557

def add_timepoint(stats, name)
    stats[:end] = stats[name] = Time.now - cycle_start
    @remaining_cycle_time = cycle_length - stats[:end]
end

#clearObject



518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
# File 'lib/roby/control.rb', line 518

def clear
    Control.synchronize do
  plan.missions.dup.each { |t| plan.discard(t) }
  plan.keepalive.dup.each { |t| plan.auto(t) }
  plan.force_gc.merge( plan.known_tasks )

  quaranteened_subplan = plan.useful_task_component(nil, ValueSet.new, plan.gc_quarantine.dup)
  remaining = plan.known_tasks - quaranteened_subplan

  if remaining.empty?
      # Have to call #garbage_collect one more to make
      # sure that unneeded events are removed as well
      plan.garbage_collect
      # Done cleaning the tasks, clear the remains
      plan.transactions.each do |trsc|
    trsc.discard_transaction if trsc.self_owned?
      end
      plan.clear
      return
  end

  if last_stop_count != remaining.size
      if last_stop_count == 0
    Roby.info "control quitting. Waiting for #{remaining.size} tasks to finish (#{plan.size} tasks still in plan)"
    Roby.debug "  " + remaining.to_a.join("\n  ")
      else
    Roby.info "waiting for #{remaining.size} tasks to finish (#{plan.size} tasks still in plan)"
    Roby.debug "  #{remaining.to_a.join("\n  ")}"
      end
      if plan.gc_quarantine.size != 0
    Roby.info "#{plan.gc_quarantine.size} tasks in quarantine"
      end
      @last_stop_count = remaining.size
  end
  remaining
    end
end

#cycle_end(stats) ⇒ Object

Called at each cycle end



686
687
688
689
690
691
692
693
694
695
696
# File 'lib/roby/control.rb', line 686

def cycle_end(stats)
    super if defined? super 

    Control.at_cycle_end_handlers.each do |handler|
  begin
      handler.call
  rescue Exception => e
      Propagation.add_framework_error(e, "during cycle end handler #{handler}")
  end
    end
end

#event_loopObject



565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
# File 'lib/roby/control.rb', line 565

def event_loop
    @last_stop_count = 0
    @cycle_start  = Time.now
    @cycle_index  = 0

    gc_enable_has_argument = begin
         GC.enable(true)
         true
           rescue; false
           end
    stats = Hash.new
    if ObjectSpace.respond_to?(:live_objects)
  stats[:live_objects] = ObjectSpace.live_objects
    end

    GC.start
    if gc_enable_has_argument
  already_disabled_gc = GC.disable
    end
    loop do
  begin
      if quitting?
    thread.priority = 0
    begin
        return if forced_exit? || !clear
    rescue Exception => e
        Roby.warn "Control failed to clean up"
                           Roby.format_exception(e).each do |line|
                               Roby.warn line
                           end
        return
    end
      end

      while Time.now > cycle_start + cycle_length
    @cycle_start += cycle_length
    @cycle_index += 1
      end
      stats[:start]       = [cycle_start.tv_sec, cycle_start.tv_usec]
      stats[:cycle_index] = cycle_index
      Control.synchronize { process_events(stats) }
      
      # Record the statistics about object allocation *before* running the Ruby
      # GC. It is also updated at 
      if ObjectSpace.respond_to?(:live_objects)
    live_objects_before_gc = ObjectSpace.live_objects
      end

      # If the ruby interpreter we run on offers a true/false argument to
      # GC.enable, we disabled the GC and just run GC.enable(true) to make
      # it run immediately if needed. Then, we re-disable it just after.
      if gc_enable_has_argument && remaining_cycle_time > SLEEP_MIN_TIME
    GC.enable(true)
    GC.disable
      end
      add_timepoint(stats, :ruby_gc)

      if ObjectSpace.respond_to?(:live_objects)
    live_objects_after_gc = ObjectSpace.live_objects
      end
      
      # Sleep if there is enough time for it
      if remaining_cycle_time > SLEEP_MIN_TIME
    add_expected_duration(stats, :sleep, remaining_cycle_time)
    sleep(remaining_cycle_time) 
      end
      add_timepoint(stats, :sleep)

      # Add some statistics and call cycle_end
      if defined? Roby::Log
    stats[:log_queue_size] = Roby::Log.logged_events.size
      end
      stats[:plan_task_count]  = plan.known_tasks.size
      stats[:plan_event_count] = plan.free_events.size
      process_time = Process.times
      stats[:cpu_time] = (process_time.utime + process_time.stime) * 1000

      if ObjectSpace.respond_to?(:live_objects)
    live_objects = ObjectSpace.live_objects
    stats[:object_allocation] = live_objects - stats[:live_objects] - (live_objects_after_gc - live_objects_before_gc)
    stats[:live_objects]      = live_objects
      end

      cycle_end(stats)

      stats = Hash.new
      stats[:live_objects] = live_objects
      @cycle_start += cycle_length
      @cycle_index += 1

  rescue Exception => e
      Roby.warn "Control quitting because of unhandled exception"
                   Roby.format_exception(e).each do |line|
                       Roby.warn line
                   end
      quit
  end
    end

ensure
    GC.enable if !already_disabled_gc

    if !plan.known_tasks.empty?
  Roby.warn "the following tasks are still present in the plan:"
  plan.known_tasks.each do |t|
      Roby.warn "  #{t}"
  end
    end
end

#forced_exit?Boolean

True if the control thread is currently quitting



681
# File 'lib/roby/control.rb', line 681

def forced_exit?; @quit > 1 end

#joinObject

If the event thread has been started in its own thread, wait for it to terminate



700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
# File 'lib/roby/control.rb', line 700

def join
    thread.join if thread

rescue Interrupt
    Roby::Control.synchronize do
  return unless thread

  Roby.logger.level = Logger::INFO
  Roby.warn "received interruption request"
  quit
  if @quit > 2
      thread.raise Interrupt, "interrupting control thread at user request"
  end
    end

    retry
end

#process_events(stats = {}) ⇒ Object

Process the pending events. The time at each event loop step is saved into stats.



305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
# File 'lib/roby/control.rb', line 305

def process_events(stats = {})
    Thread.current[:application_exceptions] = []

    add_timepoint(stats, :real_start)

    # Gather new events and propagate them
    events_errors = Propagation.propagate_events(Control.event_processing)
    add_timepoint(stats, :events)

    # HACK: events_errors is sometime nil here. It shouldn't
    events_errors ||= []

    # Generate exceptions from task structure
    structure_errors = structure_checking
    add_timepoint(stats, :structure_check)

    # Propagate the errors. Note that the plan repairs are taken into
    # account in Propagation.propagate_exceptions drectly.  We keep
    # event and structure errors separate since in the first case there
    # is not two-stage handling (all errors that have not been handled
    # are fatal), and in the second case we call #structure_checking
    # again to get the remaining errors
    events_errors    = Propagation.propagate_exceptions(events_errors)
    Propagation.propagate_exceptions(structure_errors)
    add_timepoint(stats, :exception_propagation)

    # Get the remaining problems in the plan structure, and act on it
    fatal_structure_errors = Propagation.remove_inhibited_exceptions(structure_checking)
    fatal_errors = fatal_structure_errors.to_a + events_errors
    kill_tasks = fatal_errors.inject(ValueSet.new) do |kill_tasks, (error, tasks)|
  tasks ||= [*error.task]
  for parent in [*tasks]
      new_tasks = parent.reverse_generated_subgraph(TaskStructure::Hierarchy) - plan.force_gc
      if !new_tasks.empty?
    Control.fatal_exception(error, new_tasks)
      end
      kill_tasks.merge(new_tasks)
  end
  kill_tasks
    end
    add_timepoint(stats, :exceptions_fatal)

    plan.garbage_collect(kill_tasks)
    add_timepoint(stats, :garbage_collect)

    application_errors = Thread.current[:application_exceptions]
    Thread.current[:application_exceptions] = nil
    for error, origin in application_errors
  Propagation.add_framework_error(error, origin)
    end
    add_timepoint(stats, :application_errors)

    if abort_on_exception && !quitting? && !fatal_errors.empty?
  reraise(fatal_errors.map { |e, _| e })
    end

ensure
    Thread.current[:application_exceptions] = nil
end

#quitObject

Make control quit



683
# File 'lib/roby/control.rb', line 683

def quit; @quit += 1 end

#quitting?Boolean

True if the control thread is currently quitting



679
# File 'lib/roby/control.rb', line 679

def quitting?; @quit > 0 end

#reraise(exceptions) ⇒ Object

Abort the control loop because of exceptions



291
292
293
294
295
296
297
298
299
300
301
# File 'lib/roby/control.rb', line 291

def reraise(exceptions)
    if exceptions.size == 1
  e = exceptions.first
  if e.kind_of?(ExecutionException)
      e = e.exception
  end
  raise e, e.message, e.backtrace
    else
  raise Aborting.new(exceptions)
    end
end

#run(options = {}) ⇒ Object

Main event loop. Valid options are

cycle

the cycle duration in seconds (default: 0.1)

drb

address of the DRuby server if one should be started (default: nil)

detach

if true, start in its own thread (default: false)



469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
# File 'lib/roby/control.rb', line 469

def run(options = {})
    if running?
  raise "there is already a control running in thread #{@thread}"
    end

    options = validate_options options, 
  :cycle => 0.1, :detach => false

    @quit = 0
    if !options[:detach]
  @thread = Thread.current
  @thread.priority = THREAD_PRIORITY
    end

    if options[:detach]
  # Start the control thread and wait for @thread to be set
  Roby.condition_variable(true) do |cv, mt|
      mt.synchronize do
    Thread.new do
        run(options.merge(:detach => false)) do
      mt.synchronize { cv.signal }
        end
    end
    cv.wait(mt)
      end
  end
  return
    end

    yield if block_given?

    @cycle_length = options[:cycle]
    event_loop

ensure
    if Thread.current == self.thread
  Roby::Control.synchronize do
      # reset the options only if we are in the control thread
      @thread = nil
      Control.waiting_threads.each do |th|
    th.raise ControlQuitError
      end
      Control.finalizers.each { |blk| blk.call rescue nil }
      @quit = 0
  end
    end
end

#running?Boolean



454
# File 'lib/roby/control.rb', line 454

def running?; !!@thread end

#structure_checkingObject

Perform the structure checking step by calling the procs registered in Control::structure_checks. These procs are supposed to return a collection of exception objects, or nil if no error has been found



271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
# File 'lib/roby/control.rb', line 271

def structure_checking
    # Do structure checking and gather the raised exceptions
    exceptions = {}
    for prc in Control.structure_checks
  begin
      new_exceptions = prc.call(plan)
  rescue Exception => e
      Propagation.add_framework_error(e, 'structure checking')
  end
  next unless new_exceptions

  [*new_exceptions].each do |e, tasks|
      e = Propagation.to_execution_exception(e)
      exceptions[e] = tasks
  end
    end
    exceptions
end

#wait_one_cycleObject

Blocks until at least once execution cycle has been done



366
367
368
369
370
371
372
# File 'lib/roby/control.rb', line 366

def wait_one_cycle
    current_cycle = Roby.execute { Roby.control.cycle_index }
    while current_cycle == Roby.execute { Roby.control.cycle_index }
  raise ControlQuitError if !Roby.control.running?
  sleep(Roby.control.cycle_length)
    end
end