Class: Concurrent::Future

Inherits:
IVar
  • Object
Defined in:
lib/concurrent/future.rb

Overview

A `Future` represents a promise to complete an action at some time in the future. The action is atomic and permanent. The idea behind a future is to send an operation for asynchronous completion, do other work, then return and retrieve the result of the async operation at a later time.

A `Future` has four possible states: :unscheduled, :pending, :rejected, or :fulfilled. When a `Future` is created its state is set to :unscheduled. Once the `#execute` method is called the state becomes :pending and will remain in that state until processing is complete. A completed `Future` is either :rejected, indicating that an exception was raised during processing, or :fulfilled, indicating success. If a `Future` is :fulfilled its `value` will be updated to reflect the result of the operation. If :rejected the `reason` will be updated with a reference to the raised exception. The predicate methods `#unscheduled?`, `#pending?`, `#rejected?`, and `#fulfilled?` can be called at any time to obtain the state of the `Future`, as can the `#state` method, which returns a symbol.
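The state transitions described above can be modelled with plain Ruby threads. The following is a minimal sketch for illustration only, not the library's implementation: `SketchFuture` is a hypothetical stand-in that uses only the standard library and one thread per task, whereas the real class posts work to an executor.

```ruby
# Hypothetical stand-in that mirrors the documented lifecycle;
# this is NOT how Concurrent::Future is implemented.
class SketchFuture
  attr_reader :reason

  def initialize(&task)
    raise ArgumentError, 'no block given' unless task
    @task = task
    @state = :unscheduled
    @lock = Mutex.new
  end

  def state
    @lock.synchronize { @state }
  end

  # Moves :unscheduled -> :pending at most once, then runs the task
  # on a new thread. Any other starting state is left untouched.
  def execute
    start = @lock.synchronize do
      next false unless @state == :unscheduled
      @state = :pending
      true
    end
    @thread = Thread.new { work } if start
    self
  end

  # Blocks until the task has finished; returns nil when rejected.
  def value
    @thread&.join
    @lock.synchronize { @value }
  end

  private

  def work
    result = @task.call
    @lock.synchronize do
      @value = result
      @state = :fulfilled
    end
  rescue StandardError => e
    @lock.synchronize do
      @reason = e
      @state = :rejected
    end
  end
end

ok = SketchFuture.new { 21 * 2 }
ok.state   #=> :unscheduled
ok.execute
ok.value   #=> 42 (blocks until the task completes)
ok.state   #=> :fulfilled

bad = SketchFuture.new { raise ArgumentError, 'boom' }.execute
bad.value  #=> nil
bad.state  #=> :rejected
bad.reason #=> the ArgumentError instance
```

The calls on `ok` and `bad` use the same method names this page documents (`#execute`, `#state`, `#value`, `#reason`), so the usage lines should read the same against a real `Future`.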

Retrieving the value of a `Future` is done through the `#value` (alias: `#deref`) method. Obtaining the value of a `Future` is a potentially blocking operation. When a `Future` is :rejected a call to `#value` will return `nil` immediately. When a `Future` is :fulfilled a call to `#value` will immediately return the current value. When a `Future` is :pending a call to `#value` will block until the `Future` is either :rejected or :fulfilled. A timeout value can be passed to `#value` to limit how long the call will block. If the timeout is `nil` the call will block indefinitely. If it is `0` the call will not block. Any other integer or float value indicates the maximum number of seconds to block.
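The three timeout cases can be illustrated with a mutex and a condition variable from the standard library. This is a sketch under stated assumptions, not the library's code: `SketchResult` and its `set` method are hypothetical names, and only the timeout rules described above are modelled.

```ruby
# Hypothetical model of the documented timeout rules; not library code.
class SketchResult
  def initialize
    @lock = Mutex.new
    @done = ConditionVariable.new
    @complete = false
    @value = nil
  end

  # Called by the worker when the operation finishes.
  def set(value)
    @lock.synchronize do
      @value = value
      @complete = true
      @done.broadcast
    end
  end

  # timeout nil: wait indefinitely. timeout 0: do not wait.
  # Any other number: wait at most that many seconds.
  # Returns nil if the result is still not available.
  def value(timeout = nil)
    @lock.synchronize do
      deadline = timeout && (now + timeout)
      until @complete
        if deadline.nil?
          @done.wait(@lock)
        else
          remaining = deadline - now
          break if remaining <= 0
          @done.wait(@lock, remaining)
        end
      end
      @value
    end
  end

  private

  def now
    Process.clock_gettime(Process::CLOCK_MONOTONIC)
  end
end

result = SketchResult.new
worker = Thread.new { sleep(0.5); result.set(42) }

immediate = result.value(0)    #=> nil, still pending and the call does not block
short     = result.value(0.05) #=> nil, gave up after about 0.05 seconds
final     = result.value       #=> 42, blocked until the worker finished
worker.join
```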

The `Future` class also includes the behavior of the Ruby standard library `Observable` module, but does so in a thread-safe way. On fulfillment or rejection all observers will be notified according to the normal `Observable` behavior. The observer callback function will be called with three parameters: the `Time` of fulfillment/rejection, the final `value`, and the final `reason`. Observers added after fulfillment/rejection will still be notified as normal.
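To show the shape of the observer callback, here is a rough sketch of thread-safe notification using only the standard library. `SketchSubject`, `Recorder`, and `complete` are hypothetical names, and the way late observers are handled (notified at once with the stored outcome) is an assumption about one reasonable reading of the paragraph above, not a statement about the library's internals.

```ruby
# Hypothetical subject that notifies observers with (time, value, reason).
class SketchSubject
  def initialize
    @lock = Mutex.new
    @observers = []
    @outcome = nil # becomes [time, value, reason] on completion
  end

  # Registers an observer; func names the callback method to invoke.
  def add_observer(observer, func = :update)
    outcome = @lock.synchronize do
      @observers << [observer, func] if @outcome.nil?
      @outcome
    end
    # Assumption: an observer added after completion is notified right away.
    observer.send(func, *outcome) if outcome
    self
  end

  # Records the outcome, then notifies and drops all waiting observers.
  def complete(value, reason = nil)
    outcome = [Time.now, value, reason]
    waiting = @lock.synchronize do
      @outcome = outcome
      @observers.slice!(0..-1)
    end
    waiting.each { |observer, func| observer.send(func, *outcome) }
  end
end

# An observer only needs a callback that accepts the three parameters.
class Recorder
  attr_reader :events

  def initialize
    @events = []
  end

  def update(time, value, reason)
    @events << [value, reason, time.is_a?(Time)]
  end
end

early = Recorder.new
late = Recorder.new
subject = SketchSubject.new

subject.add_observer(early)
Thread.new { subject.complete(42) }.join
subject.add_observer(late)

early.events #=> [[42, nil, true]]
late.events  #=> [[42, nil, true]]
```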

Class Method Summary

Instance Method Summary

Methods inherited from IVar

#add_observer, #fail, #set

Methods included from Observable

#add_observer, #count_observers, #delete_observer, #delete_observers, #with_observer

Methods included from Obligation

#completed?, #exception, #fulfilled?, #incomplete?, #no_error!, #pending?, #reason, #rejected?, #state, #unscheduled?, #value, #value!, #wait

Methods included from Dereferenceable

#value

Constructor Details

#initialize(opts = {}) { ... } ⇒ Future

Create a new `Future` in the `:unscheduled` state.

Parameters:

  • opts (Hash) (defaults to: {})

    the options controlling how the future will be processed

Options Hash (opts):

  • :operation (Boolean) — default: false

    when `true` will execute the future on the global operation pool (for long-running operations), when `false` will execute the future on the global task pool (for short-running tasks)

  • :executor (object)

    when provided will run all operations on this executor rather than the global thread pool (overrides :operation)

  • :dup_on_deref (Boolean) — default: false

    call `#dup` before returning the data

  • :freeze_on_deref (Boolean) — default: false

    call `#freeze` before returning the data

  • :copy_on_deref (Proc) — default: nil

    call the given `Proc` passing the internal value and returning the value returned from the proc
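The effect of the three dereference options can be pictured with a small helper. This is only a sketch: `deref` here is a hypothetical free-standing method, and the order in which the options are applied (copy, then dup, then freeze) is an assumption that this page does not state.

```ruby
# Hypothetical helper showing what each option does to the stored value
# before it is handed back to the caller. The ordering is assumed.
def deref(value, opts = {})
  return nil if value.nil?
  value = opts[:copy_on_deref].call(value) if opts[:copy_on_deref]
  value = value.dup if opts[:dup_on_deref]
  value = value.freeze if opts[:freeze_on_deref]
  value
end

original = String.new('hello')

copy   = deref(original, dup_on_deref: true)
locked = deref(original, dup_on_deref: true, freeze_on_deref: true)
shout  = deref(original, copy_on_deref: ->(v) { v.upcase })

copy.equal?(original) #=> false, the caller gets a separate object
locked.frozen?        #=> true
original.frozen?      #=> false, only the duplicate was frozen
shout                 #=> "HELLO"
```

Combining `:dup_on_deref` with `:freeze_on_deref`, as in `locked`, hands callers an immutable copy while leaving the stored value untouched.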

Yields:

  • the asynchronous operation to perform

Raises:

  • (ArgumentError)

    if no block is given



# File 'lib/concurrent/future.rb', line 59

def initialize(opts = {}, &block)
  raise ArgumentError.new('no block given') unless block_given?
  super(IVar::NO_VALUE, opts)
  @state = :unscheduled
  @task = block
  @executor = OptionsParser::get_executor_from(opts)
end

Class Method Details

.execute(opts = {}) { ... } ⇒ Future

Create a new `Future` object with the given block, execute it, and return the `:pending` object.

Examples:

future = Concurrent::Future.execute{ sleep(1); 42 }
future.state #=> :pending

Parameters:

  • opts (Hash) (defaults to: {})

    a customizable set of options

Options Hash (opts):

  • :dup_on_deref (Boolean) — default: false

    call `#dup` before returning the data

  • :freeze_on_deref (Boolean) — default: false

    call `#freeze` before returning the data

  • :copy_on_deref (Proc) — default: nil

    call the given `Proc` passing the internal value and returning the value returned from the proc

Yields:

  • the asynchronous operation to perform

Returns:

  • (Future)

    the newly created `Future` in the `:pending` state

Raises:

  • (ArgumentError)

    if no block is given

Since:

  • 0.5.0



# File 'lib/concurrent/future.rb', line 110

def self.execute(opts = {}, &block)
  Future.new(opts, &block).execute
end

Instance Method Details

#execute ⇒ Future

Execute an `:unscheduled` `Future`. Immediately sets the state to `:pending` and passes the block to a new thread/thread pool for eventual execution. Does nothing if the `Future` is in any state other than `:unscheduled`.

Examples:

Instantiate and execute in separate steps

future = Concurrent::Future.new{ sleep(1); 42 }
future.state #=> :unscheduled
future.execute
future.state #=> :pending

Instantiate and execute in one line

future = Concurrent::Future.new{ sleep(1); 42 }.execute
future.state #=> :pending

Returns:

  • (Future)

    a reference to `self`

Since:

  • 0.5.0



# File 'lib/concurrent/future.rb', line 84

def execute
  if compare_and_set_state(:pending, :unscheduled)
    @executor.post{ work }
    self
  end
end