Module: Concurrent::Edge::FutureShortcuts

Included in:
Concurrent, Concurrent, Concurrent::Edge, Concurrent::Edge
Defined in:
lib/concurrent/edge/future.rb

Overview

Provides edge features, which will be added to or replace features in main gem.

Contains new unified implementation of Futures and Promises which combines Features of previous ‘Future`, `Promise`, `IVar`, `Event`, `Probe`, `dataflow`, `Delay`, `TimerTask` into single framework. It uses extensively new synchronization layer to make all the paths lock-free with exception of blocking threads on `#wait`. It offers better performance and does not block threads (exception being #wait and similar methods where it’s intended).

## Examples

Instance Method Summary collapse

Instance Method Details

#any(*futures) ⇒ Future

Constructs new Concurrent::Edge::Future which is completed after first of the futures is complete.

Parameters:

Returns:



101
102
103
# File 'lib/concurrent/edge/future.rb', line 101

def any(*futures)
  AnyPromise.new(futures, :io).future
end

#completed_event(default_executor = :io) ⇒ Event

Returns which is already completed.

Returns:

  • (Event)

    which is already completed



59
60
61
# File 'lib/concurrent/edge/future.rb', line 59

def completed_event(default_executor = :io)
  ImmediateEventPromise.new(default_executor).event
end

#completed_future(success, value, reason, default_executor = :io) ⇒ Future

Returns which is already completed.

Returns:

  • (Future)

    which is already completed



44
45
46
# File 'lib/concurrent/edge/future.rb', line 44

def completed_future(success, value, reason, default_executor = :io)
  ImmediateFuturePromise.new(default_executor, success, value, reason).future
end

#delay(default_executor = :io, &task) ⇒ Future

Constructs new Future which will evaluate to the block after requested by calling ‘#wait`, `#value`, `#value!`, etc. on it or on any of the chained futures.

Returns:



68
69
70
# File 'lib/concurrent/edge/future.rb', line 68

def delay(default_executor = :io, &task)
  Delay.new(default_executor).future.then(&task)
end

#event(default_executor = :io) ⇒ CompletableEvent

User is responsible for completing the event once by CompletableEvent#complete

Returns:



25
26
27
# File 'lib/concurrent/edge/future.rb', line 25

def event(default_executor = :io)
  CompletableEventPromise.new(default_executor).future
end

#failed_future(reason, default_executor = :io) ⇒ Future

Returns which is already completed in failed state with reason.

Returns:

  • (Future)

    which is already completed in failed state with reason



54
55
56
# File 'lib/concurrent/edge/future.rb', line 54

def failed_future(reason, default_executor = :io)
  completed_future false, nil, reason, default_executor
end

#future(default_executor = :io, &task) ⇒ Future #future(default_executor = :io) ⇒ CompletableFuture Also known as: async

Overloads:



35
36
37
38
39
40
41
# File 'lib/concurrent/edge/future.rb', line 35

def future(default_executor = :io, &task)
  if task
    ImmediateEventPromise.new(default_executor).future.then(&task)
  else
    CompletableFuturePromise.new(default_executor).future
  end
end

#post(*args, &job) ⇒ true, false

post job on :io executor

Returns:

  • (true, false)


126
127
128
# File 'lib/concurrent/edge/future.rb', line 126

def post(*args, &job)
  post_on(:io, *args, &job)
end

#post!(*args, &job) ⇒ true, false

post job on :fast executor

Returns:

  • (true, false)


120
121
122
# File 'lib/concurrent/edge/future.rb', line 120

def post!(*args, &job)
  post_on(:fast, *args, &job)
end

#post_on(executor, *args, &job) ⇒ true, false

post job on executor

Returns:

  • (true, false)


132
133
134
# File 'lib/concurrent/edge/future.rb', line 132

def post_on(executor, *args, &job)
  Concurrent.executor(executor).post(*args, &job)
end

#schedule(intended_time, default_executor = :io, &task) ⇒ Future

Schedules the block to be executed on executor in given intended_time.

Parameters:

  • intended_time (Numeric, Time)

    Numeric => run in ‘intended_time` seconds. Time => eun on time.

Returns:



75
76
77
# File 'lib/concurrent/edge/future.rb', line 75

def schedule(intended_time, default_executor = :io, &task)
  ScheduledPromise.new(default_executor, intended_time).future.then(&task)
end

#select(*channels) ⇒ Future

only proof of concept

Returns:



107
108
109
110
111
112
113
114
115
116
# File 'lib/concurrent/edge/future.rb', line 107

def select(*channels)
  future do
    # noinspection RubyArgCount
    Channel.select do |s|
      channels.each do |ch|
        s.take(ch) { |value| [value, ch] }
      end
    end
  end
end

#succeeded_future(value, default_executor = :io) ⇒ Future

Returns which is already completed in success state with value.

Returns:

  • (Future)

    which is already completed in success state with value



49
50
51
# File 'lib/concurrent/edge/future.rb', line 49

def succeeded_future(value, default_executor = :io)
  completed_future true, value, nil, default_executor
end

#zip_events(*futures_and_or_events) ⇒ Event

Constructs new Event which is completed after all futures_and_or_events are complete (Future is completed when Success or Failed).

Parameters:

  • futures_and_or_events (Event)

Returns:



94
95
96
# File 'lib/concurrent/edge/future.rb', line 94

def zip_events(*futures_and_or_events)
  ZipEventsPromise.new(futures_and_or_events, :io).future
end

#zip_futures(*futures_and_or_events) ⇒ Future Also known as: zip

Constructs new Concurrent::Edge::Future which is completed after all futures_and_or_events are complete. Its value is array of dependent future values. If there is an error it fails with the first one. Event does not have a value so it’s represented by nil in the array of values.

Parameters:

  • futures_and_or_events (Event)

Returns:



84
85
86
# File 'lib/concurrent/edge/future.rb', line 84

def zip_futures(*futures_and_or_events)
  ZipFuturesPromise.new(futures_and_or_events, :io).future
end