Class: Concurrent::Edge::Event
- Inherits:
-
Synchronization::LockableObject
- Object
- Synchronization::LockableObject
- Concurrent::Edge::Event
- Includes:
- Concern::Deprecation, Concern::Logging
- Defined in:
- lib/concurrent/edge/future.rb
Overview
Represents an event which will happen in the future (that is, it will be completed). It always has to happen.
Direct Known Subclasses
Instance Method Summary
- #chain(executor = nil) {|success, value, reason| ... } ⇒ Object (also: #then)
- #chain_completable(completable_event) ⇒ Object (also: #tangle)
-
#completed?(state = internal_state) ⇒ Boolean
(also: #complete?)
Has the Event been completed?
-
#default_executor ⇒ Executor
Current default executor.
-
#delay ⇒ Event
Inserts a delay into the chain of Futures, making the rest of it lazily evaluated.
-
#initialize(promise, default_executor) ⇒ Event
constructor
A new instance of Event.
- #inspect ⇒ Object
-
#on_completion(executor = nil) {|success, value, reason| ... } ⇒ Object
Self.
-
#on_completion! {|success, value, reason| ... } ⇒ Object
Self.
-
#pending?(state = internal_state) ⇒ Boolean
(also: #incomplete?)
Is Event/Future pending?
- #set(*args, &block) ⇒ Object
- #state ⇒ :pending, :completed
-
#then_select(*channels) ⇒ Future
Zips with the selected value from the supplied channels.
- #to_s ⇒ Object
- #unscheduled? ⇒ Boolean
-
#wait(timeout = nil) ⇒ Event, ...
Wait until Event is #complete?
-
#with_default_executor(executor) ⇒ Event
Changes the default executor for the rest of the chain.
-
#zip(other) ⇒ Event
(also: #&)
Zips with a future, producing a new Future.
Constructor Details
#initialize(promise, default_executor) ⇒ Event
Returns a new instance of Event.
# File 'lib/concurrent/edge/future.rb', line 189
def initialize(promise, default_executor)
  super()
  @Promise = promise
  @DefaultExecutor = default_executor
  @Touched = AtomicBoolean.new(false)
  @Callbacks = LockFreeStack.new
  # TODO (pitr 12-Sep-2015): replace with AtomicFixnum, avoid aba problem
  # TODO (pitr 12-Sep-2015): look at java.util.concurrent solution
  @Waiters = LockFreeStack.new
  self.internal_state = PENDING
end
Instance Method Details
#chain(executor = nil) {|success, value, reason| ... } ⇒ Object Also known as: then
# File 'lib/concurrent/edge/future.rb', line 252
def chain(executor = nil, &callback)
  ChainPromise.new(self, @DefaultExecutor, executor || @DefaultExecutor, &callback).future
end
#chain_completable(completable_event) ⇒ Object Also known as: tangle
# File 'lib/concurrent/edge/future.rb', line 258
def chain_completable(completable_event)
  on_completion! { completable_event.complete_with COMPLETED }
end
#completed?(state = internal_state) ⇒ Boolean Also known as: complete?
Has the Event been completed?
# File 'lib/concurrent/edge/future.rb', line 220
def completed?(state = internal_state)
  state.completed?
end
#default_executor ⇒ Executor
Returns current default executor.
# File 'lib/concurrent/edge/future.rb', line 247
def default_executor
  @DefaultExecutor
end
#delay ⇒ Event
Inserts a delay into the chain of Futures, making the rest of it lazily evaluated.
# File 'lib/concurrent/edge/future.rb', line 278
def delay
  ZipEventEventPromise.new(self, Delay.new(@DefaultExecutor).event, @DefaultExecutor).event
end
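To illustrate what "lazily evaluated" means here, the following is a minimal sketch using only Ruby's core classes. LazyStep and its methods are hypothetical names, not part of the library; the sketch only assumes that delayed work starts when something touches it (the #wait listing calls touch, and the constructor keeps a @Touched flag). How Delay itself is implemented is not shown on this page.

```ruby
# Hypothetical stand-in, NOT the library's Delay: the work does not start
# until something touches the step.
class LazyStep
  attr_reader :started

  def initialize(&work)
    @lock = Mutex.new
    @work = work
    @started = false
    @value = nil
  end

  # Starts the work on the first call only; later calls are no-ops.
  def touch
    @lock.synchronize do
      unless @started
        @started = true
        @value = @work.call
      end
    end
    self
  end

  # Asking for the value counts as touching the step.
  def value
    touch
    @value
  end
end

runs = 0
step = LazyStep.new { runs += 1; 6 * 7 }
started_before = step.started  # nothing has run yet
result = step.value            # first touch runs the block
step.value                     # second call reuses the stored value
```

The block runs once, on first demand, which is the behaviour the description attributes to the part of the chain that follows a delay.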
#inspect ⇒ Object
# File 'lib/concurrent/edge/future.rb', line 320
def inspect
  "#{to_s[0..-2]} blocks:[#{blocks.map(&:to_s).join(', ')}]>"
end
#on_completion(executor = nil) {|success, value, reason| ... } ⇒ Object
Returns self.
# File 'lib/concurrent/edge/future.rb', line 300
def on_completion(executor = nil, &callback)
  add_callback :pr_async_callback_on_completion, executor || @DefaultExecutor, callback
end
#on_completion! {|success, value, reason| ... } ⇒ Object
Returns self.
# File 'lib/concurrent/edge/future.rb', line 306
def on_completion!(&callback)
  add_callback :pr_callback_on_completion, callback
end
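The difference between #on_completion and #on_completion! can be sketched as follows. This is a simplified model under stated assumptions, not the library's code: CountingExecutor and CallbackEvent are hypothetical, the executor is assumed to expose a post method, and callbacks receive no arguments here. It shows the "executor || default executor" fallback from the listing and that both methods return self.

```ruby
# Hypothetical executor: counts tasks and runs them inline so the demo is
# deterministic. A real executor would normally run tasks on other threads.
class CountingExecutor
  attr_reader :posted

  def initialize
    @posted = 0
  end

  def post(&task)
    @posted += 1
    task.call
  end
end

# Hypothetical stand-in for an event with completion callbacks.
class CallbackEvent
  def initialize(default_executor)
    @default_executor = default_executor
    @lock = Mutex.new
    @callbacks = []
    @completed = false
  end

  # Runs the callback through the given executor, or the default one.
  def on_completion(executor = nil, &callback)
    chosen = executor || @default_executor
    add_callback(-> { chosen.post(&callback) })
  end

  # Runs the callback directly on the thread that completes the event.
  def on_completion!(&callback)
    add_callback(callback)
  end

  # Completes the event once and runs the stored callbacks in order.
  def complete
    to_run = @lock.synchronize do
      return false if @completed
      @completed = true
      @callbacks.dup
    end
    to_run.each(&:call)
    true
  end

  private

  # Stores the callback while pending; runs it at once if already completed.
  def add_callback(callable)
    run_now = @lock.synchronize do
      @callbacks << callable unless @completed
      @completed
    end
    callable.call if run_now
    self
  end
end

default_pool = CountingExecutor.new
other_pool = CountingExecutor.new
log = []
event = CallbackEvent.new(default_pool)
returned = event.on_completion { log << :default }
                .on_completion(other_pool) { log << :explicit }
                .on_completion! { log << :direct }
event.complete
event.on_completion! { log << :late } # registered after completion
```

Because each registration returns the event itself, the calls can be chained as shown.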
#pending?(state = internal_state) ⇒ Boolean Also known as: incomplete?
Is Event/Future pending?
# File 'lib/concurrent/edge/future.rb', line 208
def pending?(state = internal_state)
  !state.completed?
end
#set(*args, &block) ⇒ Object
# File 'lib/concurrent/edge/future.rb', line 324
def set(*args, &block)
  raise 'Use CompletableEvent#complete or CompletableFuture#complete instead, ' +
        'constructed by Concurrent.event or Concurrent.future respectively.'
end
#state ⇒ :pending, :completed
# File 'lib/concurrent/edge/future.rb', line 202
def state
  internal_state.to_sym
end
#then_select(*channels) ⇒ Future
Zips with the selected value from the supplied channels.
# File 'lib/concurrent/edge/future.rb', line 294
def then_select(*channels)
  ZipFutureEventPromise.new(Concurrent.select(*channels), self, @DefaultExecutor).future
end
#to_s ⇒ Object
# File 'lib/concurrent/edge/future.rb', line 316
def to_s
  "<##{self.class}:0x#{'%x' % (object_id << 1)} #{state.to_sym}>"
end
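The string formats produced by #to_s and #inspect can be tried out in isolation. The sketch below copies the two format expressions from the listings into a hypothetical Labeled class; the state and the list standing in for the library-internal blocks are supplied by hand, so the values are illustrative only.

```ruby
# Hypothetical class, used only to show the string formats from the listings.
class Labeled
  attr_reader :state

  def initialize(state, blocks)
    @state = state
    @blocks = blocks
  end

  # Class name, a hexadecimal identifier derived from object_id, and the state.
  def to_s
    "<##{self.class}:0x#{'%x' % (object_id << 1)} #{state}>"
  end

  # Drops the closing ">" from #to_s, appends the blocks list, then closes again.
  def inspect
    "#{to_s[0..-2]} blocks:[#{@blocks.map(&:to_s).join(', ')}]>"
  end
end

item = Labeled.new(:pending, %w[first second])
short = item.to_s
long = item.inspect
```

Here short has the shape "<#Labeled:0x... pending>", and long is the same text with " blocks:[first, second]" inserted before the closing ">".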
#unscheduled? ⇒ Boolean
# File 'lib/concurrent/edge/future.rb', line 212
def unscheduled?
  raise 'unsupported'
end
#wait(timeout = nil) ⇒ Event, ...
Note: a thread should wait only once! For repeated checking, use the faster #completed? check. If a thread waits periodically, it will dangerously grow the waiters stack.
Wait until Event is #complete?
# File 'lib/concurrent/edge/future.rb', line 232
def wait(timeout = nil)
  touch
  result = wait_until_complete(timeout)
  timeout ? result : self
end
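The return convention of #wait (a boolean when a timeout is given, the event itself otherwise) and the advice to poll with #completed? can be sketched with core Ruby primitives. WaitableEvent is a hypothetical stand-in built on Mutex and ConditionVariable; the library uses its own lock-free structures, which are not reproduced here.

```ruby
# Hypothetical stand-in, NOT Concurrent::Edge::Event.
class WaitableEvent
  def initialize
    @lock = Mutex.new
    @condition = ConditionVariable.new
    @completed = false
  end

  # Cheap check, suitable for repeated polling.
  def completed?
    @lock.synchronize { @completed }
  end

  def complete
    @lock.synchronize do
      @completed = true
      @condition.broadcast
    end
    self
  end

  # Same return convention as the listing: a boolean when a timeout is
  # given, self otherwise.
  def wait(timeout = nil)
    result = wait_until_complete(timeout)
    timeout ? result : self
  end

  private

  # Blocks until completed or until the timeout (in seconds) has elapsed.
  def wait_until_complete(timeout)
    deadline = timeout && now + timeout
    @lock.synchronize do
      until @completed
        remaining = deadline && deadline - now
        break if remaining && remaining <= 0
        @condition.wait(@lock, remaining)
      end
      @completed
    end
  end

  def now
    Process.clock_gettime(Process::CLOCK_MONOTONIC)
  end
end

event = WaitableEvent.new
timed_out = event.wait(0.01)      # nothing has completed the event yet
polled = event.completed?         # non-blocking check
event.complete
after_complete = event.wait(0.01) # returns at once
untimed = event.wait              # no timeout: returns the event itself
```

A caller that needs to check repeatedly would loop on completed? and reserve wait for a single blocking call.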
#with_default_executor(executor) ⇒ Event
Changes the default executor for the rest of the chain.
# File 'lib/concurrent/edge/future.rb', line 312
def with_default_executor(executor)
  EventWrapperPromise.new(self, executor).future
end
#zip(other) ⇒ Event Also known as: &
Zips with a future, producing a new Future.
# File 'lib/concurrent/edge/future.rb', line 266
def zip(other)
  if other.is_a?(Future)
    ZipFutureEventPromise.new(other, self, @DefaultExecutor).future
  else
    ZipEventEventPromise.new(self, other, @DefaultExecutor).future
  end
end
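As a rough model of zipping, the sketch below joins two events into a third that completes only after both inputs have completed. JoinableEvent is hypothetical and uses a mutex-guarded counter; it does not reproduce ZipFutureEventPromise or ZipEventEventPromise, and it ignores values and executors.

```ruby
# Hypothetical stand-in, not the library's promise classes.
class JoinableEvent
  def initialize
    @lock = Mutex.new
    @callbacks = []
    @completed = false
  end

  def completed?
    @lock.synchronize { @completed }
  end

  # Stores the callback while pending; runs it at once if already completed.
  def on_completion!(&callback)
    run_now = @lock.synchronize do
      @callbacks << callback unless @completed
      @completed
    end
    callback.call if run_now
    self
  end

  def complete
    to_run = @lock.synchronize do
      return false if @completed
      @completed = true
      @callbacks.dup
    end
    to_run.each(&:call)
    true
  end

  # Returns a new event that completes once self and other have both completed.
  def zip(other)
    joined = JoinableEvent.new
    remaining = 2
    counter_lock = Mutex.new
    arrive = proc do
      last = counter_lock.synchronize { (remaining -= 1).zero? }
      joined.complete if last
    end
    on_completion!(&arrive)
    other.on_completion!(&arrive)
    joined
  end
  alias_method :&, :zip
end

left = JoinableEvent.new
right = JoinableEvent.new
both = left & right
left.complete
half_done = both.completed?  # only one input has completed
right.complete
all_done = both.completed?   # both inputs have completed
```

The alias mirrors the "Also known as: &" note, so left & right and left.zip(right) are interchangeable in this sketch.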