Class: SimpleFuture
- Inherits:
-
Object
- Object
- SimpleFuture
- Defined in:
- lib/simple-future.rb
Overview
A container holding the (eventual) result of a forked child process once that process finishes. The child process executes the code block that must be passed to the constructor:
sf = SimpleFuture.new { do_slow_thing }
... do stuff ...
use(sf.value)
The code block must return a value that can be encoded by
Marshal
and must not exit prematurely.
Exceptions thrown inside the block will trigger a
SimpleFuture::ChildError
in the parent process but that exception
will contain the original in its cause
field.
Defined Under Namespace
Classes: ChildError, Error, ResultTypeError
Class Method Summary collapse
-
.all_done? ⇒ Boolean
Test if all instances created so far have run to completion.
-
.max_tasks ⇒ Object
Return the maximum number of concurrent child processes allowed.
-
.max_tasks=(value) ⇒ Object
Set the maximum number of concurrent child processes allowed.
-
.wait_for_all ⇒ Object
Wait until all child processes have run to completion and recover their results.
Instance Method Summary collapse
-
#check_if_ready ⇒ Boolean
Check if the child process has finished evaluating the block and has a result ready.
-
#complete? ⇒ Boolean
Test if the child process has finished and its result is available.
-
#initialize(&action) ⇒ SimpleFuture
constructor
In addition to creating a new
SimpleFuture
, the constructor creates a child process and evaluatesaction
in it. -
#value ⇒ Object
Return the result of the child process, blocking if it is not yet available.
-
#wait ⇒ Object
Block until the child process finishes, recover its result and clean up the process.
Constructor Details
#initialize(&action) ⇒ SimpleFuture
In addition to creating a new SimpleFuture
, the constructor
creates a child process and evaluates action
in it. If the
maximum number of child processes would be exceeded, it will block
until a process finishes.
88 89 90 91 92 93 94 95 96 97 |
# File 'lib/simple-future.rb', line 88 def initialize(&action) @readPipe = nil @pid = nil @complete = false @result = nil self.class.all_done? # Reclaim all completed children block_until_clear() launch(action) end |
Class Method Details
.all_done? ⇒ Boolean
Test if all instances created so far have run to completion. As a
side effect, it will also call wait
on instances whose child
processes are running but have finished (i.e. their
check_if_ready
would return true.) This lets you use it as a
non-blocking way to clean up the remaining children.
206 207 208 209 |
# File 'lib/simple-future.rb', line 206 def self.all_done? @@in_progress.select!{ |sp| !sp.check_if_ready } return @@in_progress.size == 0 end |
.max_tasks ⇒ Object
Return the maximum number of concurrent child processes allowed.
190 |
# File 'lib/simple-future.rb', line 190 def self.max_tasks() return @@max_tasks; end |
.max_tasks=(value) ⇒ Object
Set the maximum number of concurrent child processes allowed. If set to less than 1, it is interpreted as meaning no limit.
It is initially set to the number of available cores as provided
by the Etc
module.
197 198 199 |
# File 'lib/simple-future.rb', line 197 def self.max_tasks=(value) @@max_tasks = value end |
.wait_for_all ⇒ Object
Wait until all child processes have run to completion and recover
their results. Programs should call this before exiting if there
is a chance that an instance was created without having wait
called on it.
215 216 217 218 219 |
# File 'lib/simple-future.rb', line 215 def self.wait_for_all @@in_progress.each{|sp| sp.wait} @@in_progress = [] return end |
Instance Method Details
#check_if_ready ⇒ Boolean
Check if the child process has finished evaluating the block and
has a result ready. If check_if_ready
returns true
, wait
will not block when called.
Note: check_if_ready
tests if there's data on the pipe to the
child process to see if it has finished. A sufficiently evil
child block might be able to cause a true result while still
blocking wait
.
Don't do that.
181 182 183 184 185 186 |
# File 'lib/simple-future.rb', line 181 def check_if_ready return true if complete? return false unless @readPipe.ready? wait return true end |
#complete? ⇒ Boolean
Test if the child process has finished and its result is available.
Note that this will only be true after a call to wait
(i.e. the
child process finished and its result has been retrieved.) If
you want to see if the result is (probably) available, use
check_if_ready
.
106 |
# File 'lib/simple-future.rb', line 106 def complete?() return @complete; end |
#value ⇒ Object
Return the result of the child process, blocking if it is not yet
available. Blocking is done by calling wait
, so the process
will be cleaned up.
111 112 113 114 |
# File 'lib/simple-future.rb', line 111 def value wait return @result end |
#wait ⇒ Object
Block until the child process finishes, recover its result and
clean up the process. wait
must be called for each
SimpleFuture
to prevent zombie processes. In practice, this is
rarely a problem since value
calls wait
and you usually want
to get all of the values. See wait_for_all
.
It is safe to call wait
multiple times on a SimpleFuture
.
127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 |
# File 'lib/simple-future.rb', line 127 def wait # Quit if the child has already exited return if complete? # Read the contents; this may block data = @readPipe.read # Reap the child process; this shouldn't block for long Process.wait(@pid) # And now we're complete, regardless of what happens next. (We # set it early so that errors later on won't allow waiting again # and associated mystery errors.) @complete = true # Close and discard the pipe; we're done with it @readPipe.close @readPipe = nil # If the child process exited badly, this is an error raise Error.new("Error in child process #{@pid}!") unless $?.exitstatus == 0 && !data.empty? # Decode the result. If it's an exception object, that's the # error that was thrown in the child and that means an error here # as well. rbox = Marshal.load(data) raise rbox if rbox.is_a? ResultTypeError raise ChildError.new("Child process failed with an exception.", rbox) if rbox.is_a? Exception # Ensure rbox is a ResultContainer. This *probably* can't happen. raise Error.new("Invalid result object type: #{rbox.class}") unless rbox.is_a? ResultContainer # Aaaaaand, retrieve the value. @result = rbox.value return # return nil end |