Class: Roby::Promise

Inherits:
Object
Defined in:
lib/roby/promise.rb

Overview

An extension to Concurrent::Promise that is aware of the mixed thread/event loop nature of Roby

Use ExecutionEngine#promise to create one

#on_success and #on_error take an in_engine argument, which decides whether the given block is executed by the underlying execution engine (in the main thread) or in a separate thread. It is true by default. #then takes no in_engine argument: its block always runs outside the engine.

This promise implementation has no graph capabilities. The execution must be a pipeline, and a whole pipeline is represented by a single Promise. State predicates such as #fulfilled? or #rejected? are valid for the whole pipeline. There is no way to handle errors for only parts of the pipeline.
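
The single-pipeline model can be illustrated with a minimal, self-contained sketch. ToyPipeline is a hypothetical stand-in written for this example, not Roby's implementation: it runs every step synchronously in the calling thread and ignores in_engine. It only shows that steps run in order, that each step receives the previous step's result, and that one failure rejects the whole pipeline.

```ruby
# Hypothetical stand-in for illustration only; not part of Roby.
class ToyPipeline
  attr_reader :state, :reason, :value

  def initialize
    @steps = []
    @error_steps = []
    @state = :unscheduled
  end

  # Append a step to the nominal pipeline
  def then(&block)
    @steps << block
    self
  end

  # Append a step to the error pipeline
  def on_error(&block)
    @error_steps << block
    self
  end

  # Run all steps in order, feeding each result to the next step
  def execute(initial = nil)
    @value = @steps.inject(initial) { |acc, step| step.call(acc) }
    @state = :fulfilled
    self
  rescue StandardError => e
    # Any failure rejects the pipeline as a whole
    @error_steps.each { |step| step.call(e) }
    @reason = e
    @state = :rejected
    self
  end
end

ok = ToyPipeline.new.then { 1 }.then { |v| v + 1 }.execute

reached = false
failed = ToyPipeline.new
                    .then { raise ArgumentError, "boom" }
                    .then { reached = true }
                    .execute
```

In this sketch ok.value is 2, while failed ends up :rejected with the ArgumentError as its reason, and the step after the failing one never runs.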

Defined Under Namespace

Classes: AlreadyHasErrorHandler, Failure, NotComplete, Null, PipelineElement

Constructor Details

#initialize(execution_engine, executor: execution_engine.thread_pool, description: "promise", &block) ⇒ Promise

Returns a new instance of Promise.



# File 'lib/roby/promise.rb', line 37

def initialize(
    execution_engine,
    executor: execution_engine.thread_pool, description: "promise", &block
)
    @execution_engine = execution_engine
    @description = description

    @pipeline = []
    @error_pipeline = []
    @promise = Concurrent::Promise.new(executor: executor, &method(:run_pipeline))
    @current_element = Concurrent::AtomicReference.new
    @error_handling_failure = nil
    self.then(&block) if block
end

Instance Attribute Details

#description ⇒ Object (readonly)

A description text for debugging purposes



# File 'lib/roby/promise.rb', line 27

def description
  @description
end

#error_handling_failure ⇒ Exception? (readonly)

The exception raised by the error handlers while they were processing a failure, or nil if they did not raise

Returns:

  • (Exception, nil)



# File 'lib/roby/promise.rb', line 216

def error_handling_failure
  @error_handling_failure
end

#error_pipeline ⇒ Array<PipelineElement> (readonly)

The pipeline that will be executed if an error happens in #pipeline

Returns:

  • (Array<PipelineElement>)



# File 'lib/roby/promise.rb', line 35

def error_pipeline
  @error_pipeline
end

#execution_engine ⇒ Object (readonly)

The execution engine we execute on



# File 'lib/roby/promise.rb', line 22

def execution_engine
  @execution_engine
end

#pipeline ⇒ Array<PipelineElement> (readonly)

The pipeline itself

Returns:

  • (Array<PipelineElement>)



# File 'lib/roby/promise.rb', line 31

def pipeline
  @pipeline
end

#promise ⇒ Object (readonly)

The Promise object from concurrent-ruby that handles the nominal part of the execution



# File 'lib/roby/promise.rb', line 25

def promise
  @promise
end

Class Method Details

.null(value = nil) ⇒ Object



# File 'lib/roby/promise.rb', line 327

def self.null(value = nil)
    Null.new(value)
end

Instance Method Details

#add_observer {|time, result, reason| ... } ⇒ Object

Register a block that will be called on this promise’s termination

Yield Parameters:

  • time (Time)

    the termination time

  • result (Object, nil)

    the promise result if it finished execution successfully, or nil if an exception was raised

  • reason (Object, nil)

    the exception that terminated this promise if it failed, or nil if it finished successfully



# File 'lib/roby/promise.rb', line 323

def add_observer(&block)
    promise.add_observer(&block)
end

#before(description: "#{self.description}.before", in_engine: true, &block) ⇒ Object

Queue a block at the beginning of the pipeline



# File 'lib/roby/promise.rb', line 219

def before(description: "#{self.description}.before", in_engine: true, &block)
    pipeline.unshift PipelineElement.new(description, in_engine, block)
    self
end
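
As a quick illustration of the resulting order, the sketch below uses a plain array of strings in place of the pipeline of PipelineElement objects (an assumption made to keep the example standalone). Appending mirrors what #on_success and #then do, while unshift mirrors #before.

```ruby
# A plain array stands in for Roby::Promise#pipeline (illustration only).
pipeline = []
pipeline << "load"         # like #then / #on_success: appended at the end
pipeline << "process"
pipeline.unshift("setup")  # like #before: inserted at the front
pipeline.unshift("init")   # a later #before call ends up ahead of earlier ones

order = pipeline.dup
```

Because each #before call prepends, the block registered last with #before is the one that runs first.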

#complete? ⇒ Boolean

Returns:

  • (Boolean)



# File 'lib/roby/promise.rb', line 275

def complete?
    promise.complete?
end

#current_element ⇒ String?

The description of the pipeline element currently being executed, or nil if none is running

Returns:

  • (String, nil)



# File 'lib/roby/promise.rb', line 65

def current_element
    @current_element.get
end

#empty? ⇒ Boolean

Whether this promise's pipeline has no elements

Returns:

  • (Boolean)



# File 'lib/roby/promise.rb', line 58

def empty?
    @pipeline.empty?
end

#execute ⇒ Object



# File 'lib/roby/promise.rb', line 261

def execute
    execution_engine.waiting_work << self
    promise.execute
    self
end

#fail(exception = StandardError) ⇒ Object



# File 'lib/roby/promise.rb', line 257

def fail(exception = StandardError)
    promise.fail(exception)
end

#fulfilled? ⇒ Boolean

Returns:

  • (Boolean)



# File 'lib/roby/promise.rb', line 279

def fulfilled?
    promise.fulfilled?
end

#handled_error? ⇒ Boolean

Whether the promise’s failure was successfully handled by error handlers

This only makes sense when #rejected? returns true

This will return true if (1) there are error handlers and (2) executing the handlers did not raise errors

Returns:

  • (Boolean)



# File 'lib/roby/promise.rb', line 209

def handled_error?
    has_error_handler? && !@error_handling_failure
end
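
The two conditions combine as a simple conjunction. The sketch below restates that logic as a standalone function; the free function and its arguments are hypothetical and exist only so that the truth table can be checked without a promise object.

```ruby
# Hypothetical helper mirroring the boolean logic of #handled_error?.
# error_pipeline: the list of registered error handlers
# error_handling_failure: the exception raised by a handler, or nil
def handled_error?(error_pipeline, error_handling_failure)
  !error_pipeline.empty? && !error_handling_failure
end

no_handler     = handled_error?([], nil)
handler_ok     = handled_error?([:handler], nil)
handler_raised = handled_error?([:handler], RuntimeError.new("handler broke"))
```

Only the middle case counts as handled: a promise without error handlers, or whose handlers raised, reports false.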

#has_error_handler? ⇒ Boolean

Whether self already has an error handler

Unlike Concurrent::Promise, Roby::Promise objects can only have one error handler

Returns:

  • (Boolean)



# File 'lib/roby/promise.rb', line 199

def has_error_handler?
    !error_pipeline.empty?
end

#null? ⇒ Boolean

Whether this is a null promise

Returns:

  • (Boolean)



# File 'lib/roby/promise.rb', line 53

def null?
    false
end

#on_error(description: "#{self.description}.on_error", in_engine: true) {|reason| ... } ⇒ Object

Schedule execution of a block if self or one of its parents failed

Parameters:

  • description (defaults to: "#{self.description}.on_error")

    a textual description useful for debugging

  • in_engine (defaults to: true)

    whether the block should be executed within the underlying ExecutionEngine, a.k.a. in the main thread, or scheduled in a separate thread.

Yield Parameters:

  • reason (Object)

    the exception that caused the failure, usually an exception that was raised by one of the promise blocks.



# File 'lib/roby/promise.rb', line 246

def on_error(description: "#{self.description}.on_error", in_engine: true, &block)
    error_pipeline << PipelineElement.new(description, in_engine, block)
    self
end

#on_success(description: "#{self.description}.on_success[#{pipeline.size}]", in_engine: true, &block) ⇒ Object

Schedule execution of a block on the success of self

Parameters:

  • description (defaults to: "#{self.description}.on_success[#{pipeline.size}]")

    a textual description useful for debugging

  • in_engine (defaults to: true)

    whether the block should be executed within the underlying ExecutionEngine, a.k.a. in the main thread, or scheduled in a separate thread.



# File 'lib/roby/promise.rb', line 230

def on_success(
    description: "#{self.description}.on_success[#{pipeline.size}]",
    in_engine: true, &block
)
    pipeline << PipelineElement.new(description, in_engine, block)
    self
end
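
The default description embeds the index the new element will have in the pipeline, because the default expression is evaluated at the time the method is called. The sketch below reproduces only that naming behavior; ToyDescribed is a hypothetical class, and it stores bare description strings instead of PipelineElement objects.

```ruby
# Hypothetical class reproducing only the default-description logic.
class ToyDescribed
  attr_reader :description, :pipeline

  def initialize(description: "promise")
    @description = description
    @pipeline = []
  end

  # The default is computed per call, so pipeline.size is the new element's index
  def on_success(description: "#{self.description}.on_success[#{pipeline.size}]")
    pipeline << description
    self
  end

  def then(description: "#{self.description}.then[#{pipeline.size}]")
    pipeline << description
    self
  end
end

names = ToyDescribed.new.then.on_success.then(description: "custom").pipeline
```

Note the use of self.description inside the default expression: a bare description there would refer to the keyword argument being defined rather than to the promise's own description.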

#pending? ⇒ Boolean

Returns:

  • (Boolean)



# File 'lib/roby/promise.rb', line 271

def pending?
    promise.pending?
end

#pretty_print(pp) ⇒ Object



# File 'lib/roby/promise.rb', line 165

def pretty_print(pp)
    description = self.description
    pp.text "Roby::Promise(#{description}"
    if (current_element = self.current_element)
        pp.text ", currently: #{current_element})"
    else
        pp.text ")"
    end

    pipeline.each do |element|
        pp.nest(2) do
            pp.text "."
            pp.breakable
            if element.run_in_engine
                pp.text "on_success(#{element.description})"
            else
                pp.text "then(#{element.description})"
            end
        end
    end
    error_pipeline.each do |element|
        pp.nest(2) do
            pp.text "."
            pp.breakable
            pp.text "on_error(#{element.description}, "\
                    "in_engine: #{element.run_in_engine})"
        end
    end
end

#reason ⇒ Object

Returns the exception that caused the promise to be rejected



# File 'lib/roby/promise.rb', line 306

def reason
    failure = promise.reason
    failure&.actual_exception
end

#rejected? ⇒ Boolean

Returns:

  • (Boolean)



# File 'lib/roby/promise.rb', line 283

def rejected?
    promise.rejected?
end

#run_one_pipeline_segment(pipeline, state, in_engine, propagate_state: true) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Helper method for #run_pipeline_elements that runs a sequence of consecutive pipeline elements sharing the same run_in_engine value

API:

  • private



# File 'lib/roby/promise.rb', line 147

def run_one_pipeline_segment(pipeline, state, in_engine, propagate_state: true)
    while (element = pipeline.first) && !(in_engine ^ element.run_in_engine)
        pipeline.shift
        @current_element.set(element.description)
        new_state = execution_engine.log_timepoint_group(
            "#{element.description} in_engine=#{element.run_in_engine}"
        ) do
            element.callback.call(state)
        end
        state = new_state if propagate_state
    end
    state
end
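
The loop condition !(in_engine ^ element.run_in_engine) is true exactly when both flags are equal, so each call consumes the leading run of elements that match in_engine and stops at the first mismatch. A standalone sketch of that behavior follows; SegmentElement and take_segment are hypothetical names, and the callbacks and timepoint logging are left out.

```ruby
# Hypothetical stand-in for PipelineElement, without the callback
SegmentElement = Struct.new(:description, :run_in_engine)

# Remove and return the descriptions of the leading elements whose
# run_in_engine flag equals in_engine (XOR is false when both are equal)
def take_segment(pipeline, in_engine)
  taken = []
  while (element = pipeline.first) && !(in_engine ^ element.run_in_engine)
    taken << pipeline.shift.description
  end
  taken
end

pipeline = [
  SegmentElement.new("a", false),
  SegmentElement.new("b", false),
  SegmentElement.new("c", true),
  SegmentElement.new("d", false)
]

first_segment  = take_segment(pipeline, false)
second_segment = take_segment(pipeline, true)
remaining      = pipeline.map(&:description)
```

The first call takes "a" and "b", the second takes only "c", and "d" is left for a later out-of-engine segment.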

#run_pipeline(*state) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Internal implementation of the pipeline. This holds a thread until the pipeline is finished: there is no point in giving the thread back between the steps in the pipeline, given how promises are used in Roby (to avoid freezing due to blocking calls)

API:

  • private



# File 'lib/roby/promise.rb', line 78

def run_pipeline(*state)
    Thread.current.name = "run_promises"

    execution_engine.log_timepoint_group description.to_s do
        begin
            run_pipeline_elements(pipeline, state)
        rescue ::Exception => error # rubocop:disable Naming/RescuedExceptionsVariableName
            begin
                run_pipeline_elements(error_pipeline, error,
                                      propagate_state: false)
            rescue ::Exception => error_handling_failure # rubocop:disable Naming/RescuedExceptionsVariableName
                @error_handling_failure = error_handling_failure
            end
            raise Failure.new(error)
        end
    end
ensure
    @current_element.set(nil)
end
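
The error path can be summarized with a simplified model. ToyFailure and ToyRunner below are hypothetical stand-ins, and the sketch only rescues StandardError whereas the code above rescues ::Exception. It shows the two outcomes of the error path: the original error is always re-raised in wrapped form, and an exception raised by a handler is stored instead of being propagated.

```ruby
# Hypothetical stand-in for Roby::Promise::Failure
class ToyFailure < StandardError
  attr_reader :actual_exception

  def initialize(actual_exception)
    super(actual_exception.message)
    @actual_exception = actual_exception
  end
end

# Hypothetical runner: a nominal list of steps plus a list of error handlers
class ToyRunner
  attr_reader :error_handling_failure

  def initialize(steps, error_handlers)
    @steps = steps
    @error_handlers = error_handlers
    @error_handling_failure = nil
  end

  def run
    @steps.each(&:call)
  rescue StandardError => error
    begin
      @error_handlers.each { |handler| handler.call(error) }
    rescue StandardError => handler_error
      # A failing handler is recorded, not propagated
      @error_handling_failure = handler_error
    end
    # The original error always wins, wrapped in the failure class
    raise ToyFailure.new(error)
  end
end

seen = []
runner = ToyRunner.new(
  [-> { raise ArgumentError, "boom" }],
  [->(e) { seen << e.message }, ->(_e) { raise "handler broke" }]
)

original = nil
begin
  runner.run
rescue ToyFailure => failure
  original = failure.actual_exception
end
```

The wrapper is what lets #reason and #value! hand the original exception back through actual_exception.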

#run_pipeline_elements(pipeline, state, propagate_state: true) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Run one of #pipeline or #error_pipeline

API:

  • private



# File 'lib/roby/promise.rb', line 101

def run_pipeline_elements(pipeline, state, propagate_state: true)
    pipeline = pipeline.dup
    until pipeline.empty?
        state = run_one_pipeline_segment(
            pipeline, state, false, propagate_state: propagate_state
        )
        break if pipeline.empty?

        state = execution_engine.log_timepoint_group(
            "#{description}:execute_in_engine"
        ) do
            execution_engine.execute(type: :propagation) do
                run_one_pipeline_segment(
                    pipeline, state, true, propagate_state: propagate_state
                )
            end
        end
    end
    state
end
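
The hand-off between the worker thread and the engine can be pictured with two queues. This sketch assumes that ExecutionEngine#execute blocks the calling thread until the engine has run the block and then returns the block's value, which is what the use of its return value above suggests. ToyEngine is a hypothetical class and does no error handling.

```ruby
# Hypothetical engine: blocks submitted from other threads are run by
# whichever thread calls #process_one (the "engine" thread).
class ToyEngine
  def initialize
    @work = Queue.new
  end

  # Called from a worker thread: submit the block, then wait for its result
  def execute(&block)
    reply = Queue.new
    @work << [block, reply]
    reply.pop
  end

  # Called from the engine thread: run one queued block and send the result back
  def process_one
    block, reply = @work.pop
    reply << block.call
  end
end

engine = ToyEngine.new
main = Thread.current

worker = Thread.new do
  ran_in_worker = Thread.current != main                   # out-of-engine segment
  ran_in_engine = engine.execute { Thread.current == main } # in-engine segment
  [ran_in_worker, ran_in_engine]
end

engine.process_one
result = worker.value
```

The worker stays parked in execute while the engine thread runs the block, which matches the remark in #run_pipeline that a pipeline holds its thread until it is finished.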

#state ⇒ Object

The promise’s execution state



# File 'lib/roby/promise.rb', line 312

def state
    promise.state
end

#then(description: "#{self.description}.then[#{pipeline.size}]", &block) ⇒ Object

Equivalent to #on_success, but always executes the block in a separate thread (in_engine: false)



# File 'lib/roby/promise.rb', line 253

def then(description: "#{self.description}.then[#{pipeline.size}]", &block)
    on_success(description: description, in_engine: false, &block)
end

#to_s ⇒ Object



# File 'lib/roby/promise.rb', line 161

def to_s
    "#<Roby::Promise #{description}>"
end

#unscheduled? ⇒ Boolean

Returns:

  • (Boolean)



# File 'lib/roby/promise.rb', line 267

def unscheduled?
    promise.unscheduled?
end

#value(timeout = nil) ⇒ Object

Raises:

  • (NotComplete)

    if the promise is not complete yet



# File 'lib/roby/promise.rb', line 291

def value(timeout = nil)
    return promise.value(timeout) if promise.complete?

    raise NotComplete, "cannot call #value on a non-complete promise"
end
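
Unlike Concurrent::Promise#value, which blocks until completion or timeout, this method refuses to wait: it raises unless the promise has already completed. A minimal sketch of that guard, using hypothetical ToyResult and ToyNotComplete classes in place of the promise and of NotComplete:

```ruby
# Hypothetical stand-in for Roby::Promise::NotComplete
class ToyNotComplete < RuntimeError; end

# Hypothetical result holder with the same "no waiting" guard
class ToyResult
  def initialize
    @complete = false
    @value = nil
  end

  def complete(value)
    @value = value
    @complete = true
    self
  end

  def value
    return @value if @complete

    raise ToyNotComplete, "cannot call #value on a non-complete promise"
  end
end

pending_result = ToyResult.new
raised = begin
  pending_result.value
  false
rescue ToyNotComplete
  true
end

completed_value = pending_result.complete(42).value
```

Callers are therefore expected to check #complete?, or to call #wait, before reading the value.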

#value!(timeout = nil) ⇒ Object



# File 'lib/roby/promise.rb', line 297

def value!(timeout = nil)
    return promise.value!(timeout) if promise.complete?

    raise NotComplete, "cannot call #value on a non-complete promise"
rescue Failure => e
    raise e.actual_exception
end

#wait ⇒ Object



# File 'lib/roby/promise.rb', line 287

def wait
    promise.wait
end