Class: Dynflow::Future
- Inherits:
- Object (inheritance chain: Object → Dynflow::Future)
- Extended by:
- Algebrick::TypeCheck
- Includes:
- Algebrick::TypeCheck
- Defined in:
- lib/dynflow/future.rb
Defined Under Namespace
Classes: CountDownLatch
Constant Summary collapse
- Error =
Class.new Dynflow::Error
- FutureAlreadySet =
Class.new Error
- FutureFailed =
Class.new Error
- TimeOut =
Class.new Error
Class Method Summary collapse
- .join(futures, result = Future.new) ⇒ Object
Instance Method Summary collapse
- #do_then(&task) ⇒ Object
- #evaluate_to(&block) ⇒ Object
- #evaluate_to!(&block) ⇒ Object
- #expired(thread) ⇒ Object private
- #fail(exception) ⇒ Object
- #failed? ⇒ Boolean
- #initialize(&task) ⇒ Future (constructor): returns a new instance of Future.
- #ready? ⇒ Boolean
- #resolve(result) ⇒ Object
- #resolved? ⇒ Boolean
- #set(value, failed) ⇒ Object
- #tangle(future) ⇒ Object
- #value(timeout = nil) ⇒ Object
- #value! ⇒ Object
- #wait(timeout = nil) ⇒ Object
Constructor Details
#initialize(&task) ⇒ Future
Returns a new instance of Future.
43 44 45 46 47 48 49 50 51 |
# File 'lib/dynflow/future.rb', line 43
# Builds an unset future: no value, neither resolved nor failed, with no
# waiting threads and no registered callbacks.
#
# @param task [Proc, nil] optional callback, registered through #do_then
def initialize(&task)
  @lock = Mutex.new
  @value = nil
  @resolved = false
  @failed = false
  @waiting = []
  @tasks = []
  do_then(&task) unless task.nil?
end
Class Method Details
.join(futures, result = Future.new) ⇒ Object
34 35 36 37 38 39 40 41 |
# File 'lib/dynflow/future.rb', line 34
# Ties +result+ to a group of futures through a CountDownLatch sized to the
# group. Each future gets a callback that counts the latch down once.
# NOTE(review): the latch presumably settles +result+ when it reaches zero;
# CountDownLatch is not visible here - confirm.
#
# @param futures [Enumerable<Future>] each element is type-checked with Type!
# @param result [Future] future handed to the latch
# @return [Future] +result+
def self.join(futures, result = Future.new)
  latch = CountDownLatch.new(futures.size, result)
  futures.each do |member|
    Type! member, Future
    member.do_then do |_|
      latch.countdown!
    end
  end
  result
end
Instance Method Details
#do_then(&task) ⇒ Object
85 86 87 88 89 90 91 92 |
# File 'lib/dynflow/future.rb', line 85
# Registers +task+ to be called with the future's value.
#
# If the future is not ready yet, the task is stored and left for #set to
# invoke. If the future is already ready - resolved OR failed - the task is
# invoked immediately, outside the lock, with the current value. This matches
# #set, which calls stored tasks for both outcomes.
#
# @yieldparam value [Object] the stored value (the exception when failed)
# @return [Future] self
def do_then(&task)
  run_now = @lock.synchronize do
    already_set = _ready?
    @tasks << task unless already_set
    already_set
  end
  # Called outside the lock so the task may call back into this future.
  task.call value if run_now
  self
end
#evaluate_to(&block) ⇒ Object
74 75 76 77 78 |
# File 'lib/dynflow/future.rb', line 74
# Runs +block+ and stores its outcome in this future: the return value via
# #resolve, or a raised StandardError via #fail.
#
# Only the block call is rescued. An error raised by #resolve itself is
# propagated unchanged rather than being passed to #fail.
#
# @return [Object] whatever #resolve or #fail returns
def evaluate_to(&block)
  begin
    result = block.call
  rescue => error
    return self.fail(error)
  end
  resolve result
end
#evaluate_to!(&block) ⇒ Object
80 81 82 83 |
# File 'lib/dynflow/future.rb', line 80
# Like #evaluate_to, but re-raises the stored value when the future ends up
# failed.
#
# @return [nil] when the future did not fail
# @raise [Object] the stored value, when #failed? is true
def evaluate_to!(&block)
  evaluate_to(&block)
  return unless failed?
  raise value
end
#expired(thread) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
145 146 147 148 149 |
# File 'lib/dynflow/future.rb', line 145
# Removes +thread+ from the waiting list and wakes it, but only if it was
# still listed. Runs under the lock.
#
# @param thread [Thread] the waiter to remove
# @return [Object, nil] the result of thread.wakeup, or nil if the thread
#   was not waiting
def expired(thread)
  @lock.synchronize do
    was_waiting = @waiting.delete(thread)
    thread.wakeup if was_waiting
  end
end
#fail(exception) ⇒ Object
66 67 68 69 70 71 72 |
# File 'lib/dynflow/future.rb', line 66
# Marks the future as failed with +exception+.
#
# A String is wrapped in a FutureFailed whose backtrace is taken from
# +caller+; any other accepted value is stored unchanged.
# NOTE(review): Type! presumably rejects anything that is neither an
# Exception nor a String - confirm against Algebrick::TypeCheck.
#
# @param exception [Exception, String]
# @return [Object] whatever #set returns
def fail(exception)
  Type! exception, Exception, String
  failure =
    case exception
    when String
      FutureFailed.new(exception).tap { |e| e.set_backtrace caller }
    else
      exception
    end
  set failure, true
end
#failed? ⇒ Boolean
136 137 138 |
# File 'lib/dynflow/future.rb', line 136
# Reads the failed flag under the lock.
#
# @return [Boolean] current value of @failed
def failed?
  @lock.synchronize do
    @failed
  end
end
#ready? ⇒ Boolean
128 129 130 |
# File 'lib/dynflow/future.rb', line 128
# Locked wrapper around the internal #_ready? check.
#
# @return [Boolean] result of #_ready?, evaluated under the lock
def ready?
  @lock.synchronize do
    _ready?
  end
end
#resolve(result) ⇒ Object
62 63 64 |
# File 'lib/dynflow/future.rb', line 62
# Stores +result+ as the successful value of this future.
#
# @param result [Object] value to store
# @return [Object] whatever #set returns
def resolve(result)
  set(result, false)
end
#resolved? ⇒ Boolean
132 133 134 |
# File 'lib/dynflow/future.rb', line 132
# Reads the resolved flag under the lock.
#
# @return [Boolean] current value of @resolved
def resolved?
  @lock.synchronize do
    @resolved
  end
end
#set(value, failed) ⇒ Object
94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 |
# File 'lib/dynflow/future.rb', line 94
# Stores the outcome of the future, wakes every waiting thread and then runs
# the registered callbacks with +value+.
#
# @param value [Object] value to store (the exception when +failed+ is true)
# @param failed [Boolean] true marks the future failed, false marks it resolved
# @return [Future] self
# @raise [FutureAlreadySet] if the future is already resolved or failed
def set(value, failed)
  @lock.synchronize do
    raise FutureAlreadySet, "future already set to #{@value} cannot use #{value}" if _ready?
    if failed
      @failed = true
    else
      @resolved = true
    end
    @value = value
    while (thread = @waiting.pop)
      begin
        thread.wakeup
      rescue ThreadError
        # Thread#wakeup raises ThreadError for a dead thread. Retrying that
        # forever would spin while holding the lock, so only retry while the
        # waiter is still alive.
        retry if thread.alive?
      end
    end
  end
  # Callbacks run outside the lock so they may query this future.
  @tasks.each { |t| t.call value }
  self
end
#tangle(future) ⇒ Object
140 141 142 |
# File 'lib/dynflow/future.rb', line 140
# Forwards this future's outcome to +future+: a callback registered through
# #do_then calls future.set with the value and this future's failed flag.
#
# @param future [#set] receiver of the outcome
# @return [Object] whatever #do_then returns
def tangle(future)
  do_then do |outcome|
    future.set(outcome, failed?)
  end
end
#value(timeout = nil) ⇒ Object
53 54 55 56 |
# File 'lib/dynflow/future.rb', line 53
# Waits for the future via #wait, then reads the stored value under the lock.
#
# @param timeout [Object, nil] passed straight to #wait
# @return [Object] the stored value
def value(timeout = nil)
  wait(timeout)
  @lock.synchronize do
    @value
  end
end
#value! ⇒ Object
58 59 60 |
# File 'lib/dynflow/future.rb', line 58
# Returns the value, or raises it when the future has failed.
#
# @return [Object] the stored value
# @raise [Object] the stored value, when #failed? is true
def value!
  outcome = value
  raise value if failed?
  outcome
end
#wait(timeout = nil) ⇒ Object
116 117 118 119 120 121 122 123 124 125 126 |
# File 'lib/dynflow/future.rb', line 116
# Blocks the calling thread until the future is ready.
#
# The thread adds itself to @waiting and sleeps on the lock. #set pops it
# from @waiting when the value arrives; #expired deletes it when the timeout
# fires. Mutex#sleep may also return spuriously, so after every wakeup the
# thread sleeps again while it is still listed in @waiting and the future is
# not ready. TimeOut is raised only once the thread has been removed from
# @waiting without the future becoming ready.
# NOTE(review): assumes clock.ping eventually sends :expired with the given
# thread to this future - clock is not visible here, confirm.
#
# @param timeout [Object, nil] forwarded to clock.ping; nil waits indefinitely
# @return [Future] self
# @raise [TimeOut] if the wait expired before the future was set
def wait(timeout = nil)
  @lock.synchronize do
    unless _ready?
      current = Thread.current
      @waiting << current
      clock.ping self, timeout, current, :expired if timeout
      @lock.sleep while !_ready? && @waiting.include?(current)
      raise TimeOut unless _ready?
    end
  end
  self
end