Class: Concurrent::Future

Inherits:
IVar
  • Object
show all
Defined in:
lib/concurrent/future.rb

Overview

## Copy Options

Object references in Ruby are mutable. This can lead to serious problems when the Concern::Obligation#value of an object is a mutable reference. Which is always the case unless the value is a Fixnum, Symbol, or similar “primative” data type. Each instance can be configured with a few options that can help protect the program from potentially dangerous operations. Each of these options can be optionally set when the oject instance is created:

  • :dup_on_deref When true the object will call the #dup method on the value object every time the #value methid is called (default: false)

  • :freeze_on_deref When true the object will call the #freeze method on the value object every time the #value method is called (default: false)

  • :copy_on_deref When given a Proc object the Proc will be run every time the #value method is called. The Proc will be given the current value as its only argument and the result returned by the block will be the return value of the #value call. When nil this option will be ignored (default: nil)

When multiple deref options are set the order of operations is strictly defined. The order of deref operations is:

  • :copy_on_deref

  • :dup_on_deref

  • :freeze_on_deref

Because of this ordering there is no need to #freeze an object created by a provided :copy_on_deref block. Simply set :freeze_on_deref to true. Setting both :dup_on_deref to true and :freeze_on_deref to true is as close to the behavior of a “pure” functional language (like Erlang, Clojure, or Haskell) as we are likely to get in Ruby.

Class Method Summary collapse

Instance Method Summary collapse

Methods inherited from IVar

#add_observer, #fail, #try_set

Methods included from Concern::Observable

#add_observer, #count_observers, #delete_observer, #delete_observers, #with_observer

Methods included from Concern::Obligation

#complete?, #exception, #fulfilled?, #incomplete?, #pending?, #reason, #rejected?, #state, #unscheduled?, #value, #value!, #wait, #wait!

Methods included from Concern::Dereferenceable

#value

Constructor Details

#initialize(opts = {}) { ... } ⇒ Future

Create a new Future in the :unscheduled state.

Parameters:

  • (defaults to: {})

    the options used to define the behavior at update and deref and to specify the executor on which to perform actions

Options Hash (opts):

  • :executor (Executor)

    when set use the given Executor instance. Three special values are also supported: :task returns the global task pool, :operation returns the global operation pool, and :immediate returns a new ImmediateExecutor object.

  • :dup_on_deref (Boolean) — default: false

    Call #dup before returning the data from Concern::Obligation#value

  • :freeze_on_deref (Boolean) — default: false

    Call #freeze before returning the data from Concern::Obligation#value

  • :copy_on_deref (Proc) — default: nil

    When calling the Concern::Obligation#value method, call the given proc passing the internal value as the sole argument then return the new value returned from the proc.

  • :args (object, Array)

    zero or more arguments to be passed the task block on execution

Yields:

  • the asynchronous operation to perform

Raises:

  • if no block is given



30
31
32
33
# File 'lib/concurrent/future.rb', line 30

def initialize(opts = {}, &block)
  raise ArgumentError.new('no block given') unless block_given?
  super(NULL, opts.merge(__task_from_block__: block), &nil)
end

Class Method Details

.execute(opts = {}) { ... } ⇒ Future

Create a new Future object with the given block, execute it, and return the :pending object.

Examples:

future = Concurrent::Future.execute{ sleep(1); 42 }
future.state #=> :pending

Parameters:

  • (defaults to: {})

    the options used to define the behavior at update and deref and to specify the executor on which to perform actions

Options Hash (opts):

  • :executor (Executor)

    when set use the given Executor instance. Three special values are also supported: :task returns the global task pool, :operation returns the global operation pool, and :immediate returns a new ImmediateExecutor object.

  • :dup_on_deref (Boolean) — default: false

    Call #dup before returning the data from Concern::Obligation#value

  • :freeze_on_deref (Boolean) — default: false

    Call #freeze before returning the data from Concern::Obligation#value

  • :copy_on_deref (Proc) — default: nil

    When calling the Concern::Obligation#value method, call the given proc passing the internal value as the sole argument then return the new value returned from the proc.

  • :args (object, Array)

    zero or more arguments to be passed the task block on execution

Yields:

  • the asynchronous operation to perform

Returns:

  • the newly created Future in the :pending state

Raises:

  • if no block is given



74
75
76
# File 'lib/concurrent/future.rb', line 74

def self.execute(opts = {}, &block)
  Future.new(opts, &block).execute
end

Instance Method Details

#cancelBoolean

Attempt to cancel the operation if it has not already processed. The operation can only be cancelled while still pending. It cannot be cancelled once it has begun processing or has completed.

Returns:

  • was the operation successfully cancelled.



96
97
98
99
100
101
102
103
# File 'lib/concurrent/future.rb', line 96

def cancel
  if compare_and_set_state(:cancelled, :pending)
    complete(false, nil, CancelledOperationError.new)
    true
  else
    false
  end
end

#cancelled?Boolean

Has the operation been successfully cancelled?

Returns:



108
109
110
# File 'lib/concurrent/future.rb', line 108

def cancelled?
  state == :cancelled
end

#executeFuture

Execute an :unscheduled Future. Immediately sets the state to :pending and passes the block to a new thread/thread pool for eventual execution. Does nothing if the Future is in any state other than :unscheduled.

Examples:

Instance and execute in separate steps

future = Concurrent::Future.new{ sleep(1); 42 }
future.state #=> :unscheduled
future.execute
future.state #=> :pending

Instance and execute in one line

future = Concurrent::Future.new{ sleep(1); 42 }.execute
future.state #=> :pending

Returns:

  • a reference to self



50
51
52
53
54
55
# File 'lib/concurrent/future.rb', line 50

def execute
  if compare_and_set_state(:pending, :unscheduled)
    @executor.post{ safe_execute(@task, @args) }
    self
  end
end

#set(value = NULL) { ... } ⇒ IVar

Set the IVar to a value and wake or notify all threads waiting on it.

Parameters:

  • (defaults to: NULL)

    the value to store in the IVar

Yields:

  • A block operation to use for setting the value

Returns:

  • self

Raises:

  • if both a value and a block are given

  • if the IVar has already been set or otherwise completed



79
80
81
82
83
84
85
86
87
88
89
# File 'lib/concurrent/future.rb', line 79

def set(value = NULL, &block)
  check_for_block_or_value!(block_given?, value)
  synchronize do
    if @state != :unscheduled
      raise MultipleAssignmentError
    else
      @task = block || Proc.new { value }
    end
  end
  execute
end

#wait_or_cancel(timeout) ⇒ Boolean

Wait the given number of seconds for the operation to complete. On timeout attempt to cancel the operation.

Parameters:

  • the maximum time in seconds to wait.

Returns:

  • true if the operation completed before the timeout else false



118
119
120
121
122
123
124
125
126
# File 'lib/concurrent/future.rb', line 118

def wait_or_cancel(timeout)
  wait(timeout)
  if complete?
    true
  else
    cancel
    false
  end
end