Class: Minx::Channel
- Inherits: Object
  - Object
    - Minx::Channel
- Defined in: lib/minx/channel.rb
Overview
A Channel is used to transmit messages between processes in a synchronized manner.
Instance Method Summary
- (Channel) <<(message)
  Write a message to the channel.
- (nil) each {|message| ... }
  Enumerate over the messages sent to the channel.
- (Channel) initialize (constructor)
  Create a new channel.
- (Object) read
  Read a message off the channel.
- (Object) read_async(callback)
- (Boolean) readable?
  Whether there are any processes waiting to write.
- (Boolean) writable?
- (nil) write(message)
  Write a message to the channel.
- (Object) write_async(callback)
Constructor Details
- (Channel) initialize
Create a new channel.
# File 'lib/minx/channel.rb', line 9
def initialize
  @readers = []
  @writers = []
end
Instance Method Details
- (Channel) <<(message)
Write a message to the channel.
Exactly the same as calling #write, except that the channel itself is returned, allowing for chained calls, e.g.
chan << 1 << 2 << 3
# File 'lib/minx/channel.rb', line 55
def <<(message)
  write(message)
  return self
end
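Chaining works because #<< returns the channel while #write returns nil. The following is a minimal sketch of that contract only: `ToyChannel` is a hypothetical stand-in that records messages in an array, and it does not involve Minx's scheduler or fibers.

```ruby
# Hypothetical stand-in for Minx::Channel. It only records messages,
# so the return values of #write and #<< can be observed in isolation.
class ToyChannel
  attr_reader :messages

  def initialize
    @messages = []
  end

  # Same documented return value as #write: nil.
  def write(message)
    @messages << message
    nil
  end

  # Same documented return value as #<<: the channel itself.
  def <<(message)
    write(message)
    self
  end
end

toy = ToyChannel.new
toy << 1 << 2 << 3
p toy.messages # prints [1, 2, 3]
```

Had #<< returned nil like #write, the second `<<` in the chain would be sent to nil and raise NoMethodError.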
- (nil) each {|message| ... }
Enumerate over the messages sent to the channel.
# File 'lib/minx/channel.rb', line 111
def each
  yield read while true
end
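Because the loop in #each has no exit condition, the caller has to leave it explicitly, for example with `break`. A small sketch of that pattern, assuming a hypothetical `ToySource` whose `read` cycles through a fixed list instead of blocking on a real channel:

```ruby
# Hypothetical stand-in: #read cycles through a fixed list rather than
# blocking on a channel, so the example runs without a scheduler.
class ToySource
  def initialize(items)
    @items = items
  end

  def read
    item = @items.shift
    @items.push(item)
    item
  end

  # Same shape as the listing above: yield reads forever.
  def each
    yield read while true
  end
end

seen = []
ToySource.new([:a, :b, :c]).each do |message|
  seen << message
  break if seen.size == 5 # the only way out of the endless loop
end

p seen # prints [:a, :b, :c, :a, :b]
```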
- (Object) read
Read a message off the channel.
If no messages have been written to the channel, the calling process blocks and resumes only when a write occurs. This behavior can be suppressed by calling read with :async => true, in which case the call returns immediately; the next time the calling process yields, it may be resumed with a message from the channel.
# File 'lib/minx/channel.rb', line 70
def read
  if @writers.empty?
    @readers << Fiber.current
    SCHEDULER.main while @writers.empty?
    writer = @writers.shift
    message = writer.transfer
  else
    writer = @writers.shift
    until writer.alive?
      writer = @writers.shift
      return read if writer.nil?
    end
    message = writer.transfer(Fiber.current)
    SCHEDULER.enqueue(Fiber.current)
    writer.transfer
  end
  return message
end
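The blocking behavior of #read can be illustrated with a plain Ruby Fiber. This is only a sketch of the idea (a reader suspends itself and is later resumed with the message); it uses `Fiber.yield` and `Fiber#resume` from Ruby core rather than Minx's `SCHEDULER` and `transfer` calls, and the `log` array exists purely to show the order of events.

```ruby
log = []

# The reader "blocks" by suspending itself; the value passed to the next
# resume becomes the message it receives.
reader = Fiber.new do
  log << "reader: waiting"
  message = Fiber.yield # suspended here until a write occurs
  log << "reader: got #{message}"
end

reader.resume # run the reader until it suspends
log << "writer: writing"
reader.resume(:hello) # the write wakes the reader with the message

puts log
# prints:
#   reader: waiting
#   writer: writing
#   reader: got hello
```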
- (Object) read_async(callback)
# File 'lib/minx/channel.rb', line 94
def read_async(callback)
  @readers << callback
end
- (Boolean) readable?
Whether there are any processes waiting to write.
If the channel is readable, the current process will not block when calling #read.
# File 'lib/minx/channel.rb', line 121
def readable?
  return !@writers.empty?
end
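- (Boolean) writable?
Whether there are any processes waiting to read.
If the channel is writable, the current process will not block when calling #write.
# File 'lib/minx/channel.rb', line 125
def writable?
  return !@readers.empty?
end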
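The two predicates are mirror images: each one inspects the waiting list of the opposite side. A minimal sketch of that relationship, assuming a hypothetical `ToyQueues` class that keeps only the two lists from the constructor and exposes them for the demonstration (the real class does not document such accessors):

```ruby
# Hypothetical model containing only the two waiting lists and the
# predicates. The attr_reader is an assumption made for this example.
class ToyQueues
  attr_reader :readers, :writers

  def initialize
    @readers = []
    @writers = []
  end

  # Readable when at least one writer is waiting.
  def readable?
    !@writers.empty?
  end

  # Writable when at least one reader is waiting.
  def writable?
    !@readers.empty?
  end
end

queues = ToyQueues.new
before = queues.readable?      # false: no writer is waiting yet
queues.writers << :some_writer # placeholder for a waiting writer
after = queues.readable?       # true: a read would not block now
still_unwritable = !queues.writable? # true: no reader is waiting
```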
- (nil) write(message)
Write a message to the channel.
If no readers are waiting, the calling process blocks until a reader arrives.
# File 'lib/minx/channel.rb', line 22
def write(message)
  @writers << Fiber.current
  if @readers.empty?
    reader = SCHEDULER.main while reader.nil?
  else
    reader = @readers.shift
    until reader.alive?
      reader = @readers.shift
      return write(message) if reader.nil?
    end
    reader.transfer(Fiber.current)
    SCHEDULER.enqueue(Fiber.current)
  end
  reader.transfer(message)
  return nil
end
- (Object) write_async(callback)
# File 'lib/minx/channel.rb', line 98
def write_async(callback)
  @writers << callback
end