Class: IO::Event::Selector::Select
- Inherits: Object
- Inheritance chain: Object → IO::Event::Selector::Select
- Defined in:
- lib/io/event/selector/select.rb
Defined Under Namespace
Constant Summary collapse
- EAGAIN = -Errno::EAGAIN::Errno
- EWOULDBLOCK = -Errno::EWOULDBLOCK::Errno
Instance Attribute Summary collapse
-
#idle_duration ⇒ Object
readonly
This is the amount of time the event loop was idle during the last select call.
-
#loop ⇒ Object
readonly
Returns the value of attribute loop.
Instance Method Summary collapse
- #again?(errno) ⇒ Boolean
- #blocking(&block) ⇒ Object
- #close ⇒ Object
-
#initialize(loop) ⇒ Select
constructor
A new instance of Select.
-
#io_read(fiber, _io, buffer, length, offset = 0) ⇒ Object
Ruby <= 3.1, limited IO::Buffer support.
- #io_select(readable, writable, priority, timeout) ⇒ Object
- #io_wait(fiber, io, events) ⇒ Object
- #io_write(fiber, _io, buffer, length, offset = 0) ⇒ Object
- #process_wait(fiber, pid, flags) ⇒ Object
-
#push(fiber) ⇒ Object
Append the given fiber into the ready list.
-
#raise(fiber, *arguments) ⇒ Object
Transfer to the given fiber and raise an exception.
- #ready? ⇒ Boolean
-
#resume(fiber, *arguments) ⇒ Object
Transfer from the current fiber to the specified fiber.
- #select(duration = nil) ⇒ Object
-
#transfer ⇒ Object
Transfer from the current fiber to the event loop.
-
#wakeup ⇒ Object
If the event loop is currently sleeping, wake it up.
-
#yield ⇒ Object
Yield from the current fiber back to the event loop.
Constructor Details
#initialize(loop) ⇒ Select
Returns a new instance of Select.
13 14 15 16 17 18 19 20 21 22 23 24 |
# File 'lib/io/event/selector/select.rb', line 13
# Build a selector bound to the given event loop.
#
# @param loop [Object] stored in `@loop`; other methods of this class call `transfer` on it.
def initialize(loop)
  @loop = loop

  # Waiters are keyed by the IO object itself (identity), not by `==`/`hash`.
  @waiting = {}.compare_by_identity

  # `@blocked` is only true while `select` is inside `IO.select`; `@ready` holds fibers queued to run.
  @blocked = false
  @ready = Queue.new

  # NOTE(review): `Interrupt` is defined outside this view; it is attached only after
  # `@loop` and `@waiting` exist, presumably because it uses the selector immediately — confirm.
  @interrupt = Interrupt.attach(self)

  @idle_duration = 0.0
end
Instance Attribute Details
#idle_duration ⇒ Object (readonly)
This is the amount of time the event loop was idle during the last select call.
29 30 31 |
# File 'lib/io/event/selector/select.rb', line 29
# The amount of time the event loop was idle during the last select call.
#
# @return [Float] the value most recently stored in `@idle_duration`.
def idle_duration = @idle_duration
#loop ⇒ Object (readonly)
Returns the value of attribute loop.
26 27 28 |
# File 'lib/io/event/selector/select.rb', line 26
# The event loop object this selector was constructed with.
#
# Note that this reader shadows `Kernel#loop` for code running inside the selector.
#
# @return [Object, nil] the value of `@loop` (set to `nil` by `close`).
def loop = @loop
Instance Method Details
#again?(errno) ⇒ Boolean
164 165 166 |
# File 'lib/io/event/selector/select.rb', line 164
# Whether the given result code means "the operation would block, try again".
#
# @param errno [Integer] a result code to compare against `EAGAIN` and `EWOULDBLOCK`.
# @return [Boolean] true when `errno` equals either constant.
def again?(errno)
  would_retry = errno == EAGAIN
  would_block = errno == EWOULDBLOCK

  would_retry || would_block
end
#blocking(&block) ⇒ Object
369 370 371 372 |
# File 'lib/io/event/selector/select.rb', line 369
# Run the given block inside a new blocking fiber and return its result.
#
# @yield [fiber] the newly created fiber is passed to the block as its argument.
# @return [Object] whatever the fiber's first `resume` returns.
def blocking(&block)
  blocking_fiber = Fiber.new(blocking: true, &block)

  blocking_fiber.resume(blocking_fiber)
end
#close ⇒ Object
42 43 44 45 46 47 |
# File 'lib/io/event/selector/select.rb', line 42
# Close the interrupt and drop the references to the loop and the waiting table.
#
# @return [nil]
def close
  @interrupt.close

  @loop = @waiting = nil
end
#io_read(fiber, _io, buffer, length, offset = 0) ⇒ Object
Ruby <= 3.1, limited IO::Buffer support.
173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 |
# File 'lib/io/event/selector/select.rb', line 173
# Read from `io` into `buffer`, waiting for readability whenever the read would block.
#
# @param fiber [Fiber] the fiber passed on to `io_wait` while waiting.
# @param io [IO] the IO to read from.
# @param buffer [IO::Buffer] the destination; must respond to `read(io, length, offset)`.
# @param length [Integer] reading stops once at least this many bytes have been read.
# @param offset [Integer] the starting offset into `buffer`.
# @return [Integer] the total number of bytes read, or the negative result of `buffer.read`
#   when that result is not a retryable code according to `again?`.
def io_read(fiber, io, buffer, length, offset = 0)
  total = 0

  Selector.nonblock(io) do
    # NOTE: `loop do` cannot be used here, because `loop` is an attribute reader on this class.
    while true
      count = Fiber.blocking { buffer.read(io, 0, offset) }

      if count < 0
        # A negative result that is not "try again" is handed straight back to the caller.
        return count unless again?(count)

        self.io_wait(fiber, io, IO::READABLE)
        next
      end

      # Zero bytes: nothing more to read.
      break if count == 0

      total += count
      break if total >= length

      offset += count
    end
  end

  return total
end
#io_select(readable, writable, priority, timeout) ⇒ Object
155 156 157 158 159 |
# File 'lib/io/event/selector/select.rb', line 155
# Run `IO.select` on a separate thread and return its result.
#
# @param readable [Array<IO>, nil] passed through to `IO.select`.
# @param writable [Array<IO>, nil] passed through to `IO.select`.
# @param priority [Array<IO>, nil] passed through to `IO.select`.
# @param timeout [Numeric, nil] passed through to `IO.select`.
# @return [Array, nil] whatever `IO.select` returned (`Thread#value` joins the thread).
def io_select(readable, writable, priority, timeout)
  worker = Thread.new { IO.select(readable, writable, priority, timeout) }

  worker.value
end
#io_wait(fiber, io, events) ⇒ Object
147 148 149 150 151 152 153 |
# File 'lib/io/event/selector/select.rb', line 147
# Register interest in `events` on `io` for `fiber`, then transfer to the event loop.
#
# @param fiber [Fiber] the fiber recorded in the new waiter.
# @param io [IO] the key under which the waiter is stored in `@waiting`.
# @param events [Integer] event mask recorded in the new waiter.
# @return [Object] whatever `@loop.transfer` returns when control comes back.
def io_wait(fiber, io, events)
  # The new waiter is chained in front of any waiter already registered for this IO.
  previous = @waiting[io]
  waiter = (@waiting[io] = Waiter.new(fiber, events, previous))

  @loop.transfer
ensure
  # `waiter` is still nil if registration itself failed.
  waiter&.invalidate
end
#io_write(fiber, _io, buffer, length, offset = 0) ⇒ Object
201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 |
# File 'lib/io/event/selector/select.rb', line 201
# Write from `buffer` to `io`, waiting for writability whenever the write would block.
#
# @param fiber [Fiber] the fiber passed on to `io_wait` while waiting.
# @param io [IO] the IO to write to.
# @param buffer [IO::Buffer] the source; must respond to `write(io, length, offset)`.
# @param length [Integer] writing stops once at least this many bytes have been written.
# @param offset [Integer] the starting offset into `buffer`.
# @return [Integer] the total number of bytes written, or the negative result of `buffer.write`
#   when that result is not a retryable code according to `again?`.
def io_write(fiber, io, buffer, length, offset = 0)
  total = 0

  Selector.nonblock(io) do
    # NOTE: `loop do` cannot be used here, because `loop` is an attribute reader on this class.
    while true
      result = Fiber.blocking { buffer.write(io, 0, offset) }

      if result < 0
        if again?(result)
          # A write that would block must wait until the IO is writable. Waiting for
          # readability (as this method previously did) can stall the writer.
          self.io_wait(fiber, io, IO::WRITABLE)
        else
          return result
        end
      elsif result == 0
        break
      else
        total += result
        break if total >= length
        offset += result
      end
    end
  end

  return total
end
#process_wait(fiber, pid, flags) ⇒ Object
375 376 377 378 379 |
# File 'lib/io/event/selector/select.rb', line 375
# Wait for a child process on a separate thread and return its status.
#
# @param fiber [Fiber] not used by this implementation.
# @param pid [Integer] passed through to `Process::Status.wait`.
# @param flags [Integer] passed through to `Process::Status.wait`.
# @return [Process::Status] whatever `Process::Status.wait` returned.
def process_wait(fiber, pid, flags)
  waiter_thread = Thread.new { Process::Status.wait(pid, flags) }

  waiter_thread.value
end
#push(fiber) ⇒ Object
Append the given fiber into the ready list.
89 90 91 |
# File 'lib/io/event/selector/select.rb', line 89
# Append the given fiber to the ready list.
#
# @param fiber [Fiber] the entry appended to `@ready`.
# @return [Queue] the ready queue itself.
def push(fiber)
  @ready << fiber
end
#raise(fiber, *arguments) ⇒ Object
Transfer to the given fiber and raise an exception. Put the current fiber into the ready list.
94 95 96 97 98 99 100 101 |
# File 'lib/io/event/selector/select.rb', line 94
# Raise an exception in the given fiber, after queueing the current fiber on the ready list.
#
# @param fiber [Fiber] the fiber in which the exception is raised.
# @param arguments [Array] forwarded unchanged to `Fiber#raise`.
# @return [Object] whatever `fiber.raise` returns when control comes back.
def raise(fiber, *arguments)
  # The current fiber is queued through an `Optional` wrapper so the entry can be
  # nullified once control returns here.
  current = Optional.new(Fiber.current)
  @ready << current

  fiber.raise(*arguments)
ensure
  current.nullify
end
#ready? ⇒ Boolean
103 104 105 |
# File 'lib/io/event/selector/select.rb', line 103
# Whether any entries are queued on the ready list.
#
# @return [Boolean] false when `@ready` is empty, true otherwise.
def ready?
  return false if @ready.empty?

  true
end
#resume(fiber, *arguments) ⇒ Object
Transfer from the current fiber to the specified fiber. Put the current fiber into the ready list.
69 70 71 72 73 74 75 76 |
# File 'lib/io/event/selector/select.rb', line 69
# Transfer to the given fiber, after queueing the current fiber on the ready list.
#
# @param fiber [Fiber] the fiber to transfer to.
# @param arguments [Array] forwarded unchanged to `Fiber#transfer`.
# @return [Object] whatever `fiber.transfer` returns when control comes back.
def resume(fiber, *arguments)
  # The current fiber is queued through an `Optional` wrapper so the entry can be
  # nullified once control returns here.
  current = Optional.new(Fiber.current)
  @ready << current

  fiber.transfer(*arguments)
ensure
  current.nullify
end
#select(duration = nil) ⇒ Object
394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 |
# File 'lib/io/event/selector/select.rb', line 394 def select(duration = nil) if pop_ready # If we have popped items from the ready list, they may influence the duration calculation, so we don't delay the event loop: duration = 0 end readable = Array.new writable = Array.new priority = Array.new @waiting.each do |io, waiter| waiter.each do |fiber, events| if (events & IO::READABLE) > 0 readable << io end if (events & IO::WRITABLE) > 0 writable << io end if (events & IO::PRIORITY) > 0 priority << io end end end duration = 0 unless @ready.empty? error = nil if duration&.>(0) start_time = Process.clock_gettime(Process::CLOCK_MONOTONIC) else @idle_duration = 0.0 end # We need to handle interrupts on blocking IO. Every other implementation uses EINTR, but that doesn't work with `::IO.select` as it will retry the call on EINTR. Thread.handle_interrupt(::Exception => :on_blocking) do @blocked = true readable, writable, priority = ::IO.select(readable, writable, priority, duration) rescue ::Exception => error # Requeue below... ensure @blocked = false if start_time end_time = Process.clock_gettime(Process::CLOCK_MONOTONIC) @idle_duration = end_time - start_time end end if error # Requeue the error into the pending exception queue: Thread.current.raise(error) return 0 end ready = Hash.new(0).compare_by_identity readable&.each do |io| ready[io] |= IO::READABLE end writable&.each do |io| ready[io] |= IO::WRITABLE end priority&.each do |io| ready[io] |= IO::PRIORITY end ready.each do |io, events| @waiting.delete(io).dispatch(events) do |waiter| # Re-schedule the waiting IO: waiter.tail = @waiting[io] @waiting[io] = waiter end end return ready.size end |
#transfer ⇒ Object
Transfer from the current fiber to the event loop.
64 65 66 |
# File 'lib/io/event/selector/select.rb', line 64
# Transfer from the current fiber to the event loop.
#
# @return [Object] whatever `@loop.transfer` returns when control comes back.
def transfer = @loop.transfer
#wakeup ⇒ Object
If the event loop is currently sleeping, wake it up.
32 33 34 35 36 37 38 39 40 |
# File 'lib/io/event/selector/select.rb', line 32
# If the event loop is currently blocked inside `select`, wake it up.
#
# @return [Boolean] true if the interrupt was signalled, false if the loop was not blocked.
def wakeup
  # `@blocked` is only true while `select` is inside `IO.select`.
  return false unless @blocked

  @interrupt.signal
  true
end