Module: Minx
- Extended by:
- Utils
- Defined in:
- lib/minx.rb,
lib/minx/utils.rb,
lib/minx/channel.rb,
lib/minx/process.rb,
lib/minx/scheduler.rb,
lib/minx/io_channel.rb,
lib/minx/file_channel.rb,
lib/minx/socket_channel.rb
Defined Under Namespace
Modules: Utils Classes: Channel, FileChannel, IOChannel, Process, Scheduler, SocketChannel
Constant Summary
- ROOT =
The root fiber.
Fiber.current
- SCHEDULER =
Scheduler.new
- ChannelError =
Class.new(Exception)
- ProcessError =
Class.new(Exception)
- @@abort_on_exception =
false
Class Method Summary (collapse)
- + (Object) abort_on_exception
- + (Object) abort_on_exception=(val)
-
+ (Channel) channel
Create a new channel.
-
+ (nil) debug=(debug)
Set whether or not to enable debugging output.
-
+ (Boolean) debug?
Whether or not debugging is enabled.
-
+ (nil) join(*processes)
Wait for the specified processes to finish.
-
+ (nil) push(message, *choices)
Write a message simultaneously to one of a set channels.
- + (Object) read(*channels)
-
+ (Boolean) root?
Whether this is the root process.
-
+ (Object) select(*choices)
Simultaneously read from multiple channels.
-
+ (Process) spawn(&block)
Spawn a new process.
- + (Object) supervise(*processes)
- + (Object) write(*choices)
-
+ (nil) yield
Yield control to another process.
Methods included from Utils
Class Method Details
+ (Object) abort_on_exception
30 31 32 |
# File 'lib/minx.rb', line 30 def self.abort_on_exception @@abort_on_exception end |
+ (Object) abort_on_exception=(val)
34 35 36 |
# File 'lib/minx.rb', line 34 def self.abort_on_exception=(val) @@abort_on_exception = val end |
+ (Channel) channel
Create a new channel.
72 73 74 |
# File 'lib/minx.rb', line 72 def self.channel Channel.new end |
+ (nil) debug=(debug)
Set whether or not to enable debugging output.
Debugging information will be written to $stderr.
43 44 45 46 47 |
# File 'lib/minx.rb', line 43 def self.debug=(debug) @debug = debug return nil end |
+ (Boolean) debug?
Whether or not debugging is enabled.
53 54 55 |
# File 'lib/minx.rb', line 53 def self.debug? @debug end |
+ (nil) join(*processes)
Wait for the specified processes to finish.
The current process will be resumed after all the specified processes have terminated.
100 101 102 |
# File 'lib/minx.rb', line 100 def self.join(*processes) Minx.yield until processes.all? {|p| p.finished? } end |
+ (nil) push(message, *choices)
Write a message simultaneously to one of a set channels.
The first channel that is ready to consume a message will be chosen. If more than one is ready, the first specified in the argument list is chosen.
112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 |
# File 'lib/minx.rb', line 112 def self.push(, *choices) choices.each do |channel| return channel.write() if channel.writable? end current = Fiber.current callback = Fiber.new do |reader| Fiber.yield() SCHEDULER.enqueue(current) end choices.each do |choice| choice.write_async(callback) end Fiber.yield end |
+ (Object) read(*channels)
183 184 185 186 187 188 189 190 191 192 193 |
# File 'lib/minx.rb', line 183 def self.read(*channels) results = [] ps = channels.each_with_index.map do |chan, i| Minx.spawn { results[i] = chan.read } end Minx.join(*ps) return results end |
+ (Boolean) root?
Whether this is the root process.
26 27 28 |
# File 'lib/minx.rb', line 26 def self.root? ROOT == Fiber.current end |
+ (Object) select(*choices)
Simultaneously read from multiple channels.
The channels will be enumerated in order; the first one carrying a message will be picked, and the message will be returned.
If none of the channels are readable, the calling process will yield until a channel is written to, unless :skip => true is passed as an option, in which case the call will just return nil.
157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 |
# File 'lib/minx.rb', line 157 def self.select(*choices) = choices.last.is_a?(Hash) ? choices.pop : {} # If a choice is readable, just read from that one. choices.each do |choice| return choice.read if choice.readable? end # Return immediately if :skip => true return if [:skip] # ... otherwise, wait for a channel to become readable. current = Fiber.current callback = Fiber.new do |writer| = writer.transfer current.resume() end choices.each do |choice| choice.read_async(callback) end Fiber.yield end |
+ (Process) spawn(&block)
Spawn a new process.
The spawned process will start immediately, taking over execution.
64 65 66 67 |
# File 'lib/minx.rb', line 64 def self.spawn(&block) SCHEDULER.enqueue(Fiber.current) Process.new(&block).spawn end |
+ (Object) supervise(*processes)
195 196 197 198 199 200 201 202 203 204 205 |
# File 'lib/minx.rb', line 195 def self.supervise(*processes) processes.each {|p| p.supervise } until processes.all? {|p| p.finished? } if root? yield SCHEDULER.main else yield Fiber.yield end end end |
+ (Object) write(*choices)
130 131 132 133 134 135 136 137 138 139 140 141 |
# File 'lib/minx.rb', line 130 def self.write(*choices) # Allows for both :msg => channel as well as [:msg, channel]. if choices.size == 1 && choices.first.is_a?(Hash) choices = choices.first end processes = choices.map do |, channels| Minx.spawn { Minx.push(, *channels) } end Minx.join(*processes) end |
+ (nil) yield
Yield control to another process.
The current process will be resumed at a later point.
81 82 83 |
# File 'lib/minx.rb', line 81 def self.yield SCHEDULER.yield end |