Class: EventMachine::Reactor
- Inherits:
-
Object
- Object
- EventMachine::Reactor
- Includes:
- Singleton
- Defined in:
- lib/pr_eventmachine.rb
Instance Method Summary collapse
- #add_selectable(io) ⇒ Object
- #close_loopbreaker ⇒ Object
- #crank_selectables ⇒ Object
- #get_selectable(uuid) ⇒ Object
-
#initialize ⇒ Reactor
constructor
A new instance of Reactor.
-
#initialize_for_run ⇒ Object
Called before run, this is a good place to clear out arrays with cruft that may be left over from a previous run.
- #install_oneshot_timer(interval) ⇒ Object
- #open_loopbreaker ⇒ Object
- #run ⇒ Object
- #run_timers ⇒ Object
- #set_timer_quantum(interval_in_seconds) ⇒ Object
- #signal_loopbreak ⇒ Object
-
#stop ⇒ Object
#stop.
Constructor Details
#initialize ⇒ Reactor
Returns a new instance of Reactor.
188 189 190 |
# File 'lib/pr_eventmachine.rb', line 188 def initialize initialize_for_run end |
Instance Method Details
#add_selectable(io) ⇒ Object
209 210 211 |
# File 'lib/pr_eventmachine.rb', line 209 def add_selectable io @selectables[io.uuid] = io end |
#close_loopbreaker ⇒ Object
275 276 277 278 |
# File 'lib/pr_eventmachine.rb', line 275 def close_loopbreaker @loopbreak_writer.close @loopbreak_writer = nil end |
#crank_selectables ⇒ Object
244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 |
# File 'lib/pr_eventmachine.rb', line 244 def crank_selectables #$stderr.write 'R' readers = @selectables.values.select {|io| io.select_for_reading?} writers = @selectables.values.select {|io| io.select_for_writing?} s = select( readers, writers, nil, @timer_quantum) s and s[1] and s[1].each {|w| w.eventable_write } s and s[0] and s[0].each {|r| r.eventable_read } @selectables.delete_if {|k,io| if io.close_scheduled? io.close true end } end |
#get_selectable(uuid) ⇒ Object
213 214 215 |
# File 'lib/pr_eventmachine.rb', line 213 def get_selectable uuid @selectables[uuid] end |
#initialize_for_run ⇒ Object
Called before run, this is a good place to clear out arrays with cruft that may be left over from a previous run.
201 202 203 204 205 206 207 |
# File 'lib/pr_eventmachine.rb', line 201 def initialize_for_run @running = false @stop_scheduled = false @selectables ||= {}; @selectables.clear @timers = [] set_timer_quantum(0.5) end |
#install_oneshot_timer(interval) ⇒ Object
192 193 194 195 196 197 |
# File 'lib/pr_eventmachine.rb', line 192 def install_oneshot_timer interval uuid = UuidGenerator::generate @timers << [Time.now + interval, uuid] @timers.sort! {|a,b| a.first <=> b.first} uuid end |
#open_loopbreaker ⇒ Object
269 270 271 272 273 |
# File 'lib/pr_eventmachine.rb', line 269 def open_loopbreaker @loopbreak_writer.close if @loopbreak_writer rd,@loopbreak_writer = IO.pipe LoopbreakReader.new rd end |
#run ⇒ Object
217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 |
# File 'lib/pr_eventmachine.rb', line 217 def run raise Error.new( "already running" ) if @running @running = true open_loopbreaker loop { break if @stop_scheduled run_timers break if @stop_scheduled crank_selectables } close_loopbreaker @selectables.each {|k, io| io.close} @selectables.clear @running = false end |
#run_timers ⇒ Object
236 237 238 239 240 241 242 |
# File 'lib/pr_eventmachine.rb', line 236 def run_timers now = Time.now while @timers.length > 0 and @timers.first.first <= now t = @timers.shift EventMachine::event_callback "", TimerFired, t.last end end |
#set_timer_quantum(interval_in_seconds) ⇒ Object
284 285 286 |
# File 'lib/pr_eventmachine.rb', line 284 def set_timer_quantum interval_in_seconds @timer_quantum = interval_in_seconds end |
#signal_loopbreak ⇒ Object
280 281 282 |
# File 'lib/pr_eventmachine.rb', line 280 def signal_loopbreak @loopbreak_writer.write '+' if @loopbreak_writer end |
#stop ⇒ Object
#stop
264 265 266 267 |
# File 'lib/pr_eventmachine.rb', line 264 def stop raise Error.new( "not running") unless @running @stop_scheduled = true end |