Class: EventMachine::Reactor
- Inherits:
-
Object
- Object
- EventMachine::Reactor
- Includes:
- Singleton
- Defined in:
- lib/pr_eventmachine.rb
Constant Summary collapse
- HeartbeatInterval =
2
Instance Attribute Summary collapse
-
#current_loop_time ⇒ Object
readonly
Returns the value of attribute current_loop_time.
Instance Method Summary collapse
- #add_selectable(io) ⇒ Object
- #close_loopbreaker ⇒ Object
- #crank_selectables ⇒ Object
- #get_selectable(uuid) ⇒ Object
-
#initialize ⇒ Reactor
constructor
A new instance of Reactor.
-
#initialize_for_run ⇒ Object
Called before run, this is a good place to clear out arrays with cruft that may be left over from a previous run.
-
#install_oneshot_timer(interval) ⇒ Object
– Replaced original implementation 05Dec07, was way too slow because of the sort.
- #open_loopbreaker ⇒ Object
- #run ⇒ Object
- #run_heartbeats ⇒ Object
- #run_timers ⇒ Object
- #set_timer_quantum(interval_in_seconds) ⇒ Object
- #signal_loopbreak ⇒ Object
-
#stop ⇒ Object
#stop.
Constructor Details
#initialize ⇒ Reactor
Returns a new instance of Reactor.
277 278 279 |
# File 'lib/pr_eventmachine.rb', line 277 def initialize initialize_for_run end |
Instance Attribute Details
#current_loop_time ⇒ Object (readonly)
Returns the value of attribute current_loop_time.
275 276 277 |
# File 'lib/pr_eventmachine.rb', line 275 def current_loop_time @current_loop_time end |
Instance Method Details
#add_selectable(io) ⇒ Object
303 304 305 |
# File 'lib/pr_eventmachine.rb', line 303 def add_selectable io @selectables[io.uuid] = io end |
#close_loopbreaker ⇒ Object
406 407 408 409 |
# File 'lib/pr_eventmachine.rb', line 406 def close_loopbreaker @loopbreak_writer.close @loopbreak_writer = nil end |
#crank_selectables ⇒ Object
360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 |
# File 'lib/pr_eventmachine.rb', line 360 def crank_selectables #$stderr.write 'R' readers = @selectables.values.select {|io| io.select_for_reading?} writers = @selectables.values.select {|io| io.select_for_writing?} s = select( readers, writers, nil, @timer_quantum) s and s[1] and s[1].each {|w| w.eventable_write } s and s[0] and s[0].each {|r| r.eventable_read } @selectables.delete_if {|k,io| if io.close_scheduled? io.close true end } end |
#get_selectable(uuid) ⇒ Object
307 308 309 |
# File 'lib/pr_eventmachine.rb', line 307 def get_selectable uuid @selectables[uuid] end |
#initialize_for_run ⇒ Object
Called before run, this is a good place to clear out arrays with cruft that may be left over from a previous run.
293 294 295 296 297 298 299 300 301 |
# File 'lib/pr_eventmachine.rb', line 293 def initialize_for_run @running = false @stop_scheduled = false @selectables ||= {}; @selectables.clear @timers = SortedSet.new # [] set_timer_quantum(0.1) @current_loop_time = Time.now @next_heartbeat = @current_loop_time + HeartbeatInterval end |
#install_oneshot_timer(interval) ⇒ Object
– Replaced original implementation 05Dec07, was way too slow because of the sort.
283 284 285 286 287 288 289 |
# File 'lib/pr_eventmachine.rb', line 283 def install_oneshot_timer interval uuid = UuidGenerator::generate #@timers << [Time.now + interval, uuid] #@timers.sort! {|a,b| a.first <=> b.first} @timers.add([Time.now + interval, uuid]) uuid end |
#open_loopbreaker ⇒ Object
385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 |
# File 'lib/pr_eventmachine.rb', line 385 def open_loopbreaker # Can't use an IO.pipe because they can't be set nonselectable in Windows. # Pick a random localhost UDP port. #@loopbreak_writer.close if @loopbreak_writer #rd,@loopbreak_writer = IO.pipe @loopbreak_reader = UDPSocket.new @loopbreak_writer = UDPSocket.new bound = false 100.times { @loopbreak_port = rand(10000) + 40000 begin @loopbreak_reader.bind "localhost", @loopbreak_port bound = true break rescue end } raise "Unable to bind Loopbreaker" unless bound LoopbreakReader.new(@loopbreak_reader) end |
#run ⇒ Object
311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 |
# File 'lib/pr_eventmachine.rb', line 311 def run raise Error.new( "already running" ) if @running @running = true begin open_loopbreaker loop { @current_loop_time = Time.now break if @stop_scheduled run_timers break if @stop_scheduled crank_selectables break if @stop_scheduled run_heartbeats } ensure close_loopbreaker @selectables.each {|k, io| io.close} @selectables.clear @running = false end end |
#run_heartbeats ⇒ Object
353 354 355 356 357 358 |
# File 'lib/pr_eventmachine.rb', line 353 def run_heartbeats if @next_heartbeat <= @current_loop_time @next_heartbeat = @current_loop_time + HeartbeatInterval @selectables.each {|k,io| io.heartbeat} end end |
#run_timers ⇒ Object
338 339 340 341 342 343 344 345 346 347 348 349 350 351 |
# File 'lib/pr_eventmachine.rb', line 338 def run_timers @timers.each {|t| if t.first <= @current_loop_time @timers.delete t EventMachine::event_callback "", TimerFired, t.last else break end } #while @timers.length > 0 and @timers.first.first <= now # t = @timers.shift # EventMachine::event_callback "", TimerFired, t.last #end end |
#set_timer_quantum(interval_in_seconds) ⇒ Object
416 417 418 |
# File 'lib/pr_eventmachine.rb', line 416 def set_timer_quantum interval_in_seconds @timer_quantum = interval_in_seconds end |
#signal_loopbreak ⇒ Object
411 412 413 414 |
# File 'lib/pr_eventmachine.rb', line 411 def signal_loopbreak #@loopbreak_writer.write '+' if @loopbreak_writer @loopbreak_writer.send('+',0,"localhost",@loopbreak_port) if @loopbreak_writer end |
#stop ⇒ Object
#stop
380 381 382 383 |
# File 'lib/pr_eventmachine.rb', line 380 def stop raise Error.new( "not running") unless @running @stop_scheduled = true end |