Module: Minx

Extended by:
Utils
Defined in:
lib/minx.rb,
lib/minx/utils.rb,
lib/minx/channel.rb,
lib/minx/process.rb,
lib/minx/scheduler.rb,
lib/minx/io_channel.rb,
lib/minx/file_channel.rb,
lib/minx/socket_channel.rb

Defined Under Namespace

Modules: Utils

Classes: Channel, FileChannel, IOChannel, Process, Scheduler, SocketChannel

Constant Summary

ROOT = Fiber.current

  The root fiber.

SCHEDULER = Scheduler.new

ChannelError = Class.new(Exception)

ProcessError = Class.new(Exception)

@@abort_on_exception = false

Class Method Summary

Methods included from Utils

filter, map, producer

Class Method Details

+ (Object) abort_on_exception



# File 'lib/minx.rb', line 30

def self.abort_on_exception
  @@abort_on_exception
end

+ (Object) abort_on_exception=(val)



# File 'lib/minx.rb', line 34

def self.abort_on_exception=(val)
  @@abort_on_exception = val
end

+ (Channel) channel

Create a new channel.

Returns:

  • (Channel)



# File 'lib/minx.rb', line 72

def self.channel
  Channel.new
end

+ (nil) debug=(debug)

Set whether or not to enable debugging output.

Debugging information will be written to $stderr.

Returns:

  • (nil)


# File 'lib/minx.rb', line 43

def self.debug=(debug)
  @debug = debug

  return nil
end

+ (Boolean) debug?

Whether or not debugging is enabled.

Returns:

  • (Boolean)

    whether or not debugging is enabled



# File 'lib/minx.rb', line 53

def self.debug?
  @debug
end

+ (nil) join(*processes)

Wait for the specified processes to finish.

The current process will be resumed after all the specified processes have terminated.

Examples:

Waiting for a pair of processes

p1 = Minx.spawn { @foo = chan1.read }
p2 = Minx.spawn { @bar = chan2.read }

Minx.join(p1, p2)

# Both @foo and @bar are available now.
puts @foo, @bar

Returns:

  • (nil)


# File 'lib/minx.rb', line 100

def self.join(*processes)
  Minx.yield until processes.all? {|p| p.finished? }
end
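
The source above polls: it yields to the scheduler until every process reports finished?. A minimal sketch of the same loop using plain Ruby fibers, assuming nothing from Minx itself (run_join_sketch and the two worker fibers are hypothetical names used only for illustration):

```ruby
require 'fiber' # provides Fiber#alive? on Ruby < 3.1; harmless on newer versions

# Hypothetical stand-in for Minx.join, built on plain fibers instead of
# Minx processes and the Minx scheduler.
def run_join_sketch
  log = []

  workers = [:a, :b].map do |name|
    Fiber.new do
      log << "#{name} started"
      Fiber.yield                 # give up control once, like Minx.yield
      log << "#{name} finished"
    end
  end

  # Keep resuming unfinished workers until all of them have terminated.
  until workers.none?(&:alive?)
    workers.select(&:alive?).each(&:resume)
  end

  log << "joined"
  log
end

puts run_join_sketch
```

The "joined" entry is appended only after both workers have run to completion, which is the guarantee join describes.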

+ (nil) push(message, *choices)

Write a message to one of a set of channels.

The first channel that is ready to consume a message will be chosen. If more than one is ready, the first specified in the argument list is chosen.

Parameters:

  • message

    the message to be transmitted

  • choices (Channel)

    the channels to choose among, in order of preference

Returns:

  • (nil)


# File 'lib/minx.rb', line 112

def self.push(message, *choices)
  choices.each do |channel|
    return channel.write(message) if channel.writable?
  end

  current = Fiber.current
  callback = Fiber.new do |reader|
    Fiber.yield(message)
    SCHEDULER.enqueue(current)
  end

  choices.each do |choice|
    choice.write_async(callback)
  end

  Fiber.yield
end
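
The first loop in the source above implements the "first ready channel in argument order" rule. A minimal sketch of that rule alone, assuming a hypothetical StubChannel that exposes only writable? and write (it is not the Minx Channel class, and the blocking fallback is left out):

```ruby
# Hypothetical stand-in for a channel; only the two methods used below exist.
StubChannel = Struct.new(:name, :ready, :inbox) do
  def writable?
    ready
  end

  def write(message)
    inbox << message
    nil
  end
end

# First-ready selection in argument order. Unlike Minx.push, this sketch
# returns the chosen channel so the outcome is easy to inspect, and it
# returns nil instead of blocking when no channel is ready.
def push_first_ready(message, *choices)
  choices.each do |channel|
    if channel.writable?
      channel.write(message)
      return channel
    end
  end
  nil
end

busy  = StubChannel.new(:busy,  false, [])
idle1 = StubChannel.new(:idle1, true,  [])
idle2 = StubChannel.new(:idle2, true,  [])

chosen = push_first_ready(:hello, busy, idle1, idle2)
puts chosen.name
```

Both idle1 and idle2 are ready here, but only idle1 receives the message because it appears earlier in the argument list.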

+ (Object) read(*channels)



# File 'lib/minx.rb', line 183

def self.read(*channels)
  results = []

  ps = channels.each_with_index.map do |chan, i|
    Minx.spawn { results[i] = chan.read }
  end

  Minx.join(*ps)

  return results
end

+ (Boolean) root?

Whether this is the root process.

Returns:

  • (Boolean)


# File 'lib/minx.rb', line 26

def self.root?
  ROOT == Fiber.current
end
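
The check above relies on fiber identity: a fiber captured at load time is compared with whatever fiber is currently running. A minimal sketch of that comparison with plain fibers (root_check_sketch is a hypothetical helper, not part of Minx):

```ruby
require 'fiber' # provides Fiber.current on Ruby < 3.1; harmless on newer versions

# Hypothetical helper: captures the calling fiber, then compares it with
# Fiber.current both outside and inside a newly created fiber.
def root_check_sketch
  root = Fiber.current
  outside = (root == Fiber.current)                     # still the same fiber
  inside  = Fiber.new { root == Fiber.current }.resume  # runs in a different fiber
  [outside, inside]
end

p root_check_sketch
```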

+ (Object) select(*choices)

Simultaneously read from multiple channels.

The channels will be enumerated in order; the first one carrying a message will be picked, and the message will be returned.

If none of the channels are readable, the calling process will yield until a channel is written to, unless :skip => true is passed as an option, in which case the call will just return nil.

Examples:

Non-blocking select

Minx.select(chan1, chan2, :skip => true)

Parameters:

  • choices (Channel)

    the channels to be selected among

Returns:

  • the first message read from any of the channels



# File 'lib/minx.rb', line 157

def self.select(*choices)
  options = choices.last.is_a?(Hash) ? choices.pop : {}

  # If a choice is readable, just read from that one.
  choices.each do |choice|
    return choice.read if choice.readable?
  end

  # Return immediately if :skip => true
  return if options[:skip]

  # ... otherwise, wait for a channel to become readable.
  current = Fiber.current

  callback = Fiber.new do |writer|
    message = writer.transfer
    current.resume(message)
  end

  choices.each do |choice|
    choice.read_async(callback)
  end

  Fiber.yield
end
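
The non-blocking path of the source above (options parsing, in-order scan, early return on :skip) can be tried in isolation. A minimal sketch, assuming a hypothetical StubReader in place of a real channel; the blocking path is deliberately not reproduced and raises instead:

```ruby
# Hypothetical stand-in for a readable channel backed by an array.
StubReader = Struct.new(:queue) do
  def readable?
    !queue.empty?
  end

  def read
    queue.shift
  end
end

# Mirrors only the non-blocking part of select.
def select_nonblocking(*choices)
  # A trailing Hash is treated as options, as in the source above.
  options = choices.last.is_a?(Hash) ? choices.pop : {}

  # Channels are scanned in argument order; the first readable one wins.
  choices.each do |choice|
    return choice.read if choice.readable?
  end

  return nil if options[:skip]

  raise NotImplementedError, "blocking path is omitted in this sketch"
end

empty = StubReader.new([])
full  = StubReader.new([:x, :y])

p select_nonblocking(empty, full, :skip => true)
p select_nonblocking(empty, StubReader.new([]), :skip => true)
```

The first call returns the message from the first readable channel; the second returns nil because nothing is readable and :skip => true was given.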

+ (Process) spawn(&block)

Spawn a new process.

The spawned process will start immediately, taking over execution.

Returns:

  • (Process)

Raises:

  • (ArgumentError)

    unless a block is given



# File 'lib/minx.rb', line 64

def self.spawn(&block)
  SCHEDULER.enqueue(Fiber.current)
  Process.new(&block).spawn
end
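
"Start immediately, taking over execution" means the child runs before the statement after spawn does. A rough analogy with plain fibers, assuming no Minx scheduler (the parent resumes the child by hand here, which in Minx is the scheduler's job; run_spawn_sketch is a hypothetical name):

```ruby
# Hypothetical analogy for spawn: resuming a new fiber hands control to it
# at once, and the caller continues only when the child yields or finishes.
def run_spawn_sketch
  order = []

  child = Fiber.new do
    order << :child_started
    Fiber.yield               # child gives control back to the parent
    order << :child_resumed
  end

  order << :before_spawn
  child.resume                # child takes over right away
  order << :parent_continues
  child.resume                # let the child finish
  order
end

p run_spawn_sketch
```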

+ (Object) supervise(*processes)



195
196
197
198
199
200
201
202
203
204
205
# File 'lib/minx.rb', line 195

def self.supervise(*processes)
  processes.each {|p| p.supervise }

  until processes.all? {|p| p.finished? }
    if root?
      yield SCHEDULER.main
    else
      yield Fiber.yield
    end
  end
end

+ (Object) write(*choices)



# File 'lib/minx.rb', line 130

def self.write(*choices)
  # Allows for both :msg => channel as well as [:msg, channel].
  if choices.size == 1 && choices.first.is_a?(Hash)
    choices = choices.first
  end

  processes = choices.map do |message, channels|
    Minx.spawn { Minx.push(message, *channels) }
  end

  Minx.join(*processes)
end
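
The comment in the source above says both a Hash and a list of [message, channel] pairs are accepted. A minimal sketch of why the two forms are equivalent, assuming symbols as placeholders for channels (normalize_choices is a hypothetical helper and performs no actual writes):

```ruby
# Hypothetical helper that reproduces only the argument handling of write.
def normalize_choices(*choices)
  # A single Hash argument is unwrapped, as in the source above.
  if choices.size == 1 && choices.first.is_a?(Hash)
    choices = choices.first
  end

  # Iterating a Hash and iterating an Array of pairs both yield
  # (message, channels); Array() plays the role of the splat in the source.
  choices.map { |message, channels| [message, Array(channels)] }
end

from_hash  = normalize_choices(:msg1 => :chan1, :msg2 => [:chan2, :chan3])
from_pairs = normalize_choices([:msg1, :chan1], [:msg2, [:chan2, :chan3]])

p from_hash
p from_hash == from_pairs
```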

+ (nil) yield

Yield control to another process.

The current process will be resumed at a later point.

Returns:

  • (nil)


# File 'lib/minx.rb', line 81

def self.yield
  SCHEDULER.yield
end