Class: Dynflow::Future

Inherits:
Object
  • Object
show all
Extended by:
Algebrick::TypeCheck
Includes:
Algebrick::TypeCheck
Defined in:
lib/dynflow/future.rb

Defined Under Namespace

Classes: CountDownLatch

Constant Summary collapse

Error =
Class.new Dynflow::Error
FutureAlreadySet =
Class.new Error
FutureFailed =
Class.new Error
TimeOut =
Class.new Error

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(&task) ⇒ Future

Returns a new instance of Future.



43
44
45
46
47
48
49
50
51
# File 'lib/dynflow/future.rb', line 43

def initialize(&task)
  @lock     = Mutex.new
  @value    = nil
  @resolved = false
  @failed   = false
  @waiting  = []
  @tasks    = []
  do_then &task if task
end

Class Method Details

.join(futures, result = Future.new) ⇒ Object



34
35
36
37
38
39
40
41
# File 'lib/dynflow/future.rb', line 34

def self.join(futures, result = Future.new)
  countdown = CountDownLatch.new(futures.size, result)
  futures.each do |future|
    Type! future, Future
    future.do_then { |_| countdown.countdown! }
  end
  result
end

Instance Method Details

#do_then(&task) ⇒ Object



85
86
87
88
89
90
91
92
# File 'lib/dynflow/future.rb', line 85

def do_then(&task)
  call_task = @lock.synchronize do
    @tasks << task unless _ready?
    @resolved
  end
  task.call value if call_task
  self
end

#evaluate_to(&block) ⇒ Object



74
75
76
77
78
# File 'lib/dynflow/future.rb', line 74

def evaluate_to(&block)
  resolve block.call
rescue => error
  self.fail error
end

#evaluate_to!(&block) ⇒ Object



80
81
82
83
# File 'lib/dynflow/future.rb', line 80

def evaluate_to!(&block)
  evaluate_to &block
  raise value if self.failed?
end

#expired(thread) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.



145
146
147
148
149
# File 'lib/dynflow/future.rb', line 145

def expired(thread)
  @lock.synchronize do
    thread.wakeup if @waiting.delete(thread)
  end
end

#fail(exception) ⇒ Object



66
67
68
69
70
71
72
# File 'lib/dynflow/future.rb', line 66

def fail(exception)
  Type! exception, Exception, String
  if exception.is_a? String
    exception = FutureFailed.new(exception).tap { |e| e.set_backtrace caller }
  end
  set exception, true
end

#failed?Boolean

Returns:

  • (Boolean)


136
137
138
# File 'lib/dynflow/future.rb', line 136

def failed?
  @lock.synchronize { @failed }
end

#ready?Boolean

Returns:

  • (Boolean)


128
129
130
# File 'lib/dynflow/future.rb', line 128

def ready?
  @lock.synchronize { _ready? }
end

#resolve(result) ⇒ Object



62
63
64
# File 'lib/dynflow/future.rb', line 62

def resolve(result)
  set result, false
end

#resolved?Boolean

Returns:

  • (Boolean)


132
133
134
# File 'lib/dynflow/future.rb', line 132

def resolved?
  @lock.synchronize { @resolved }
end

#set(value, failed) ⇒ Object



94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
# File 'lib/dynflow/future.rb', line 94

def set(value, failed)
  @lock.synchronize do
    raise FutureAlreadySet, "future already set to #{@value} cannot use #{value}" if _ready?
    if failed
      @failed = true
    else
      @resolved = true
    end
    @value = value
    while (thread = @waiting.pop)
      begin
        thread.wakeup
      rescue ThreadError
        retry
      end
    end
    !failed
  end
  @tasks.each { |t| t.call value }
  self
end

#tangle(future) ⇒ Object



140
141
142
# File 'lib/dynflow/future.rb', line 140

def tangle(future)
  do_then { |v| future.set v, failed? }
end

#value(timeout = nil) ⇒ Object



53
54
55
56
# File 'lib/dynflow/future.rb', line 53

def value(timeout = nil)
  wait timeout
  @lock.synchronize { @value }
end

#value!Object



58
59
60
# File 'lib/dynflow/future.rb', line 58

def value!
  value.tap { raise value if failed? }
end

#wait(timeout = nil) ⇒ Object



116
117
118
119
120
121
122
123
124
125
126
# File 'lib/dynflow/future.rb', line 116

def wait(timeout = nil)
  @lock.synchronize do
    unless _ready?
      @waiting << Thread.current
      clock.ping self, timeout, Thread.current, :expired if timeout
      @lock.sleep
      raise TimeOut unless _ready?
    end
  end
  self
end