Class: IO::Event::Selector::Select
- Inherits:
-
Object
- Object
- IO::Event::Selector::Select
- Defined in:
- lib/io/event/selector/select.rb
Defined Under Namespace
Constant Summary collapse
- EAGAIN = -Errno::EAGAIN::Errno
- EWOULDBLOCK = -Errno::EWOULDBLOCK::Errno
Instance Attribute Summary collapse
-
#loop ⇒ Object
readonly
Returns the value of attribute loop.
Instance Method Summary collapse
- #again?(errno) ⇒ Boolean
- #blocking(&block) ⇒ Object
- #close ⇒ Object
-
#initialize(loop) ⇒ Select
constructor
A new instance of Select.
-
#io_read(fiber, _io, buffer, length, offset = 0) ⇒ Object
Ruby <= 3.1, limited IO::Buffer support.
- #io_select(readable, writable, priority, timeout) ⇒ Object
- #io_wait(fiber, io, events) ⇒ Object
- #io_write(fiber, _io, buffer, length, offset = 0) ⇒ Object
- #process_wait(fiber, pid, flags) ⇒ Object
-
#push(fiber) ⇒ Object
Append the given fiber into the ready list.
-
#raise(fiber, *arguments) ⇒ Object
Transfer to the given fiber and raise an exception.
- #ready? ⇒ Boolean
-
#resume(fiber, *arguments) ⇒ Object
Transfer from the current fiber to the specified fiber.
- #select(duration = nil) ⇒ Object
-
#transfer ⇒ Object
Transfer from the current fiber to the event loop.
-
#wakeup ⇒ Object
If the event loop is currently sleeping, wake it up.
-
#yield ⇒ Object
Yield from the current fiber back to the event loop.
Constructor Details
#initialize(loop) ⇒ Select
Returns a new instance of Select.
13 14 15 16 17 18 19 20 21 22 |
# File 'lib/io/event/selector/select.rb', line 13
# Build a selector bound to the given event loop fiber.
#
# @param loop [Object] the object control is transferred to whenever a fiber
#   has to wait (see #transfer and #io_wait, which call `@loop.transfer`).
def initialize(loop)
  @loop = loop

  # Waiters keyed by the IO object itself (identity, not #hash/#eql?).
  @waiting = {}.compare_by_identity

  # True only while #select is inside `IO.select`; consulted by #wakeup.
  @blocked = false

  # Fibers (or placeholders for them) that are ready to be resumed.
  @ready = Queue.new

  # Used by #wakeup to interrupt a blocked #select. `Interrupt` is defined
  # elsewhere in the project — NOTE(review): confirm what `attach` registers.
  @interrupt = Interrupt.attach(self)
end
Instance Attribute Details
#loop ⇒ Object (readonly)
Returns the value of attribute loop.
24 25 26 |
# File 'lib/io/event/selector/select.rb', line 24
# The event loop this selector was constructed with (nil after #close).
#
# NOTE: this reader shadows Kernel#loop for instances of this class.
def loop = @loop
Instance Method Details
#again?(errno) ⇒ Boolean
159 160 161 |
# File 'lib/io/event/selector/select.rb', line 159
# Whether the given result code means "try again later".
#
# @param errno [Integer] a result code to compare against the EAGAIN and
#   EWOULDBLOCK constants of this class.
# @return [Boolean] true when the code equals either constant.
def again?(errno)
  return true if errno == EAGAIN

  errno == EWOULDBLOCK
end
#blocking(&block) ⇒ Object
364 365 366 367 |
# File 'lib/io/event/selector/select.rb', line 364
# Run the given block inside a new blocking fiber.
#
# The block receives the newly created fiber as its argument.
#
# @return [Object] whatever the fiber's first resume returns.
def blocking(&block)
  worker = Fiber.new(blocking: true, &block)

  worker.resume(worker)
end
#close ⇒ Object
37 38 39 40 41 42 |
# File 'lib/io/event/selector/select.rb', line 37
# Close the interrupt and drop the references to the loop and the waiting table.
#
# @return [nil]
def close
  @interrupt.close

  @loop = @waiting = nil
end
#io_read(fiber, _io, buffer, length, offset = 0) ⇒ Object
Ruby <= 3.1, limited IO::Buffer support.
168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 |
# File 'lib/io/event/selector/select.rb', line 168
# Read from +io+ into +buffer+, suspending +fiber+ via #io_wait whenever the
# read reports "try again".
#
# @param fiber [Fiber] the fiber to park while waiting for readability.
# @param io [IO] the source; wrapped in `Selector.nonblock` for the duration
#   (helper defined elsewhere — presumably toggles non-blocking mode).
# @param buffer [IO::Buffer] destination buffer.
# @param length [Integer] stop once at least this many bytes have been read.
# @param offset [Integer] starting offset into +buffer+.
# @return [Integer] total bytes read, or the negative result code of a failed
#   read that is not a "try again" code.
def io_read(fiber, io, buffer, length, offset = 0)
  total = 0

  Selector.nonblock(io) do
    # `while true` rather than `loop do`: this class defines its own #loop reader.
    while true
      result = Fiber.blocking { buffer.read(io, 0, offset) }

      if result > 0
        total += result
        break if total >= length

        offset += result
      elsif result == 0
        # Nothing more was read; report what we have so far.
        break
      elsif again?(result)
        io_wait(fiber, io, IO::READABLE)
      else
        # Any other negative code is handed straight back to the caller.
        return result
      end
    end
  end

  total
end
#io_select(readable, writable, priority, timeout) ⇒ Object
150 151 152 153 154 |
# File 'lib/io/event/selector/select.rb', line 150
# Perform `IO.select` on a separate thread and return its result.
#
# @param readable [Array<IO>, nil]
# @param writable [Array<IO>, nil]
# @param priority [Array<IO>, nil]
# @param timeout [Numeric, nil]
# @return [Array, nil] whatever `IO.select` returns for these arguments.
def io_select(readable, writable, priority, timeout)
  worker = Thread.new { IO.select(readable, writable, priority, timeout) }

  # Thread#value joins the thread and yields the block's result.
  worker.value
end
#io_wait(fiber, io, events) ⇒ Object
142 143 144 145 146 147 148 |
# File 'lib/io/event/selector/select.rb', line 142
# Register +fiber+ as waiting for +events+ on +io+, then transfer to the event loop.
#
# The new waiter is chained in front of any waiter already registered for the
# same IO (the previous entry is passed as the third argument to Waiter.new).
#
# @param fiber [Fiber] the fiber that is waiting.
# @param io [IO] the IO being waited on.
# @param events [Integer] bit mask of IO::READABLE / IO::WRITABLE / IO::PRIORITY.
# @return [Object] whatever `@loop.transfer` returns when control comes back.
def io_wait(fiber, io, events)
  previous = @waiting[io]
  waiter = Waiter.new(fiber, events, previous)
  @waiting[io] = waiter

  @loop.transfer
ensure
  # Always invalidate our registration on the way out (normal return or
  # exception); `waiter` is nil if construction itself failed.
  waiter&.invalidate
end
#io_write(fiber, _io, buffer, length, offset = 0) ⇒ Object
196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 |
# File 'lib/io/event/selector/select.rb', line 196
# Write from +buffer+ to +io+, suspending +fiber+ via #io_wait whenever the
# write reports "try again".
#
# @param fiber [Fiber] the fiber to park while waiting for writability.
# @param io [IO] the destination; wrapped in `Selector.nonblock` for the
#   duration (helper defined elsewhere — presumably toggles non-blocking mode).
# @param buffer [IO::Buffer] source buffer.
# @param length [Integer] stop once at least this many bytes have been written.
# @param offset [Integer] starting offset into +buffer+.
# @return [Integer] total bytes written, or the negative result code of a
#   failed write that is not a "try again" code.
def io_write(fiber, io, buffer, length, offset = 0)
  total = 0

  Selector.nonblock(io) do
    # `while true` rather than `loop do`: this class defines its own #loop reader.
    while true
      result = Fiber.blocking { buffer.write(io, 0, offset) }

      if result < 0
        if again?(result)
          # A write that would block must wait until the IO is *writable*.
          # Waiting for readability could stall forever on an IO that never
          # receives data.
          self.io_wait(fiber, io, IO::WRITABLE)
        else
          # Any other negative code is handed straight back to the caller.
          return result
        end
      elsif result == 0
        # Nothing was written; report what we have so far.
        break
      else
        total += result
        break if total >= length

        offset += result
      end
    end
  end

  return total
end
#process_wait(fiber, pid, flags) ⇒ Object
370 371 372 373 374 |
# File 'lib/io/event/selector/select.rb', line 370
# Wait for a child process on a separate thread and return its status.
#
# @param fiber [Fiber] unused by this implementation.
# @param pid [Integer] process id to wait for.
# @param flags [Integer] flags forwarded to `Process::Status.wait`.
# @return [Process::Status, nil] the result of `Process::Status.wait`.
def process_wait(fiber, pid, flags)
  waiter = Thread.new { Process::Status.wait(pid, flags) }

  # Thread#value joins the thread and yields the block's result.
  waiter.value
end
#push(fiber) ⇒ Object
Append the given fiber into the ready list.
84 85 86 |
# File 'lib/io/event/selector/select.rb', line 84
# Append the given fiber to the ready list.
#
# @param fiber [Fiber] the fiber to enqueue.
# @return [Object] the result of pushing onto the ready queue.
def push(fiber) = @ready.push(fiber)
#raise(fiber, *arguments) ⇒ Object
Transfer to the given fiber and raise an exception. Put the current fiber into the ready list.
89 90 91 92 93 94 95 96 |
# File 'lib/io/event/selector/select.rb', line 89
# Raise an exception in the given fiber, after placing the current fiber on
# the ready list.
#
# @param fiber [Fiber] the fiber in which to raise.
# @param arguments [Array] forwarded verbatim to Fiber#raise.
# @return [Object] whatever `fiber.raise` returns once control comes back here.
def raise(fiber, *arguments)
  # Queue the current fiber wrapped in an Optional (defined elsewhere in the project).
  placeholder = Optional.new(Fiber.current)
  @ready.push(placeholder)

  fiber.raise(*arguments)
ensure
  # Once we are running again, the queued entry is nullified —
  # presumably so it is skipped when later popped; confirm against Optional.
  placeholder.nullify
end
#ready? ⇒ Boolean
98 99 100 |
# File 'lib/io/event/selector/select.rb', line 98
# Whether the ready list currently holds any entries.
#
# @return [Boolean]
def ready?
  return false if @ready.empty?

  true
end
#resume(fiber, *arguments) ⇒ Object
Transfer from the current fiber to the specified fiber. Put the current fiber into the ready list.
64 65 66 67 68 69 70 71 |
# File 'lib/io/event/selector/select.rb', line 64
# Transfer from the current fiber to the given fiber, after placing the
# current fiber on the ready list.
#
# @param fiber [Fiber] the fiber to transfer control to.
# @param arguments [Array] forwarded verbatim to Fiber#transfer.
# @return [Object] whatever `fiber.transfer` returns once control comes back here.
def resume(fiber, *arguments)
  # Queue the current fiber wrapped in an Optional (defined elsewhere in the project).
  placeholder = Optional.new(Fiber.current)
  @ready.push(placeholder)

  fiber.transfer(*arguments)
ensure
  # Once we are running again, the queued entry is nullified —
  # presumably so it is skipped when later popped; confirm against Optional.
  placeholder.nullify
end
#select(duration = nil) ⇒ Object
389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 |
# File 'lib/io/event/selector/select.rb', line 389
# Run one iteration of the selector: wait (up to +duration+) for any registered
# IO to become ready and dispatch the corresponding waiters.
#
# @param duration [Numeric, nil] timeout handed to `IO.select`; forced to 0
#   when ready fibers were popped or the ready list is non-empty.
# @return [Integer] the number of distinct IOs that became ready, or 0 when
#   the wait was interrupted by an exception (which is re-queued on the thread).
def select(duration = nil)
  # If we have popped items from the ready list, they may influence the
  # duration calculation, so we don't delay the event loop:
  duration = 0 if pop_ready

  readable = []
  writable = []
  priority = []

  # Collect every IO into the set(s) matching the events its waiters want.
  @waiting.each do |io, waiter|
    waiter.each do |_fiber, events|
      readable << io if (events & IO::READABLE) > 0
      writable << io if (events & IO::WRITABLE) > 0
      priority << io if (events & IO::PRIORITY) > 0
    end
  end

  duration = 0 unless @ready.empty?

  error = nil

  # We need to handle interrupts on blocking IO. Every other implementation
  # uses EINTR, but that doesn't work with `::IO.select` as it will retry the
  # call on EINTR.
  Thread.handle_interrupt(::Exception => :on_blocking) do
    @blocked = true
    readable, writable, priority = ::IO.select(readable, writable, priority, duration)
  rescue ::Exception => error
    # Captured in `error`; re-queued below.
  ensure
    @blocked = false
  end

  if error
    # Requeue the error into the pending exception queue:
    Thread.current.raise(error)
    return 0
  end

  # Merge the three result lists into one event mask per IO (by identity).
  # Each list is nil when `IO.select` timed out.
  ready = Hash.new(0).compare_by_identity

  [
    [IO::READABLE, readable],
    [IO::WRITABLE, writable],
    [IO::PRIORITY, priority]
  ].each do |flag, ios|
    ios&.each { |io| ready[io] |= flag }
  end

  ready.each do |io, events|
    @waiting.delete(io).dispatch(events) do |waiter|
      # Re-schedule the waiting IO:
      waiter.tail = @waiting[io]
      @waiting[io] = waiter
    end
  end

  ready.size
end
#transfer ⇒ Object
Transfer from the current fiber to the event loop.
59 60 61 |
# File 'lib/io/event/selector/select.rb', line 59
# Transfer from the current fiber to the event loop.
#
# @return [Object] whatever `@loop.transfer` returns when control comes back.
def transfer = @loop.transfer
#wakeup ⇒ Object
If the event loop is currently sleeping, wake it up.
27 28 29 30 31 32 33 34 35 |
# File 'lib/io/event/selector/select.rb', line 27
# If the event loop is currently sleeping (blocked inside #select), wake it up.
#
# @return [Boolean] true if the interrupt was signalled, false if the
#   selector was not blocked and nothing was done.
def wakeup
  return false unless @blocked

  @interrupt.signal
  true
end