Class: SimpleFuture

Inherits:
Object
Defined in:
lib/simple-future.rb

Overview

A container holding the (eventual) result of a forked child process once that process finishes. The child process executes the code block that must be passed to the constructor:

  sf = SimpleFuture.new { do_slow_thing }
  ... do stuff ...
  use(sf.value)

The code block must return a value that Marshal can encode, and it must not terminate the child process prematurely (for example, by calling exit).

An exception raised inside the block triggers a SimpleFuture::ChildError in the parent process when wait (or value, which calls wait) is called. That error carries the original exception in its cause field.
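The mechanism described above (fork a child, run the block there, and ship the Marshal-encoded result back to the parent) can be sketched with the standard library alone. This is only an illustration of the general technique, not SimpleFuture's code: run_in_child is a hypothetical helper, and it assumes a platform where fork is available.

```ruby
# Hypothetical helper (not part of SimpleFuture): run a block in a forked
# child and return its Marshal-encoded result to the parent over a pipe.
def run_in_child
  reader, writer = IO.pipe

  pid = fork do
    reader.close                        # the child only writes
    writer.write(Marshal.dump(yield))   # the result must be Marshal-encodable
    writer.close
    exit!(0)                            # skip any at_exit handlers inherited from the parent
  end

  writer.close                          # the parent only reads
  data = reader.read                    # blocks until the child closes its end
  reader.close
  Process.wait(pid)                     # reap the child so it does not linger as a zombie
  Marshal.load(data)
end

total = run_in_child { (1..10).sum }
puts total   # prints 55
```

A value that Marshal cannot encode (a Proc or an open IO, for instance) would make Marshal.dump raise in the child, which is presumably why the class restricts the block's return value.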

Defined Under Namespace

Classes: ChildError, Error, ResultTypeError

Constructor Details

#initialize(&action) ⇒ SimpleFuture

In addition to creating a new SimpleFuture, the constructor creates a child process and evaluates action in it. If the maximum number of child processes would be exceeded, it will block until a process finishes.



# File 'lib/simple-future.rb', line 88

def initialize(&action)
  @readPipe = nil
  @pid = nil
  @complete = false
  @result = nil

  self.class.all_done?       # Reclaim all completed children
  block_until_clear()
  launch(action)
end
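The blocking behaviour described for the constructor can be pictured with a small throttle loop. This is a sketch under stated assumptions, not the library's block_until_clear (whose source is not shown here): run_throttled is a hypothetical helper, it assumes a limit of at least 1, and it reaps children with Process.wait rather than by reading their pipes.

```ruby
# Hypothetical throttle: never keep more than max_children forked at once.
def run_throttled(job_count, max_children)
  running = []   # PIDs of children that have not been reaped yet
  peak = 0       # highest number of simultaneous children observed

  job_count.times do
    if running.size >= max_children
      running.delete(Process.wait)   # block until any one child exits
    end
    running << fork { sleep 0.05; exit!(0) }
    peak = [peak, running.size].max
  end

  running.each { |pid| Process.wait(pid) }   # reap the stragglers
  peak
end

puts run_throttled(5, 2)   # prints 2
```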

Class Method Details

.all_done? ⇒ Boolean

Test whether all instances created so far have run to completion. As a side effect, it also calls wait on every instance whose child process has finished but has not yet been reaped (i.e. those whose check_if_ready would return true). This lets you use it as a non-blocking way to clean up the remaining children.

Returns:

  • (Boolean)


# File 'lib/simple-future.rb', line 206

def self.all_done?
  @@in_progress.select!{ |sp| !sp.check_if_ready }
  return @@in_progress.size == 0
end

.max_tasks ⇒ Object

Return the maximum number of concurrent child processes allowed.



# File 'lib/simple-future.rb', line 190

def self.max_tasks()     return @@max_tasks; end

.max_tasks=(value) ⇒ Object

Set the maximum number of concurrent child processes allowed. If set to less than 1, it is interpreted as meaning no limit.

It is initially set to the number of available cores as provided by the Etc module.



# File 'lib/simple-future.rb', line 197

def self.max_tasks=(value)
  @@max_tasks = value
end
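The two rules stated for max_tasks= (a default taken from the Etc module, and any value below 1 meaning "no limit") might look like this in isolation. Etc.nprocessors is an assumption, since the text only names the Etc module, and limit_reached? is a hypothetical helper rather than a SimpleFuture method.

```ruby
require 'etc'

# Assumption: the default is Etc.nprocessors; the documentation only says
# the value comes from "the Etc module".
default_limit = Etc.nprocessors
puts "default limit would be #{default_limit}"

# Hypothetical helper mirroring the documented rule that a limit below 1
# means "no limit".
def limit_reached?(running_count, max_tasks)
  max_tasks >= 1 && running_count >= max_tasks
end

puts limit_reached?(4, 4)     # prints true
puts limit_reached?(100, 0)   # prints false
```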

.wait_for_all ⇒ Object

Wait until all child processes have run to completion and recover their results. Programs should call this before exiting if there is a chance that an instance was created without having wait called on it.



# File 'lib/simple-future.rb', line 215

def self.wait_for_all
  @@in_progress.each{|sp| sp.wait}
  @@in_progress = []
  return
end

Instance Method Details

#check_if_ready ⇒ Boolean

Check if the child process has finished evaluating the block and has a result ready. If check_if_ready returns true, wait will not block when called.

Note: check_if_ready tests if there's data on the pipe to the child process to see if it has finished. A sufficiently evil child block might be able to cause a true result while still blocking wait.

Don't do that.

Returns:

  • (Boolean)


# File 'lib/simple-future.rb', line 181

def check_if_ready
  return true if complete?
  return false unless @readPipe.ready?
  wait
  return true
end
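The "is there data on the pipe?" test that check_if_ready relies on can be reproduced with a zero-timeout IO.select. Note that this swaps in IO.select for the ready? call used in the listing above, and that readable_now? and demo_poll are hypothetical names used only for this sketch. A second pipe acts as a gate so the example behaves deterministically.

```ruby
# Non-blocking readiness test: true if a read on io would return
# something (data or EOF) without blocking.
def readable_now?(io)
  !IO.select([io], nil, nil, 0).nil?
end

def demo_poll
  reader, writer = IO.pipe
  gate_r, gate_w = IO.pipe         # the parent closes gate_w to release the child

  pid = fork do
    reader.close
    gate_w.close
    gate_r.read                    # wait for the parent's signal (EOF on the gate)
    writer.write(Marshal.dump(:done))
    writer.close
    exit!(0)
  end
  writer.close
  gate_r.close

  before = readable_now?(reader)   # child is still waiting: nothing to read
  gate_w.close                     # let the child finish
  IO.select([reader])              # block until the pipe becomes readable
  after = readable_now?(reader)
  result = Marshal.load(reader.read)
  reader.close
  Process.wait(pid)
  [before, after, result]
end

p demo_poll   # prints [false, true, :done]
```

Readiness only means that some bytes (or EOF) are available. A child that wrote part of its output and then stalled would look ready while a read to end-of-file still blocked, which is the situation the note above warns about.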

#complete? ⇒ Boolean

Test if the child process has finished and its result is available.

Note that this is only true after a call to wait (i.e. once the child process has finished and its result has been retrieved). If you want to see if the result is (probably) available, use check_if_ready.

Returns:

  • (Boolean)


# File 'lib/simple-future.rb', line 106

def complete?()   return @complete; end

#value ⇒ Object

Return the result of the child process, blocking if it is not yet available. Blocking is done by calling wait, so the process will be cleaned up.



# File 'lib/simple-future.rb', line 111

def value
  wait
  return @result
end

#wait ⇒ Object

Block until the child process finishes, recover its result and clean up the process. wait must be called for each SimpleFuture to prevent zombie processes. In practice, this is rarely a problem since value calls wait and you usually want to get all of the values. See wait_for_all.

It is safe to call wait multiple times on a SimpleFuture.

Raises:

  • (ChildError)

    The child process raised an uncaught exception.

  • (ResultTypeError)

    Marshal cannot encode the result.

  • (Error)

    An error occurred in the IPC system or child process.



# File 'lib/simple-future.rb', line 127

def wait
  # Return early if the result has already been retrieved
  return if complete?

  # Read the contents; this may block
  data = @readPipe.read

  # Reap the child process; this shouldn't block for long
  Process.wait(@pid)

  # And now we're complete, regardless of what happens next.  (We
  # set it early so that errors later on won't allow waiting again
  # and associated mystery errors.)
  @complete = true

  # Close and discard the pipe; we're done with it
  @readPipe.close
  @readPipe = nil

  # If the child process exited badly, this is an error
  raise Error.new("Error in child process #{@pid}!") unless
    $?.exitstatus == 0 && !data.empty?

  # Decode the result.  If it's an exception object, that's the
  # error that was thrown in the child and that means an error here
  # as well.
  rbox = Marshal.load(data)
  raise rbox if rbox.is_a? ResultTypeError
  raise ChildError.new("Child process failed with an exception.", rbox) if
    rbox.is_a? Exception

  # Ensure rbox is a ResultContainer. This *probably* can't happen.
  raise Error.new("Invalid result object type: #{rbox.class}") unless
    rbox.is_a? ResultContainer    

  # Aaaaaand, retrieve the value.
  @result = rbox.value
  
  return      # return nil
end
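The error path in wait (decode the payload, and raise a wrapping error if it turns out to be an exception) can be exercised end to end with a stdlib-only sketch. SketchChildError and value_from_child are hypothetical stand-ins, not the library's ChildError or its launch code, and the child-side rescue is an assumption about how an exception ends up on the pipe.

```ruby
# Hypothetical stand-in for SimpleFuture::ChildError: wraps the exception
# that was raised in the child process.
class SketchChildError < StandardError
  attr_reader :cause

  def initialize(message, cause)
    super(message)
    @cause = cause
  end
end

# Hypothetical helper: run the block in a child; return its value, or raise
# SketchChildError if the block raised.
def value_from_child
  reader, writer = IO.pipe
  pid = fork do
    reader.close
    payload = begin
      yield
    rescue Exception => e    # ship any failure back instead of dying silently
      e
    end
    writer.write(Marshal.dump(payload))
    writer.close
    exit!(0)
  end
  writer.close
  data = reader.read         # may block until the child is done
  reader.close
  Process.wait(pid)          # reap the child
  raise "child #{pid} failed" unless $?.exitstatus == 0 && !data.empty?

  payload = Marshal.load(data)
  if payload.is_a?(Exception)
    raise SketchChildError.new("Child process failed with an exception.", payload)
  end
  payload
end

begin
  value_from_child { 1 / 0 }
rescue SketchChildError => e
  puts e.cause.class   # prints ZeroDivisionError
end
```

Unlike this sketch, the listing above also checks for a ResultContainer, which suggests the library wraps ordinary results so that a block can legitimately return an exception object as its value.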