Class: EventMachine::Reactor
- Inherits:
-
Object
- Object
- EventMachine::Reactor
- Includes:
- Singleton
- Defined in:
- lib/em/pure_ruby.rb
Constant Summary collapse
- HeartbeatInterval =
2
Instance Attribute Summary collapse
-
#current_loop_time ⇒ Object
readonly
Returns the value of attribute current_loop_time.
-
#stop_scheduled ⇒ Object
readonly
Returns the value of attribute stop_scheduled.
Instance Method Summary collapse
- #add_selectable(io) ⇒ Object
- #close_loopbreaker ⇒ Object
- #crank_selectables ⇒ Object
- #get_selectable(uuid) ⇒ Object
-
#initialize ⇒ Reactor
constructor
A new instance of Reactor.
-
#initialize_for_run ⇒ Object
Called before run; this is a good place to clear out arrays with cruft that may be left over from a previous run.
- #install_oneshot_timer(interval) ⇒ Object
- #open_loopbreaker ⇒ Object
- #run ⇒ Object
- #run_heartbeats ⇒ Object
- #run_timers ⇒ Object
- #set_timer_quantum(interval_in_seconds) ⇒ Object
- #signal_loopbreak ⇒ Object
-
#stop ⇒ Object
Sets the stop_scheduled flag so the run loop exits; raises an error if the reactor is not running.
Constructor Details
#initialize ⇒ Reactor
Returns a new instance of Reactor.
507 508 509 |
# File 'lib/em/pure_ruby.rb', line 507

# Builds a new Reactor. All state setup is delegated to #initialize_for_run,
# so a fresh instance starts out in the same state as one prepared for a run.
def initialize
  initialize_for_run
end
Instance Attribute Details
#current_loop_time ⇒ Object (readonly)
Returns the value of attribute current_loop_time.
505 506 507 |
# File 'lib/em/pure_ruby.rb', line 505

# Reader for the @current_loop_time instance variable.
#
# @return [Object] whatever is currently stored in @current_loop_time
def current_loop_time
  @current_loop_time
end
#stop_scheduled ⇒ Object (readonly)
Returns the value of attribute stop_scheduled.
505 506 507 |
# File 'lib/em/pure_ruby.rb', line 505

# Reader for the @stop_scheduled instance variable.
#
# @return [Object] whatever is currently stored in @stop_scheduled
def stop_scheduled
  @stop_scheduled
end
Instance Method Details
#add_selectable(io) ⇒ Object
532 533 534 |
# File 'lib/em/pure_ruby.rb', line 532

# Registers +io+ in the @selectables table under its uuid, replacing any
# entry already stored under that uuid.
#
# @param io [#uuid] object to register
# @return [Object] the +io+ that was passed in
def add_selectable(io)
  @selectables.store(io.uuid, io)
end
#close_loopbreaker ⇒ Object
638 639 640 641 |
# File 'lib/em/pure_ruby.rb', line 638

# Closes the loopbreak writer, if one is open, and forgets it.
#
# Safe to call when no writer exists (for example from the ensure clause of
# #run after #open_loopbreaker failed before creating one, or when called
# twice). In that case nothing happens, so an earlier exception is not masked
# by a NoMethodError on nil.
#
# @return [nil]
def close_loopbreaker
  writer = @loopbreak_writer
  # Forget the writer first so a failing close cannot leave a stale handle.
  @loopbreak_writer = nil
  writer.close if writer
  nil
end
#crank_selectables ⇒ Object
589 590 591 592 593 594 595 596 597 598 599 600 601 602 603 604 605 606 607 608 609 |
# File 'lib/em/pure_ruby.rb', line 589

# Runs one select pass over the registered selectables.
#
# Selectables that ask to be read or written are handed to select with
# @timer_quantum as the timeout. Ready writers receive #eventable_write, then
# ready readers receive #eventable_read. Afterwards every selectable with a
# scheduled close is closed, reported through a ConnectionUnbound event and
# removed from @selectables.
#
# @return [Hash] the @selectables table after the sweep
def crank_selectables
  readers = @selectables.values.select { |io| io.select_for_reading? }
  writers = @selectables.values.select { |io| io.select_for_writing? }

  ready = select(readers, writers, nil, @timer_quantum)
  if ready
    # Writers are serviced before readers.
    writable = ready[1]
    writable.each { |w| w.eventable_write } if writable
    readable = ready[0]
    readable.each { |r| r.eventable_read } if readable
  end

  @selectables.delete_if do |_uuid, io|
    next false unless io.close_scheduled?

    io.close
    begin
      EventMachine::event_callback io.uuid, ConnectionUnbound, nil
    rescue ConnectionNotBound
      # Deliberately ignored (presumably nothing is bound to this uuid).
    end
    true
  end
end
#get_selectable(uuid) ⇒ Object
536 537 538 |
# File 'lib/em/pure_ruby.rb', line 536

# Looks up a registered selectable by uuid.
#
# @param uuid [Object] key the selectable was stored under
# @return [Object, nil] the entry from @selectables, or nil when none is stored
def get_selectable(uuid)
  @selectables[uuid]
end
#initialize_for_run ⇒ Object
Called before run; this is a good place to clear out arrays with cruft that may be left over from a previous run.
522 523 524 525 526 527 528 529 530 |
# File 'lib/em/pure_ruby.rb', line 522

# Resets the reactor's per-run state. Called before run, so anything left
# over from a previous run is cleared out here.
#
# Resets the running/stop flags, empties the selectables table, replaces the
# timer set, restores the timer quantum to 0.1 and schedules the next
# heartbeat HeartbeatInterval after the current time.
def initialize_for_run
  @running = false
  @stop_scheduled = false

  # Keep the existing table object when there is one; only its contents go.
  if @selectables
    @selectables.clear
  else
    @selectables = {}
  end

  @timers = SortedSet.new
  set_timer_quantum(0.1)

  now = Time.now
  @current_loop_time = now
  @next_heartbeat = now + HeartbeatInterval
end
#install_oneshot_timer(interval) ⇒ Object
511 512 513 514 515 516 517 |
# File 'lib/em/pure_ruby.rb', line 511

# Schedules a one-shot timer +interval+ after the current time.
#
# A [fire_time, uuid] pair is added to @timers; the uuid comes from
# UuidGenerator and is handed back to the caller.
#
# @param interval [Numeric] offset added to Time.now to get the fire time
# @return [Object] the uuid generated for this timer
def install_oneshot_timer(interval)
  timer_id = UuidGenerator::generate
  fire_at = Time.now + interval
  @timers.add([fire_at, timer_id])
  timer_id
end
#open_loopbreaker ⇒ Object
617 618 619 620 621 622 623 624 625 626 627 628 629 630 631 632 633 634 635 636 |
# File 'lib/em/pure_ruby.rb', line 617

# Creates the UDP socket pair used to break out of select.
#
# The reader is bound to a randomly chosen localhost port in 40000..49999
# (up to 100 attempts) and wrapped in a LoopbreakReader. The chosen port is
# kept in @loopbreak_port, which #signal_loopbreak sends to.
#
# @return [LoopbreakReader] wrapper created around the bound reader socket
# @raise [RuntimeError] "Unable to bind Loopbreaker" when no port could be bound
def open_loopbreaker
  # Per the original author's note, IO.pipe is not used because pipes can't
  # be set nonselectable in Windows; a random localhost UDP port is used
  # instead.
  @loopbreak_reader = UDPSocket.new
  @loopbreak_writer = UDPSocket.new

  bound = false
  100.times do
    @loopbreak_port = rand(10000) + 40000
    begin
      @loopbreak_reader.bind "127.0.0.1", @loopbreak_port
      bound = true
      break
    rescue StandardError
      # Bind failed (e.g. port already taken); try another random port.
    end
  end

  unless bound
    # The reader has not been wrapped or registered anywhere yet, so nothing
    # in the visible code would close it. Release it here rather than leak
    # the descriptor. The writer is left for #close_loopbreaker.
    @loopbreak_reader.close
    @loopbreak_reader = nil
    raise "Unable to bind Loopbreaker"
  end

  LoopbreakReader.new(@loopbreak_reader)
end
#run ⇒ Object
540 541 542 543 544 545 546 547 548 549 550 551 552 553 554 555 556 557 558 559 560 561 562 563 564 565 |
# File 'lib/em/pure_ruby.rb', line 540

# Runs the reactor loop until a stop has been scheduled.
#
# Each pass refreshes @current_loop_time, then runs timers, cranks the
# selectables and runs heartbeats, checking @stop_scheduled before every
# step. On the way out (normally or via an exception) the loopbreaker is
# closed, every selectable is closed and dropped, and the running flag is
# cleared.
#
# @raise [Error] "already running" when the reactor is already running
def run
  raise Error, "already running" if @running

  @running = true
  begin
    open_loopbreaker

    loop do
      @current_loop_time = Time.now

      break if @stop_scheduled
      run_timers

      break if @stop_scheduled
      crank_selectables

      break if @stop_scheduled
      run_heartbeats
    end
  ensure
    close_loopbreaker
    @selectables.each_value { |io| io.close }
    @selectables.clear
    @running = false
  end
end
#run_heartbeats ⇒ Object
582 583 584 585 586 587 |
# File 'lib/em/pure_ruby.rb', line 582

# Sends a heartbeat to every selectable once the heartbeat deadline is reached.
#
# Nothing happens while @next_heartbeat is still later than
# @current_loop_time. Otherwise the next deadline is moved HeartbeatInterval
# past the current loop time and each selectable receives #heartbeat.
#
# @return [Hash, nil] @selectables when heartbeats were sent, nil otherwise
def run_heartbeats
  return unless @next_heartbeat <= @current_loop_time

  @next_heartbeat = @current_loop_time + HeartbeatInterval
  @selectables.each_value { |io| io.heartbeat }
end
#run_timers ⇒ Object
567 568 569 570 571 572 573 574 575 576 577 578 579 580 |
# File 'lib/em/pure_ruby.rb', line 567

# Fires every timer whose fire time is at or before @current_loop_time.
#
# @timers holds [fire_time, uuid] pairs in ascending order. The due entries
# are collected first and only then removed and fired, so @timers is never
# modified while it is being iterated, either by the delete here or by a
# callback that installs new timers. Timers added by a callback are left for
# a later pass.
#
# @return [Array] the [fire_time, uuid] pairs that were fired
def run_timers
  due = @timers.take_while { |timer| timer.first <= @current_loop_time }
  due.each do |timer|
    # Remove right before firing so a raising callback leaves later timers queued.
    @timers.delete timer
    EventMachine::event_callback "", TimerFired, timer.last
  end
end
#set_timer_quantum(interval_in_seconds) ⇒ Object
649 650 651 |
# File 'lib/em/pure_ruby.rb', line 649

# Stores the timer quantum, which #crank_selectables passes to select as its
# timeout.
#
# @param interval_in_seconds [Numeric] new quantum
# @return [Numeric] the value that was stored
def set_timer_quantum(interval_in_seconds)
  @timer_quantum = interval_in_seconds
end
#signal_loopbreak ⇒ Object
643 644 645 646 647 |
# File 'lib/em/pure_ruby.rb', line 643

# Sends a single '+' datagram to the loopbreak port on 127.0.0.1.
#
# Does nothing when no loopbreak writer is open. An IOError raised by the
# send is swallowed.
#
# @return [Object, nil] the result of the send, or nil when nothing was sent
#   or an IOError occurred
def signal_loopbreak
  return unless @loopbreak_writer

  @loopbreak_writer.send('+', 0, "127.0.0.1", @loopbreak_port)
rescue IOError
  nil
end
#stop ⇒ Object
Sets the stop_scheduled flag so the run loop exits; raises an error if the reactor is not running.
612 613 614 615 |
# File 'lib/em/pure_ruby.rb', line 612

# Schedules the reactor to stop by setting @stop_scheduled, which the run
# loop checks between steps.
#
# @return [true]
# @raise [Error] "not running" when the reactor is not running
def stop
  unless @running
    raise Error, "not running"
  end

  @stop_scheduled = true
end