Class: Async::Promise

Inherits:
Object
  • Object
show all
Defined in:
lib/async/promise.rb

Overview

A promise represents a value that will be available in the future. Unlike Condition, once resolved (or rejected), all future waits return immediately with the stored value or raise the stored exception.

This is thread-safe and integrates with the fiber scheduler.

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initializePromise

Create a new promise.



21
22
23
24
25
26
27
28
29
30
31
32
33
# File 'lib/async/promise.rb', line 21

def initialize
	# nil = pending, :completed = success, :failed = failure, :cancelled = cancelled:
	@resolved = nil
	
	# Stores either the result value or the exception:
	@value = nil
	
	# Track how many fibers are currently waiting:
	@waiting = 0
	
	@mutex = Mutex.new
	@condition = ConditionVariable.new
end

Class Method Details

.fulfill(promise, &block) ⇒ Object

If a promise is given, fulfill it with the result of the block. If no promise is given, simply yield to the block. This is useful for methods that may optionally take a promise to fulfill.



255
256
257
258
259
260
261
# File 'lib/async/promise.rb', line 255

def self.fulfill(promise, &block)
	if promise
		return promise.fulfill(&block)
	else
		return yield
	end
end

Instance Method Details

#cancel(exception = Cancel.new("Promise cancelled!")) ⇒ Object

Cancel the promise, indicating cancellation. All current and future waiters will receive nil. Can only be called on pending promises - no-op if already resolved.



211
212
213
214
215
216
217
218
219
220
221
222
223
224
# File 'lib/async/promise.rb', line 211

def cancel(exception = Cancel.new("Promise cancelled!"))
	@mutex.synchronize do
		# No-op if already in any final state
		return if @resolved
		
		@resolved = :cancelled
		@value = exception
		
		# Wake up all waiting fibers:
		@condition.broadcast
	end
	
	return nil
end

#cancelled?Boolean

Returns:

  • (Boolean)


47
48
49
# File 'lib/async/promise.rb', line 47

def cancelled?
	@mutex.synchronize{@resolved == :cancelled}
end

#completed?Boolean

Returns:

  • (Boolean)


57
58
59
# File 'lib/async/promise.rb', line 57

def completed?
	@mutex.synchronize{@resolved == :completed}
end

#failed?Boolean

Returns:

  • (Boolean)


52
53
54
# File 'lib/async/promise.rb', line 52

def failed?
	@mutex.synchronize{@resolved == :failed}
end

#fulfill(&block) ⇒ Object

Resolve the promise with the result of the block. If the block raises an exception, the promise will be rejected. If the promise was already resolved, the block will not be called.



231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
# File 'lib/async/promise.rb', line 231

def fulfill(&block)
	raise "Promise already resolved!" if @resolved
	
	begin
		return self.resolve(yield)
	rescue Cancel => exception
		return self.cancel(exception)
	rescue => error
		return self.reject(error)
	rescue Exception => exception
		self.reject(exception)
		raise
	ensure
		# Handle non-local exits (throw, etc.) that bypass normal flow:
		self.resolve(nil) unless @resolved
	end
end

#reject(exception) ⇒ Object

Reject the promise with an exception. All current and future waiters will receive this exception. Can only be called once - subsequent calls are ignored.



194
195
196
197
198
199
200
201
202
203
204
205
206
# File 'lib/async/promise.rb', line 194

def reject(exception)
	@mutex.synchronize do
		return if @resolved
		
		@resolved = :failed
		@value = exception
		
		# Wake up all waiting fibers:
		@condition.broadcast
	end
	
	return nil
end

#resolve(value) ⇒ Object

Resolve the promise with a value. All current and future waiters will receive this value. Can only be called once - subsequent calls are ignored.



175
176
177
178
179
180
181
182
183
184
185
186
187
# File 'lib/async/promise.rb', line 175

def resolve(value)
	@mutex.synchronize do
		return if @resolved
		
		@resolved = :completed
		@value = value
		
		# Wake up all waiting fibers:
		@condition.broadcast
	end
	
	return value
end

#resolvedObject



42
43
44
# File 'lib/async/promise.rb', line 42

def resolved
	@mutex.synchronize{@resolved}
end

#resolved?Boolean

Returns:

  • (Boolean)


36
37
38
# File 'lib/async/promise.rb', line 36

def resolved?
	@mutex.synchronize{!!@resolved}
end

#suppress_warnings!Object

Artificially mark that someone is waiting (useful for suppressing warnings).



68
69
70
# File 'lib/async/promise.rb', line 68

def suppress_warnings!
	@mutex.synchronize{@waiting += 1}
end

#valueObject

Non-blocking access to the current value. Returns nil if not yet resolved. Does not raise exceptions even if the promise was rejected or cancelled. For resolved promises, returns the raw stored value (result, exception, or cancel exception).



77
78
79
# File 'lib/async/promise.rb', line 77

def value
	@mutex.synchronize{@resolved ? @value : nil}
end

#waitObject

Wait for the promise to be resolved and return the value.

If already resolved, returns immediately. If rejected, raises the stored exception.



157
158
159
160
161
162
163
164
165
166
167
168
# File 'lib/async/promise.rb', line 157

def wait(...)
	resolved = wait?(...)
	
	if resolved.nil?
		raise TimeoutError, "Timeout while waiting for promise!"
	elsif resolved == :completed
		return @value
	elsif @value
		# If we aren't completed, we should have an exception or cancel reason stored:
		raise @value
	end
end

#wait?(timeout: nil) ⇒ Boolean

Wait for the promise to be resolved (without raising exceptions).

If already resolved, returns immediately. Otherwise, waits until resolution or timeout.

Returns:

  • (Boolean)


122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
# File 'lib/async/promise.rb', line 122

def wait?(timeout: nil)
	unless @resolved
		@mutex.synchronize do
			# Increment waiting count:
			@waiting += 1
			
			begin
				# Wait for resolution if not already resolved:
				unless @resolved
					if timeout.nil?
						wait_indefinitely
					else
						unless wait_with_timeout(timeout)
							# We don't want to race on @resolved after exiting the mutex:
							return nil
						end
					end
				end
			ensure
				# Decrement waiting count when done:
				@waiting -= 1
			end
		end
	end
	
	return @resolved
end

#waiting?Boolean

Returns:

  • (Boolean)


62
63
64
# File 'lib/async/promise.rb', line 62

def waiting?
	@mutex.synchronize{@waiting > 0}
end