Class: EventMachine::Reactor

Inherits:
Object
  • Object
show all
Includes:
Singleton
Defined in:
lib/em/pure_ruby.rb

Constant Summary collapse

HeartbeatInterval =
2

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize ⇒ Reactor

Returns a new instance of Reactor.



511
512
513
# File 'lib/em/pure_ruby.rb', line 511

# Sets up a new reactor by delegating all state initialisation to
# initialize_for_run.
def initialize
  initialize_for_run
end

Instance Attribute Details

#current_loop_time ⇒ Object (readonly)

Returns the value of attribute current_loop_time.



509
510
511
# File 'lib/em/pure_ruby.rb', line 509

# Reader for @current_loop_time, which initialize_for_run seeds with Time.now
# and #run refreshes with Time.now at the top of every loop iteration.
def current_loop_time
  @current_loop_time
end

#stop_scheduled ⇒ Object (readonly)

Returns the value of attribute stop_scheduled.



509
510
511
# File 'lib/em/pure_ruby.rb', line 509

# Reader for @stop_scheduled: false after initialize_for_run, true once #stop
# has been called. #run checks this flag to decide when to leave its loop.
def stop_scheduled
  @stop_scheduled
end

Instance Method Details

#add_selectable(io) ⇒ Object



540
541
542
# File 'lib/em/pure_ruby.rb', line 540

# Registers a selectable with the reactor, indexed by its uuid. An entry
# already stored under the same uuid is replaced.
#
# @param io [Object] anything responding to #uuid
# @return [Object] the io that was passed in
def add_selectable(io)
  @selectables.store(io.uuid, io)
end

#close_loopbreaker ⇒ Object



656
657
658
659
# File 'lib/em/pure_ruby.rb', line 656

# Closes the loopbreak writer socket and forgets it.
#
# Does nothing when no writer is currently open. This matters because #run
# calls this method from its +ensure+ clause, which is also reached when
# open_loopbreaker failed before a writer existed; without the guard the
# resulting NoMethodError on nil would replace the original exception.
# The guard mirrors the one in signal_loopbreak.
#
# @return [nil]
def close_loopbreaker
  @loopbreak_writer.close if @loopbreak_writer
  @loopbreak_writer = nil
end

#crank_selectables ⇒ Object



607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
# File 'lib/em/pure_ruby.rb', line 607

# Runs one I/O pass over the registered selectables.
#
# 1. Collects the selectables that want to be read from / written to and
#    waits on them with select for at most @timer_quantum seconds.
# 2. Dispatches eventable_write to every ready writer, then eventable_read
#    to every ready reader (writes first, as before).
# 3. Closes every selectable whose close_scheduled? is true, reports
#    ConnectionUnbound for it through EventMachine::event_callback and
#    removes it from @selectables.
#
# Step 3 works on a snapshot of the doomed entries instead of firing the
# callback from inside Hash#delete_if: a callback that registers a new
# selectable (for instance, reconnecting on unbind) would otherwise add a key
# to the hash while it is being iterated, which Ruby rejects with
# "can't add a new key into hash during iteration". A selectable that only
# becomes close-scheduled during one of these callbacks is picked up on the
# next pass.
#
# @return [Hash] @selectables, after removal of the closed entries
def crank_selectables
  #$stderr.write 'R'

  readers = @selectables.values.select {|io| io.select_for_reading? }
  writers = @selectables.values.select {|io| io.select_for_writing? }

  # select returns nil on timeout, otherwise [readable, writable, errored].
  ready = select(readers, writers, nil, @timer_quantum)
  if ready
    ready[1].each {|w| w.eventable_write } if ready[1]
    ready[0].each {|r| r.eventable_read } if ready[0]
  end

  doomed = @selectables.select {|_uuid, io| io.close_scheduled? }
  doomed.each {|uuid, io|
    io.close
    begin
      EventMachine::event_callback io.uuid, ConnectionUnbound, nil
    rescue ConnectionNotBound
      # Nothing is bound to this uuid, so there is nobody to notify.
    end
    @selectables.delete uuid
  }

  @selectables
end

#get_selectable(uuid) ⇒ Object



544
545
546
547
548
# File 'lib/em/pure_ruby.rb', line 544

# Looks up a registered selectable by uuid.
#
# An unknown uuid is not treated as an error; the lookup simply yields nil.
#
# @param uuid [Object] key the selectable was registered under
# @return [Object, nil] the selectable, or nil when none is registered
def get_selectable(uuid)
  @selectables[uuid]
end

#get_timer_count ⇒ Object



515
516
517
# File 'lib/em/pure_ruby.rb', line 515

# Number of timers currently held in @timers, i.e. installed and not yet
# removed by run_timers.
#
# @return [Integer]
def get_timer_count
  @timers.length
end

#initialize_for_run ⇒ Object

Called before run. This is a good place to clear out arrays holding cruft that may be left over from a previous run.



530
531
532
533
534
535
536
537
538
# File 'lib/em/pure_ruby.rb', line 530

# Resets the reactor to a pristine, not-running state.
#
# Clears both flags, empties the selectable table (reusing the existing hash
# when there is one), starts with a fresh timer set, restores the timer
# quantum to 0.1 and schedules the first heartbeat HeartbeatInterval after
# the current time.
#
# @return [Time] the time of the next heartbeat
def initialize_for_run
  @running = @stop_scheduled = false
  (@selectables ||= {}).clear
  @timers = SortedSet.new
  set_timer_quantum(0.1)
  now = Time.now
  @current_loop_time = now
  @next_heartbeat = now + HeartbeatInterval
end

#install_oneshot_timer(interval) ⇒ Object



519
520
521
522
523
524
525
# File 'lib/em/pure_ruby.rb', line 519

# Schedules a one-shot timer.
#
# A fresh uuid is generated and stored in @timers together with the time at
# which the timer becomes due, as a [due_time, uuid] pair. run_timers later
# reports that uuid when the timer fires.
#
# @param interval [Numeric] delay from now, added to Time.now
# @return [Object] the uuid identifying the new timer
def install_oneshot_timer(interval)
  timer_id = UuidGenerator::generate
  due_at = Time.now + interval
  @timers.add([due_at, timer_id])
  timer_id
end

#open_loopbreaker ⇒ Object



635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
# File 'lib/em/pure_ruby.rb', line 635

# Creates the pair of UDP sockets used to interrupt the reactor's select.
#
# The reader is bound to a random localhost port in 40000..49999 (up to 100
# attempts); the writer is later pointed at that port by signal_loopbreak.
# On success the reader is wrapped in a LoopbreakReader, which is returned.
#
# If no port can be bound, the reader socket is closed before raising so it
# is not leaked. The writer is deliberately left alone: it stays in
# @loopbreak_writer for close_loopbreaker to dispose of.
#
# @return [LoopbreakReader] wrapper around the bound reader socket
# @raise [RuntimeError] "Unable to bind Loopbreaker" after 100 failed binds
def open_loopbreaker
  # Can't use an IO.pipe because they can't be set nonselectable in Windows.
  # Pick a random localhost UDP port.
  #@loopbreak_writer.close if @loopbreak_writer
  #rd,@loopbreak_writer = IO.pipe
  @loopbreak_reader = UDPSocket.new
  @loopbreak_writer = UDPSocket.new
  bound = false
  100.times {
    @loopbreak_port = rand(10000) + 40000
    begin
      @loopbreak_reader.bind "127.0.0.1", @loopbreak_port
      bound = true
      break
    rescue StandardError
      # Best effort: presumably the port is taken; try another one.
    end
  }
  unless bound
    @loopbreak_reader.close
    raise "Unable to bind Loopbreaker"
  end
  LoopbreakReader.new(@loopbreak_reader)
end

#run ⇒ Object

Raises:
  • (Error) — if the reactor is already running



550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
# File 'lib/em/pure_ruby.rb', line 550

# Runs the reactor loop until a stop has been scheduled.
#
# Each iteration refreshes @current_loop_time, then runs due timers, cranks
# the selectables and runs heartbeats, checking @stop_scheduled before each
# step so a stop requested by one step prevents the following ones.
#
# Shutdown always closes the loopbreaker, closes and forgets every
# selectable, and clears @running. The cleanup steps are nested in +ensure+
# blocks so that a failure in an earlier step (for example
# close_loopbreaker raising because open_loopbreaker never got as far as
# creating the writer) can no longer leave @running stuck at true or
# selectables open and registered.
#
# @raise [Error] "already running" if the reactor is already running
def run
  raise Error.new( "already running" ) if @running
  # NOTE(review): this banner looks like leftover debug output; it is kept
  # so the method's visible behaviour is unchanged.
  puts "-------- RUNNING ------------ "
  @running = true

  begin
    open_loopbreaker

    loop {
      @current_loop_time = Time.now

      break if @stop_scheduled
      run_timers
      break if @stop_scheduled
      crank_selectables
      break if @stop_scheduled
      run_heartbeats
    }
  ensure
    begin
      close_loopbreaker
    ensure
      begin
        @selectables.each {|k, io| io.close}
      ensure
        @selectables.clear
        @running = false
      end
    end
  end

end

#run_heartbeats ⇒ Object



600
601
602
603
604
605
# File 'lib/em/pure_ruby.rb', line 600

# Sends a heartbeat to every selectable once the heartbeat deadline has been
# reached, and pushes the deadline HeartbeatInterval past the current loop
# time. Does nothing while the deadline is still in the future.
#
# @return [Hash, nil] @selectables when heartbeats were sent, nil otherwise
def run_heartbeats
  return if @next_heartbeat > @current_loop_time

  @next_heartbeat = @current_loop_time + HeartbeatInterval
  @selectables.each_value {|io| io.heartbeat }
end

#run_timers ⇒ Object



578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
# File 'lib/em/pure_ruby.rb', line 578

# Fires every timer that is due at @current_loop_time.
#
# @timers is walked from its first element and only the leading run of due
# entries is taken, so it relies on @timers yielding entries in due-time
# order. The due entries are collected before any callback runs; each one is
# then reported via EventMachine::event_callback as TimerFired with the
# timer's uuid and removed from @timers afterwards.
#
# @return [nil]
def run_timers
  due = @timers.take_while {|timer| timer.first <= @current_loop_time }
  due.each {|timer|
    EventMachine::event_callback "", TimerFired, timer.last
    @timers.delete timer
  }
  nil
end

#set_timer_quantum(interval_in_seconds) ⇒ Object



667
668
669
# File 'lib/em/pure_ruby.rb', line 667

# Stores the timer quantum, which crank_selectables passes to select as its
# timeout.
#
# @param interval_in_seconds [Numeric] the new quantum
# @return [Numeric] the value that was stored
def set_timer_quantum(interval_in_seconds)
  @timer_quantum = interval_in_seconds
end

#signal_loopbreak ⇒ Object



661
662
663
664
665
# File 'lib/em/pure_ruby.rb', line 661

# Sends a one-byte datagram ('+') to the loopbreak port on 127.0.0.1.
#
# Does nothing when no loopbreak writer is open, and silently ignores an
# IOError raised by the send.
#
# @return [Object, nil] whatever the writer's send returns, or nil when
#   nothing was sent
def signal_loopbreak
  return unless @loopbreak_writer

  @loopbreak_writer.send('+', 0, "127.0.0.1", @loopbreak_port)
rescue IOError
  # Best effort: presumably the writer was closed in the meantime.
end

#stop ⇒ Object

#stop

Raises:
  • (Error) — if the reactor is not running



630
631
632
633
# File 'lib/em/pure_ruby.rb', line 630

# Requests that the running reactor stop. Only the @stop_scheduled flag is
# set here; #run notices it at its next check and leaves the loop.
#
# @return [true]
# @raise [Error] "not running" if the reactor is not running
def stop
  raise Error.new( "not running") if !@running

  @stop_scheduled = true
end