Class: EventMachine::Reactor

Inherits:
Object
  • Object
show all
Includes:
Singleton
Defined in:
lib/pr_eventmachine.rb

Instance Method Summary collapse

Constructor Details

#initialize ⇒ Reactor

Returns a new instance of Reactor.



188
189
190
# File 'lib/pr_eventmachine.rb', line 188

# Constructs the reactor. All state setup is delegated to
# #initialize_for_run, so a new instance starts from the same state that
# method establishes.
def initialize
  # No setup of its own: everything lives in the shared reset routine.
  initialize_for_run
end

Instance Method Details

#add_selectable(io) ⇒ Object



209
210
211
# File 'lib/pr_eventmachine.rb', line 209

# Registers +io+ in the reactor's table of selectables, keyed by +io.uuid+.
# An entry already stored under the same uuid is replaced.
#
# @param io an object responding to +uuid+
# @return the +io+ that was stored
def add_selectable(io)
  @selectables.store(io.uuid, io)
end

#close_loopbreaker ⇒ Object



275
276
277
278
# File 'lib/pr_eventmachine.rb', line 275

# Closes the write end of the loopbreak pipe and forgets it.
#
# Safe to call when no loopbreaker is open: the nil check mirrors the
# guards in #open_loopbreaker and #signal_loopbreak, so calling this twice
# (or before #open_loopbreaker) is a no-op instead of a NoMethodError.
#
# @return [nil]
def close_loopbreaker
  @loopbreak_writer.close if @loopbreak_writer
  @loopbreak_writer = nil
end

#crank_selectables ⇒ Object



244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
# File 'lib/pr_eventmachine.rb', line 244

# Runs one pass of the I/O loop.
#
# 1. Collects the selectables that currently want to be read from or
#    written to.
# 2. Waits in +select+ for at most +@timer_quantum+ seconds.
# 3. Dispatches +eventable_write+ to every writable object first, then
#    +eventable_read+ to every readable one.
# 4. Closes and unregisters every selectable whose +close_scheduled?+ is
#    true.
#
# @return [Hash] the selectables table (the result of +delete_if+)
def crank_selectables
  #$stderr.write 'R'

  want_read  = @selectables.values.select { |candidate| candidate.select_for_reading? }
  want_write = @selectables.values.select { |candidate| candidate.select_for_writing? }

  # select returns nil when the timeout elapses with nothing ready.
  ready = select(want_read, want_write, nil, @timer_quantum)
  if ready
    readable = ready[0]
    writable = ready[1]
    # Writers are serviced before readers, in that order.
    writable.each { |sink| sink.eventable_write } if writable
    readable.each { |source| source.eventable_read } if readable
  end

  # Sweep out everything that asked to be closed during this pass.
  @selectables.delete_if do |_uuid, io|
    next false unless io.close_scheduled?
    io.close
    true
  end
end

#get_selectable(uuid) ⇒ Object



213
214
215
# File 'lib/pr_eventmachine.rb', line 213

# Looks up a registered selectable by its uuid.
#
# @param uuid the key the selectable was registered under
# @return the stored selectable, or whatever the table yields for a
#   missing key (nil for a plain Hash)
def get_selectable(uuid)
  @selectables[uuid]
end

#initialize_for_run ⇒ Object

Called before run, this is a good place to clear out arrays with cruft that may be left over from a previous run.



201
202
203
204
205
206
207
# File 'lib/pr_eventmachine.rb', line 201

# Resets the reactor to its idle state: not running, no stop pending, no
# selectables, no timers, and a timer quantum of 0.5.
#
# An existing selectables table is emptied in place (the same Hash object
# is kept); one is created only if none exists yet.
#
# @return the result of #set_timer_quantum
def initialize_for_run
  @running = false
  @stop_scheduled = false

  if @selectables
    @selectables.clear
  else
    @selectables = {}
  end

  @timers = []
  set_timer_quantum(0.5)
end

#install_oneshot_timer(interval) ⇒ Object



192
193
194
195
196
197
# File 'lib/pr_eventmachine.rb', line 192

# Schedules a one-shot timer to fire +interval+ seconds from now.
#
# +@timers+ is kept ordered by fire time so #run_timers can pop due
# entries off the front. The new entry is inserted directly at its sorted
# position instead of appending and re-sorting the whole list:
# * Array#sort! is not stable, so timers due at the same instant could
#   fire in a different order than they were scheduled; inserting after
#   every entry with an equal-or-earlier time keeps them first-in,
#   first-out.
# * a single scan-and-insert is cheaper than a full sort per timer.
#
# @param interval [Numeric] delay in seconds, added to Time.now
# @return the identifier produced by UuidGenerator::generate for this timer
def install_oneshot_timer(interval)
  uuid = UuidGenerator::generate
  fire_at = Time.now + interval

  # First entry that fires strictly later than the new one; append if none.
  slot = @timers.index { |entry| entry.first > fire_at } || @timers.length
  @timers.insert(slot, [fire_at, uuid])

  uuid
end

#open_loopbreaker ⇒ Object



269
270
271
272
273
# File 'lib/pr_eventmachine.rb', line 269

# (Re)creates the loopbreak pipe.
#
# Any writer left over from an earlier call is closed first. The write end
# of the new pipe is kept in +@loopbreak_writer+ (used by
# #signal_loopbreak); the read end is handed to LoopbreakReader.
#
# @return [LoopbreakReader] the reader wrapping the pipe's read end
def open_loopbreaker
  stale_writer = @loopbreak_writer
  stale_writer.close if stale_writer

  read_end, write_end = IO.pipe
  @loopbreak_writer = write_end
  LoopbreakReader.new(read_end)
end

#run ⇒ Object

Raises:

  • (Error) — if the reactor is already running



217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
# File 'lib/pr_eventmachine.rb', line 217

# Runs the reactor loop until a stop has been scheduled.
#
# Each iteration fires due timers (#run_timers) and then performs one I/O
# pass (#crank_selectables); +@stop_scheduled+ is checked before each of
# the two steps.
#
# Teardown happens in an +ensure+ clause, so it also runs when a timer or
# I/O callback raises. Without that, an exception escaping the loop would
# leak the loopbreak pipe and every registered IO, and would leave
# +@running+ set, making every later #run fail with "already running".
#
# @raise [Error] if the reactor is already running
# @return [false] the running flag after shutdown
def run
  raise Error.new( "already running" ) if @running
  @running = true

  begin
    open_loopbreaker

    loop {
      break if @stop_scheduled
      run_timers
      break if @stop_scheduled
      crank_selectables
    }
  ensure
    # The writer is absent if open_loopbreaker itself failed; skip the close
    # then so the original exception is not masked by a second one.
    close_loopbreaker if @loopbreak_writer
    @selectables.each {|k, io| io.close}
    @selectables.clear

    @running = false
  end

  @running
end

#run_timers ⇒ Object



236
237
238
239
240
241
242
# File 'lib/pr_eventmachine.rb', line 236

# Fires every timer whose scheduled time is at or before the current time.
#
# Due entries are removed from the front of +@timers+ one at a time, and
# each is reported through EventMachine::event_callback with an empty
# target, the TimerFired code and the timer's identifier. The head of the
# list is re-examined after every callback. The cutoff time is sampled
# once, on entry.
#
# @return [nil]
def run_timers
  cutoff = Time.now
  until @timers.empty? || @timers.first.first > cutoff
    expired = @timers.shift
    EventMachine::event_callback "", TimerFired, expired.last
  end
end

#set_timer_quantum(interval_in_seconds) ⇒ Object



284
285
286
# File 'lib/pr_eventmachine.rb', line 284

# Sets the timer quantum: the timeout, in seconds, that #crank_selectables
# passes to +select+.
#
# @param interval_in_seconds the new quantum; stored as given, without
#   validation
# @return the value that was stored
def set_timer_quantum(interval_in_seconds)
  @timer_quantum = interval_in_seconds
end

#signal_loopbreak ⇒ Object



280
281
282
# File 'lib/pr_eventmachine.rb', line 280

# Writes a single '+' byte to the loopbreak pipe, if one is open.
#
# The write is non-blocking. A blocking write would stall the caller
# indefinitely once the pipe buffer is full (many signals sent while the
# read end is not being drained). A full pipe already holds unread bytes,
# so the dropped byte carries no extra information and the condition is
# ignored.
#
# @return [Integer, nil] number of bytes written, or nil when no
#   loopbreaker is open or the pipe is full
def signal_loopbreak
  return unless @loopbreak_writer
  @loopbreak_writer.write_nonblock '+'
rescue Errno::EAGAIN, Errno::EWOULDBLOCK
  # Pipe is full: there is already data waiting on the read end.
  nil
end

#stop ⇒ Object

#stop

Raises:

  • (Error) — if the reactor is not running



264
265
266
267
# File 'lib/pr_eventmachine.rb', line 264

# Asks a running reactor to stop.
#
# Only sets the +@stop_scheduled+ flag; the loop in #run checks that flag
# and exits on its own.
#
# @raise [Error] if the reactor is not running
# @return [true]
def stop
  if @running
    @stop_scheduled = true
  else
    raise Error.new( "not running")
  end
end