Class: Rukawa::JobNet
Instance Attribute Summary collapse
Attributes inherited from AbstractJob
#parent_job_net
Class Method Summary
collapse
Instance Method Summary
collapse
Methods inherited from AbstractJob
add_skip_rule, description, #elapsed_time_from, #formatted_elapsed_time_from, #inspect, #name, set_description, #skip?
Constructor Details
#initialize(variables: {}, context: Context.new, parent_job_net: nil, resume_job_classes: []) ⇒ JobNet
Returns a new instance of JobNet.
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
|
# File 'lib/rukawa/job_net.rb', line 14
def initialize(variables: {}, context: Context.new, parent_job_net: nil, resume_job_classes: [])
@parent_job_net = parent_job_net
@variables = variables
@context = context
@dag = Dag.new
@dag.build(self, variables, context, self.class.dependencies)
@resume_job_classes = resume_job_classes
unless resume_job_classes.empty?
resume_targets = []
@dag.tsort_each_node do |node|
node.set_state(:bypassed)
resume_targets << node if resume_job_classes.include?(node.class)
end
resume_targets.each do |node|
@dag.each_strongly_connected_component_from(node) do |nodes|
nodes.each { |connected| connected.set_state(:waiting) }
end
end
end
end
|
Instance Attribute Details
#context ⇒ Object
Returns the value of attribute context.
6
7
8
|
# File 'lib/rukawa/job_net.rb', line 6
def context
@context
end
|
#dag ⇒ Object
Returns the value of attribute dag.
6
7
8
|
# File 'lib/rukawa/job_net.rb', line 6
def dag
@dag
end
|
#variables ⇒ Object
Returns the value of attribute variables.
6
7
8
|
# File 'lib/rukawa/job_net.rb', line 6
def variables
@variables
end
|
Class Method Details
.dependencies ⇒ Object
9
10
11
|
# File 'lib/rukawa/job_net.rb', line 9
def dependencies
raise NotImplementedError, "Please override"
end
|
Instance Method Details
#dataflows ⇒ Object
78
79
80
|
# File 'lib/rukawa/job_net.rb', line 78
def dataflows
@dag.leveled_each.map(&:dataflow)
end
|
#each(&block) ⇒ Object
127
128
129
|
# File 'lib/rukawa/job_net.rb', line 127
def each(&block)
@dag.each(&block)
end
|
#execute ⇒ Object
37
38
39
|
# File 'lib/rukawa/job_net.rb', line 37
def execute
dataflows.each(&:execute)
end
|
#finished_at ⇒ Object
66
67
68
|
# File 'lib/rukawa/job_net.rb', line 66
def finished_at
@dag.nodes.max_by { |j| j.finished_at.to_i }.finished_at
end
|
#jobs_as_from ⇒ Object
123
124
125
|
# File 'lib/rukawa/job_net.rb', line 123
def jobs_as_from
@dag.jobs.select { |j| j.out_goings.select { |edge| edge.cluster == self }.empty? && j.leaf? }
end
|
#jobs_as_to ⇒ Object
119
120
121
|
# File 'lib/rukawa/job_net.rb', line 119
def jobs_as_to
@dag.jobs.select { |j| j.in_comings.select { |edge| edge.cluster == self }.empty? && j.root? }
end
|
#output_dot(filename, format: nil) ⇒ Object
88
89
90
91
92
93
94
95
96
|
# File 'lib/rukawa/job_net.rb', line 88
def output_dot(filename, format: nil)
if format && format != "dot"
io = IO.popen(["#{Rukawa.config.dot_command}", "-T#{format}", "-o", filename], "w")
io.write(to_dot)
io.close
else
File.open(filename, 'w') { |f| f.write(to_dot) }
end
end
|
#run(wait_interval = 1) ⇒ Object
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
|
# File 'lib/rukawa/job_net.rb', line 41
def run(wait_interval = 1)
promise = Concurrent::Promise.new do
futures = execute
until futures.all?(&:complete?)
yield self if block_given?
sleep wait_interval
end
errors = futures.map(&:reason).compact
unless errors.empty?
errors.each do |err|
next if err.is_a?(DependencyUnsatisfied)
Rukawa.logger.error(err)
end
end
futures
end
promise.execute
end
|
#started_at ⇒ Object
62
63
64
|
# File 'lib/rukawa/job_net.rb', line 62
def started_at
@dag.nodes.min_by { |j| j.started_at ? j.started_at.to_i : Float::INFINITY }.started_at
end
|
#state ⇒ Object
82
83
84
85
86
|
# File 'lib/rukawa/job_net.rb', line 82
def state
inject(Rukawa::State::Waiting) do |state, j|
state.merge(j.state)
end
end
|
#subgraph? ⇒ Boolean
74
75
76
|
# File 'lib/rukawa/job_net.rb', line 74
def subgraph?
!toplevel?
end
|
#to_dot(subgraph = false) ⇒ Object
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
|
# File 'lib/rukawa/job_net.rb', line 98
def to_dot(subgraph = false)
graphdef = subgraph ? "subgraph" : "digraph"
buf = %Q|#{graphdef} "#{subgraph ? "cluster_" : ""}#{name}" {\n|
buf += %Q{label = "#{graph_label}";\n}
buf += Rukawa.config.graph.attrs
buf += Rukawa.config.graph.node.attrs
buf += "color = blue;\n" if subgraph
dag.each do |j|
buf += j.to_dot_def
end
dag.edges.each do |edge|
buf += %Q|"#{edge.from.name}" -> "#{edge.to.name}";\n|
end
buf += "}\n"
end
|
#to_dot_def ⇒ Object
115
116
117
|
# File 'lib/rukawa/job_net.rb', line 115
def to_dot_def
to_dot(true)
end
|
#toplevel? ⇒ Boolean
70
71
72
|
# File 'lib/rukawa/job_net.rb', line 70
def toplevel?
@parent_job_net.nil?
end
|