Class: EventMachine::Reactor
- Inherits:
-
Object
- Object
- EventMachine::Reactor
- Includes:
- Singleton
- Defined in:
- lib/em/pure_ruby.rb
Constant Summary collapse
- HeartbeatInterval =
2
Instance Attribute Summary collapse
-
#current_loop_time ⇒ Object
readonly
Returns the value of attribute current_loop_time.
-
#stop_scheduled ⇒ Object
readonly
Returns the value of attribute stop_scheduled.
Instance Method Summary collapse
- #add_selectable(io) ⇒ Object
- #close_loopbreaker ⇒ Object
- #crank_selectables ⇒ Object
- #get_selectable(uuid) ⇒ Object
- #get_timer_count ⇒ Object
-
#initialize ⇒ Reactor
constructor
A new instance of Reactor.
-
#initialize_for_run ⇒ Object
Called before run; this is a good place to clear out arrays with cruft that may be left over from a previous run.
- #install_oneshot_timer(interval) ⇒ Object
- #open_loopbreaker ⇒ Object
- #run ⇒ Object
- #run_heartbeats ⇒ Object
- #run_timers ⇒ Object
- #set_timer_quantum(interval_in_seconds) ⇒ Object
- #signal_loopbreak ⇒ Object
-
#stop ⇒ Object
#stop.
Constructor Details
#initialize ⇒ Reactor
Returns a new instance of Reactor.
511 512 513 |
# File 'lib/em/pure_ruby.rb', line 511

# Constructs the reactor. All state setup is delegated to
# #initialize_for_run, so a new instance starts in the same state that
# method establishes.
def initialize
  initialize_for_run
end
Instance Attribute Details
#current_loop_time ⇒ Object (readonly)
Returns the value of attribute current_loop_time.
509 510 511 |
# File 'lib/em/pure_ruby.rb', line 509

# Read-only accessor.
#
# @return [Object] whatever is currently stored in @current_loop_time
def current_loop_time
  @current_loop_time
end
#stop_scheduled ⇒ Object (readonly)
Returns the value of attribute stop_scheduled.
509 510 511 |
# File 'lib/em/pure_ruby.rb', line 509

# Read-only accessor.
#
# @return [Object] whatever is currently stored in @stop_scheduled
def stop_scheduled
  @stop_scheduled
end
Instance Method Details
#add_selectable(io) ⇒ Object
540 541 542 |
# File 'lib/em/pure_ruby.rb', line 540

# Registers +io+ in the selectables table, keyed by +io.uuid+.
# An existing entry under the same uuid is overwritten.
#
# @param io [#uuid] object to register
# @return [Object] the +io+ that was passed in
def add_selectable(io)
  @selectables.store(io.uuid, io)
end
#close_loopbreaker ⇒ Object
656 657 658 659 |
# File 'lib/em/pure_ruby.rb', line 656

# Closes the loopbreak writer, if one is open, and forgets it.
#
# Safe to call when no writer exists (for example when called twice, or
# from a cleanup path after the writer was never created); in that case
# it does nothing. The reference is cleared before closing so a failing
# +close+ cannot leave a stale writer behind.
#
# @return [nil]
def close_loopbreaker
  writer = @loopbreak_writer
  @loopbreak_writer = nil
  writer.close if writer
  nil
end
#crank_selectables ⇒ Object
607 608 609 610 611 612 613 614 615 616 617 618 619 620 621 622 623 624 625 626 627 |
# File 'lib/em/pure_ruby.rb', line 607

# Runs one I/O pass over the registered selectables.
#
# 1. Collects the selectables that ask to be polled for reading and for
#    writing.
# 2. Waits in +select+ for at most @timer_quantum seconds.
# 3. Dispatches +eventable_write+ to every ready writer, then
#    +eventable_read+ to every ready reader (writers go first).
# 4. Removes every selectable whose +close_scheduled?+ is true: it is
#    closed, a ConnectionUnbound event is emitted for its uuid, and a
#    ConnectionNotBound error raised by that callback is ignored.
#
# @return [Hash] the selectables table (result of +delete_if+)
def crank_selectables
  candidates = @selectables.values
  readers = candidates.select { |io| io.select_for_reading? }
  writers = candidates.select { |io| io.select_for_writing? }

  ready = select(readers, writers, nil, @timer_quantum)
  if ready
    readable, writable = ready
    writable.each { |w| w.eventable_write } if writable
    readable.each { |r| r.eventable_read } if readable
  end

  @selectables.delete_if do |_uuid, io|
    next false unless io.close_scheduled?

    io.close
    begin
      EventMachine::event_callback io.uuid, ConnectionUnbound, nil
    rescue ConnectionNotBound
      # Nothing was bound to this connection; nothing to notify.
    end
    true
  end
end
#get_selectable(uuid) ⇒ Object
544 545 546 547 548 |
# File 'lib/em/pure_ruby.rb', line 544

# Looks up a registered selectable by uuid.
#
# @param uuid [Object] key the selectable was registered under
# @return [Object, nil] the stored selectable, or the table's default
#   (nil for a plain Hash) when the uuid is unknown; no error is raised
def get_selectable(uuid)
  @selectables[uuid]
end
#get_timer_count ⇒ Object
515 516 517 |
# File 'lib/em/pure_ruby.rb', line 515

# Reports how many timers are currently pending.
#
# @return [Integer] number of entries in @timers
def get_timer_count
  @timers.length
end
#initialize_for_run ⇒ Object
Called before run; this is a good place to clear out arrays with cruft that may be left over from a previous run.
530 531 532 533 534 535 536 537 538 |
# File 'lib/em/pure_ruby.rb', line 530

# Resets the reactor to a clean pre-run state:
# - clears the running and stop-scheduled flags,
# - empties the selectables table (reusing the existing Hash if any),
# - replaces the timer collection with a fresh SortedSet,
# - sets the timer quantum to 0.1,
# - stamps the current loop time and schedules the next heartbeat
#   HeartbeatInterval after it.
#
# NOTE(review): SortedSet is no longer bundled with newer Rubies —
# confirm it is available in the targeted runtime.
#
# @return [Object] the newly computed @next_heartbeat
def initialize_for_run
  @running = @stop_scheduled = false

  if @selectables
    @selectables.clear
  else
    @selectables = {}
  end

  @timers = SortedSet.new
  set_timer_quantum(0.1)

  now = Time.now
  @current_loop_time = now
  @next_heartbeat = now + HeartbeatInterval
end
#install_oneshot_timer(interval) ⇒ Object
519 520 521 522 523 524 525 |
# File 'lib/em/pure_ruby.rb', line 519

# Schedules a one-shot timer.
#
# A fresh uuid is generated and the pair [Time.now + interval, uuid] is
# added to @timers.
#
# @param interval [Numeric] offset added to Time.now to get the fire time
# @return [Object] the uuid identifying the new timer
def install_oneshot_timer(interval)
  timer_id = UuidGenerator.generate
  fire_at = Time.now + interval
  @timers.add([fire_at, timer_id])
  timer_id
end
#open_loopbreaker ⇒ Object
635 636 637 638 639 640 641 642 643 644 645 646 647 648 649 650 651 652 653 654 |
# File 'lib/em/pure_ruby.rb', line 635

# Creates the loopbreak socket pair and wraps the reader.
#
# Can't use an IO.pipe because they can't be set nonselectable in
# Windows, so a pair of UDP sockets is used instead. The reader is bound
# to a randomly chosen localhost port in 40000..49999; up to 100 ports
# are tried, and the chosen one is remembered in @loopbreak_port.
#
# If no port can be bound, the reader socket is closed before raising:
# it was never handed to LoopbreakReader, so nothing else would close
# it. The writer is left in place for #close_loopbreaker.
#
# @return [LoopbreakReader] wrapper around the bound reader socket
# @raise [RuntimeError] "Unable to bind Loopbreaker" after 100 failures
def open_loopbreaker
  @loopbreak_reader = UDPSocket.new
  @loopbreak_writer = UDPSocket.new

  bound = false
  100.times do
    @loopbreak_port = rand(10000) + 40000
    begin
      @loopbreak_reader.bind("127.0.0.1", @loopbreak_port)
      bound = true
      break
    rescue StandardError
      # Port unavailable (or bind otherwise failed); try another port.
    end
  end

  unless bound
    @loopbreak_reader.close
    @loopbreak_reader = nil
    raise "Unable to bind Loopbreaker"
  end

  LoopbreakReader.new(@loopbreak_reader)
end
#run ⇒ Object
550 551 552 553 554 555 556 557 558 559 560 561 562 563 564 565 566 567 568 569 570 571 572 573 574 575 576 |
# File 'lib/em/pure_ruby.rb', line 550

# Runs the reactor loop until a stop has been scheduled.
#
# Each iteration stamps @current_loop_time, then runs timers, cranks the
# selectables and runs heartbeats, checking @stop_scheduled before each
# stage so a stop takes effect as early as possible.
#
# On exit (normal or via exception) the loopbreaker is closed, every
# remaining selectable is closed and dropped, and the running flag is
# cleared.
#
# NOTE(review): a banner is printed to stdout on every run — confirm
# this is intended rather than leftover debugging output.
#
# @raise [Error] "already running" if the reactor is already running
def run
  raise Error.new( "already running" ) if @running

  puts "-------- RUNNING ------------ "
  @running = true

  begin
    open_loopbreaker

    loop do
      @current_loop_time = Time.now
      break if @stop_scheduled

      run_timers
      break if @stop_scheduled

      crank_selectables
      break if @stop_scheduled

      run_heartbeats
    end
  ensure
    close_loopbreaker
    @selectables.each_value { |io| io.close }
    @selectables.clear
    @running = false
  end
end
#run_heartbeats ⇒ Object
600 601 602 603 604 605 |
# File 'lib/em/pure_ruby.rb', line 600

# Sends a heartbeat to every selectable once the heartbeat deadline has
# been reached, then pushes the deadline HeartbeatInterval past the
# current loop time. Does nothing while the deadline is in the future.
#
# @return [Hash, nil] the selectables table when heartbeats ran, else nil
def run_heartbeats
  return unless @next_heartbeat <= @current_loop_time

  @next_heartbeat = @current_loop_time + HeartbeatInterval
  @selectables.each_value { |io| io.heartbeat }
end
#run_timers ⇒ Object
578 579 580 581 582 583 584 585 586 587 588 589 590 591 592 593 594 595 596 597 598 |
# File 'lib/em/pure_ruby.rb', line 578

# Fires every timer whose fire time is at or before @current_loop_time.
#
# Timers are walked in the collection's iteration order and the walk
# stops at the first one that is not yet due. The due timers are
# gathered first; then, one by one, a TimerFired event carrying the
# timer's uuid is emitted and the timer is removed from @timers.
#
# @return [nil]
def run_timers
  due = @timers.take_while { |entry| entry.first <= @current_loop_time }

  due.each do |entry|
    EventMachine::event_callback "", TimerFired, entry.last
    @timers.delete entry
  end

  nil
end
#set_timer_quantum(interval_in_seconds) ⇒ Object
667 668 669 |
# File 'lib/em/pure_ruby.rb', line 667

# Stores the timer quantum, which #crank_selectables passes to +select+
# as its timeout.
#
# @param interval_in_seconds [Numeric] new quantum, in seconds
# @return [Numeric] the value that was stored
def set_timer_quantum(interval_in_seconds)
  @timer_quantum = interval_in_seconds
end
#signal_loopbreak ⇒ Object
661 662 663 664 665 |
# File 'lib/em/pure_ruby.rb', line 661

# Sends a one-byte datagram ('+') through the loopbreak writer to
# 127.0.0.1 on @loopbreak_port. Does nothing when no writer is open,
# and an IOError raised by the send is swallowed.
#
# @return [Object, nil] the result of the send, or nil when nothing was
#   sent or the send raised IOError
def signal_loopbreak
  return unless @loopbreak_writer

  @loopbreak_writer.send('+', 0, "127.0.0.1", @loopbreak_port)
rescue IOError
  nil
end
#stop ⇒ Object
#stop
630 631 632 633 |
# File 'lib/em/pure_ruby.rb', line 630

# Requests that the reactor loop exit. Only the stop flag is set here;
# #run checks it between stages and breaks out of its loop.
#
# @return [true]
# @raise [Error] "not running" if the reactor is not running
def stop
  unless @running
    raise Error.new( "not running")
  end

  @stop_scheduled = true
end