Class: ExcADG::Vertex

Inherits:
Ractor
  • Object
show all
Includes:
RTimeout
Defined in:
lib/excadg/vertex.rb

Overview

Individual vertex of the execution graph to run in a separated Ractor

Class Method Summary collapse

Instance Method Summary collapse

Methods included from RTimeout

await

Class Method Details

.new(payload:, name: nil, deps: [], timeout: nil) ⇒ Object

make a vertex, it runs automagically

Parameters:

  • payload

    Payload object to run in this Vertex

  • name (defaults to: nil)

    optional vertex name to identify vertex

  • deps (defaults to: [])

    list of other Vertices or names to wait for

  • timeout (ExcADG::VTimeouts) (defaults to: nil)

    or total time in seconds for the payload to run

Raises:

  • Payload::IncorrectPayloadArity in case payload returns function with arity > 1

  • Payload::NoPayloadSet in case payload provided has incorrect type



71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
# File 'lib/excadg/vertex.rb', line 71

def new payload:, name: nil, deps: [], timeout: nil
  raise Payload::NoPayloadSet, "expected payload, got #{payload.class}" unless payload.is_a? Payload

  raise Payload::IncorrectPayloadArity, "arity is #{payload.get.arity}, supported only 0 and 1" unless [0, 1].include? payload.get.arity

  dm = DependencyManager.new(deps: deps)
  vtimeout = timeout.is_a?(VTimeout) ? timeout : VTimeout.new(payload: timeout)

  super(payload, name, vtimeout, dm) { |payload, name, vtimeout, deps_manager|
    state_machine = StateMachine.new(name: name || "v#{number}".to_sym)
    state_machine.with_fault_processing {
      await(timeout: vtimeout.global) {
        Broker.ask Request::Update.new data: state_machine.state_data
        Log.info 'building vertex lifecycle'
        state_machine.bind_action(:new, :ready) {
          await(timeout: vtimeout.deps) {
            until deps_manager.deps.empty?
              deps_data = Broker.ask Request::GetStateData.new(deps: deps_manager.deps)
              deps_manager.deduct_deps deps_data
              sleep 0.2
            end
            deps_manager.data
          }
        }
        state_machine.bind_action(:ready, :done) {
          function = payload.get
          Log.debug "payload has arity #{function.arity}"
          await(timeout: vtimeout.payload) {
            result = case function.arity
                     when 0 then function.call
                     when 1 then function.call state_machine.state_data.data
                     else
                       raise Payload::IncorrectPayloadArity, "unexpected payload arity: #{function.arity}, supported only 0 and 1"
                     end
            Log.debug 'payload finished'
            result
          }
        }

        Log.debug "another step fades: #{state_machine.state_data}" while state_machine.step

        Log.debug 'shut down'
      }
    }
  }
end

Instance Method Details

#dataObject

obtains current Vertex-es data by lookup in the Broker’s data, available from the main Ractor only



47
48
49
# File 'lib/excadg/vertex.rb', line 47

def data
  Broker.instance.data_store&.[](self)
end

#infoObject

Returns parsed info about the Ractor: number, file, line in file, status.

Returns:

  • parsed info about the Ractor: number, file, line in file, status



18
19
20
# File 'lib/excadg/vertex.rb', line 18

def info
  inspect.scan(/^#<Ractor:#(\d+)\s(.*):(\d+)\s(\w+)>$/).first
end

#nameObject

gets current Vertex’s name, available from the main Ractor only



59
60
61
# File 'lib/excadg/vertex.rb', line 59

def name
  data&.name
end

#numberObject

Returns Ractor’s number, -1 if parsing failed.

Returns:

  • Ractor’s number, -1 if parsing failed



29
30
31
# File 'lib/excadg/vertex.rb', line 29

def number
  info&.dig(0).to_i || -1
end

#stateObject

gets current Vertex’s state, available from the main Ractor only



53
54
55
# File 'lib/excadg/vertex.rb', line 53

def state
  data&.state
end

#statusObject

Ractor’s internal status

Returns:

  • Symbol, :unknown if parsing failed



24
25
26
# File 'lib/excadg/vertex.rb', line 24

def status
  (info&.dig(3) || :unknwon).to_sym
end

#to_sObject

only main ractor could try to get real symbolic vertex name and state other ractors (e.g. logger) could only use builtin data



35
36
37
38
39
40
41
# File 'lib/excadg/vertex.rb', line 35

def to_s
  if Ractor.current == Ractor.main
    "#{name || number} #{state || status}"
  else
    "#{number} #{status}"
  end
end