Class: Async::Promise

Inherits:
Object
Defined in:
lib/async/promise.rb

Overview

A promise represents a value that will be available in the future. Unlike Condition, once resolved (or rejected), all future waits return immediately with the stored value or raise the stored exception.

This is thread-safe and integrates with the fiber scheduler.
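
The "resolve once, read many times" behavior described above can be sketched with the standard library alone. `SketchPromise` below is a hypothetical stand-in, not the library class: it copies only the `resolve` and `wait` logic from the source listings on this page (waiter counting and the failure states are left out) so that the example runs without the async gem.

```ruby
# Hypothetical stand-in for illustration only; not Async::Promise itself.
class SketchPromise
  def initialize
    @resolved = nil
    @value = nil
    @mutex = Mutex.new
    @condition = ConditionVariable.new
  end

  # Store the value once and wake every waiting thread.
  def resolve(value)
    @mutex.synchronize do
      return if @resolved
      @value = value
      @resolved = :completed
      @condition.broadcast
    end
    value
  end

  # Block until resolved, then return the stored value.
  def wait
    @mutex.synchronize do
      @condition.wait(@mutex) until @resolved
      @value
    end
  end
end

promise = SketchPromise.new
waiter = Thread.new { promise.wait } # blocks until the promise is resolved
promise.resolve(42)

first = waiter.value   # 42, delivered to the waiting thread
second = promise.wait  # 42 again, returned immediately from the stored value
```

Because the value is stored rather than signalled once, a waiter that arrives after `resolve` still observes it, which is the difference from a plain condition that the overview points out.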

Defined Under Namespace

Classes: Cancel

Constructor Details

#initialize ⇒ Promise

Create a new promise.



# File 'lib/async/promise.rb', line 17

def initialize
  # nil = pending, :completed = success, :failed = failure, :cancelled = cancelled:
  @resolved = nil
  
  # Stores either the result value or the exception:
  @value = nil
  
  # Track how many fibers are currently waiting:
  @waiting = 0
  
  @mutex = Mutex.new
  @condition = ConditionVariable.new
end

Class Method Details

.fulfill(promise, &block) ⇒ Object

If a promise is given, fulfill it with the result of the block. If no promise is given, simply yield to the block. This is useful for methods that may optionally take a promise to fulfill.



# File 'lib/async/promise.rb', line 196

def self.fulfill(promise, &block)
  if promise
    return promise.fulfill(&block)
  else
    return yield
  end
end
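
The "optional promise" pattern described above might look like the following in a caller. This is a sketch under stated assumptions: `SketchPromise` is a hypothetical stand-in whose instance-level `fulfill` merely stores the block result (the real method also handles errors and cancellation), and `compute` is an invented helper name.

```ruby
# Hypothetical stand-in; only the dispatch logic of .fulfill is mirrored.
class SketchPromise
  attr_reader :value

  # Simplified: store whatever the block returns.
  def fulfill
    @value = yield
  end

  # Same branching as the listing above: delegate if a promise is given,
  # otherwise just run the block.
  def self.fulfill(promise, &block)
    if promise
      promise.fulfill(&block)
    else
      yield
    end
  end
end

# Hypothetical helper that accepts an optional promise.
def compute(promise = nil)
  SketchPromise.fulfill(promise) { 6 * 7 }
end

direct = compute          # no promise given: the block result is returned
promise = SketchPromise.new
compute(promise)          # promise given: the result is stored on it
stored = promise.value
```

The helper body stays identical in both cases; only the presence of the promise decides where the result ends up.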

Instance Method Details

#cancel(exception = Cancel.new("Promise was cancelled!")) ⇒ Object

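Cancel the promise. All current and future waiters will have the cancellation exception raised (by default a Cancel instance) rather than receiving a return value; the method itself returns nil. Can only be called on pending promises; it is a no-op if the promise is already resolved.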



# File 'lib/async/promise.rb', line 152

def cancel(exception = Cancel.new("Promise was cancelled!"))
  @mutex.synchronize do
    # No-op if already in any final state
    return if @resolved
    
    @value = exception
    @resolved = :cancelled
    
    # Wake up all waiting fibers:
    @condition.broadcast
  end
  
  return nil
end
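
To make the waiter-side effect of cancellation concrete, here is a minimal sketch. `SketchPromise` is a hypothetical stand-in that copies the `cancel` and `wait` logic from the listings on this page; the superclass of `Cancel` is not shown here, so deriving it from `Exception` is an assumption made only for this example.

```ruby
# Hypothetical stand-in for illustration only; not Async::Promise itself.
class SketchPromise
  # Assumption: the real Cancel class's superclass is not shown on this page.
  class Cancel < Exception; end

  def initialize
    @resolved = nil
    @value = nil
    @mutex = Mutex.new
    @condition = ConditionVariable.new
  end

  def cancel(exception = Cancel.new("Promise was cancelled!"))
    @mutex.synchronize do
      return if @resolved # no-op once any final state is reached
      @value = exception
      @resolved = :cancelled
      @condition.broadcast
    end
    nil
  end

  def wait
    @mutex.synchronize do
      @condition.wait(@mutex) until @resolved
      raise @value unless @resolved == :completed
      @value
    end
  end
end

promise = SketchPromise.new
waiter = Thread.new do
  begin
    promise.wait
  rescue SketchPromise::Cancel => error
    error.message # the waiter sees the exception, not nil
  end
end
promise.cancel
outcome = waiter.value

# A waiter arriving after cancellation gets the same stored exception.
late = begin
  promise.wait
rescue SketchPromise::Cancel => error
  error.class
end
```

Callers that want a nil-on-cancel result therefore need to rescue the cancellation exception themselves.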

#cancelled? ⇒ Boolean

Returns:

  • (Boolean)


# File 'lib/async/promise.rb', line 43

def cancelled?
  @mutex.synchronize{@resolved == :cancelled}
end

#completed? ⇒ Boolean

Returns:

  • (Boolean)


# File 'lib/async/promise.rb', line 53

def completed?
  @mutex.synchronize{@resolved == :completed}
end

#failed? ⇒ Boolean

Returns:

  • (Boolean)


# File 'lib/async/promise.rb', line 48

def failed?
  @mutex.synchronize{@resolved == :failed}
end

#fulfill(&block) ⇒ Object

Resolve the promise with the result of the block. If the block raises an exception, the promise will be rejected (or cancelled, if the exception is a Cancel). If the promise was already resolved, a RuntimeError is raised and the block is not called.



# File 'lib/async/promise.rb', line 172

def fulfill(&block)
  raise "Promise already resolved!" if @resolved
  
  begin
    return self.resolve(yield)
  rescue Cancel => exception
    return self.cancel(exception)
  rescue => error
    return self.reject(error)
  rescue Exception => exception
    self.reject(exception)
    raise
  ensure
    # Handle non-local exits (throw, etc.) that bypass normal flow:
    self.resolve(nil) unless @resolved
  end
end
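
The outcomes of `fulfill` can be traced with a reduced, single-threaded sketch. `SketchPromise` and its `settle` helper are hypothetical names: the helper collapses `resolve` and `reject` into one method, locking and the `Cancel` branch are omitted, and only the control flow of the listing above is kept.

```ruby
# Hypothetical single-threaded stand-in; not Async::Promise itself.
class SketchPromise
  attr_reader :resolved, :value

  def initialize
    @resolved = nil
    @value = nil
  end

  # Record a final state once; later calls are ignored.
  def settle(state, value)
    return if @resolved
    @value = value
    @resolved = state
    nil
  end

  def fulfill
    raise "Promise already resolved!" if @resolved

    begin
      settle(:completed, yield)
    rescue => error
      settle(:failed, error)
    ensure
      # Non-local exits such as throw skip both branches above:
      settle(:completed, nil) unless @resolved
    end
  end
end

ok = SketchPromise.new
ok.fulfill { 1 + 1 }                         # resolved: :completed, value: 2

bad = SketchPromise.new
bad.fulfill { raise ArgumentError, "boom" }  # resolved: :failed, value: the error

thrown = SketchPromise.new
catch(:done) { thrown.fulfill { throw :done } } # resolved: :completed, value: nil

again = begin
  ok.fulfill { :never_called }
rescue RuntimeError => error
  error.message                              # "Promise already resolved!"
end
```

The `ensure` branch is what keeps a promise from staying pending forever when the block exits via `throw`.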

#reject(exception) ⇒ Object

Reject the promise with an exception. All current and future waiters will receive this exception. Can only be called once - subsequent calls are ignored.



# File 'lib/async/promise.rb', line 131

def reject(exception)
  @mutex.synchronize do
    return if @resolved
    
    @value = exception
    @resolved = :failed
    
    # Wake up all waiting fibers:
    @condition.broadcast
  end
  
  return nil
end

#resolve(value) ⇒ Object

Resolve the promise with a value. All current and future waiters will receive this value. Can only be called once - subsequent calls are ignored.



# File 'lib/async/promise.rb', line 112

def resolve(value)
  @mutex.synchronize do
    return if @resolved
    
    @value = value
    @resolved = :completed
    
    # Wake up all waiting fibers:
    @condition.broadcast
  end
  
  return value
end

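#resolved ⇒ Object

Returns the current resolution state: nil while pending, otherwise :completed, :failed, or :cancelled.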



# File 'lib/async/promise.rb', line 38

def resolved
  @mutex.synchronize{@resolved}
end

#resolved? ⇒ Boolean

Returns:

  • (Boolean)


# File 'lib/async/promise.rb', line 32

def resolved?
  @mutex.synchronize{!!@resolved}
end

#suppress_warnings! ⇒ Object

Artificially mark that someone is waiting (useful for suppressing warnings).



# File 'lib/async/promise.rb', line 64

def suppress_warnings!
  @mutex.synchronize{@waiting += 1}
end

#value ⇒ Object

Non-blocking access to the current value. Returns nil if not yet resolved. Does not raise exceptions even if the promise was rejected or cancelled. For resolved promises, returns the raw stored value (result, exception, or cancel exception).



# File 'lib/async/promise.rb', line 73

def value
  @mutex.synchronize{@resolved ? @value : nil}
end
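
The difference between reading `value` and waiting can be seen in a short sketch. `SketchPromise` is again a hypothetical stand-in that copies only the `reject`, `resolved?`, and `value` logic from the listings on this page.

```ruby
# Hypothetical stand-in for illustration only; not Async::Promise itself.
class SketchPromise
  def initialize
    @resolved = nil
    @value = nil
    @mutex = Mutex.new
  end

  def reject(exception)
    @mutex.synchronize do
      return if @resolved
      @value = exception
      @resolved = :failed
    end
    nil
  end

  def resolved?
    @mutex.synchronize { !!@resolved }
  end

  # Non-blocking read: never raises, even for a stored exception.
  def value
    @mutex.synchronize { @resolved ? @value : nil }
  end
end

promise = SketchPromise.new
before = promise.value                    # nil while pending
promise.reject(IOError.new("disk gone"))
after = promise.value                     # the IOError object, returned rather than raised
```

Since `value` is nil both for a pending promise and for one resolved with nil, pairing it with `resolved?` (or `resolved`) is the way to tell the two apart.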

#wait ⇒ Object

Wait for the promise to be resolved and return the value. If already resolved, returns immediately. If rejected or cancelled, raises the stored exception.



# File 'lib/async/promise.rb', line 82

def wait
  @mutex.synchronize do
    # Increment waiting count:
    @waiting += 1
    
    begin
      # Wait for resolution if not already resolved:
      until @resolved
        @condition.wait(@mutex)
      end
      
      # Return value or raise exception based on resolution type:
      if @resolved == :completed
        return @value
      else
        # Both :failed and :cancelled store exceptions in @value
        raise @value
      end
    ensure
      # Decrement waiting count when done:
      @waiting -= 1
    end
  end
end

#waiting? ⇒ Boolean

Returns:

  • (Boolean)


# File 'lib/async/promise.rb', line 58

def waiting?
  @mutex.synchronize{@waiting > 0}
end