Class: Rukawa::Job
- Inherits:
-
AbstractJob
show all
- Includes:
- ActiveSupport::Callbacks
- Defined in:
- lib/rukawa/job.rb
Instance Attribute Summary collapse
Attributes inherited from AbstractJob
#parent_job_net
Class Method Summary
collapse
-
.after_fail(*args, **options, &block) ⇒ Object
-
.after_run(*args, **options, &block) ⇒ Object
-
.around_run(*args, **options, &block) ⇒ Object
-
.before_run(*args, **options, &block) ⇒ Object
-
.set_dependency_type(name) ⇒ Object
-
.set_resource_count(count) ⇒ Object
-
.set_retryable(limit: 8, type: nil, wait: nil) ⇒ Object
-
.wrapper_for(*classes) ⇒ Object
-
.wrappers ⇒ Object
Instance Method Summary
collapse
Methods inherited from AbstractJob
add_skip_rule, description, #elapsed_time_from, #formatted_elapsed_time_from, #inspect, #name, set_description, #skip?
Constructor Details
#initialize(variables: {}, context: Context.new, parent_job_net: nil) ⇒ Job
Returns a new instance of Job.
98
99
100
101
102
103
104
105
106
107
|
# File 'lib/rukawa/job.rb', line 98
def initialize(variables: {}, context: Context.new, parent_job_net: nil)
@parent_job_net = parent_job_net
@variables = variables
@context = context
@in_comings = Set.new
@out_goings = Set.new
@retry_count = 0
@retry_wait = 1
set_state(:waiting)
end
|
Instance Attribute Details
#finished_at ⇒ Object
Returns the value of attribute finished_at.
24
25
26
|
# File 'lib/rukawa/job.rb', line 24
def finished_at
@finished_at
end
|
#in_comings ⇒ Object
Returns the value of attribute in_comings.
23
24
25
|
# File 'lib/rukawa/job.rb', line 23
def in_comings
@in_comings
end
|
#out_goings ⇒ Object
Returns the value of attribute out_goings.
23
24
25
|
# File 'lib/rukawa/job.rb', line 23
def out_goings
@out_goings
end
|
#started_at ⇒ Object
Returns the value of attribute started_at.
24
25
26
|
# File 'lib/rukawa/job.rb', line 24
def started_at
@started_at
end
|
#state ⇒ Object
Returns the value of attribute state.
24
25
26
|
# File 'lib/rukawa/job.rb', line 24
def state
@state
end
|
#variables ⇒ Object
Returns the value of attribute variables.
24
25
26
|
# File 'lib/rukawa/job.rb', line 24
def variables
@variables
end
|
Class Method Details
.after_fail(*args, **options, &block) ⇒ Object
61
62
63
64
65
66
67
68
|
# File 'lib/rukawa/job.rb', line 61
def after_fail(*args, **options, &block)
options[:prepend] = true
conditional = ActiveSupport::Callbacks::Conditionals::Value.new { |v|
v != false
}
options[:if] = Array(options[:if]) << conditional
set_callback :fail, :after, *args, **options, &block
end
|
.after_run(*args, **options, &block) ⇒ Object
52
53
54
55
56
57
58
59
|
# File 'lib/rukawa/job.rb', line 52
def after_run(*args, **options, &block)
options[:prepend] = true
conditional = ActiveSupport::Callbacks::Conditionals::Value.new { |v|
v != false
}
options[:if] = Array(options[:if]) << conditional
set_callback :run, :after, *args, **options, &block
end
|
.around_run(*args, **options, &block) ⇒ Object
70
71
72
|
# File 'lib/rukawa/job.rb', line 70
def around_run(*args, **options, &block)
set_callback :run, :around, *args, **options, &block
end
|
.before_run(*args, **options, &block) ⇒ Object
48
49
50
|
# File 'lib/rukawa/job.rb', line 48
def before_run(*args, **options, &block)
set_callback :run, :before, *args, **options, &block
end
|
.set_dependency_type(name) ⇒ Object
40
41
42
|
# File 'lib/rukawa/job.rb', line 40
def set_dependency_type(name)
self.dependency_type = Rukawa::Dependency.get(name)
end
|
.set_resource_count(count) ⇒ Object
44
45
46
|
# File 'lib/rukawa/job.rb', line 44
def set_resource_count(count)
self.resource_count = count
end
|
.set_retryable(limit: 8, type: nil, wait: nil) ⇒ Object
33
34
35
36
37
38
|
# File 'lib/rukawa/job.rb', line 33
def set_retryable(limit: 8, type: nil, wait: nil)
self.retryable = true
self.retry_limit = limit
self.retry_exception_type = type
self.retry_wait = wait
end
|
.wrapper_for(*classes) ⇒ Object
78
79
80
81
82
83
|
# File 'lib/rukawa/job.rb', line 78
def wrapper_for(*classes)
classes.each do |c|
raise "Wrapper for #{c} is already defined" if wrappers[c]
wrappers[c] = self
end
end
|
.wrappers ⇒ Object
74
75
76
|
# File 'lib/rukawa/job.rb', line 74
def wrappers
@@wrappers ||= {}
end
|
Instance Method Details
#dataflow ⇒ Object
121
122
123
124
125
126
127
128
129
130
131
|
# File 'lib/rukawa/job.rb', line 121
def dataflow
return @dataflow if @dataflow
return @dataflow = bypass_dataflow if @state.bypassed?
@dataflow = Concurrent.dataflow_with(@context.executor, *depend_dataflows) do |*results|
acquire_resource do
do_run(*results)
@state
end
end
end
|
#jobs_as_from ⇒ Object
Also known as:
jobs_as_to
157
158
159
|
# File 'lib/rukawa/job.rb', line 157
def jobs_as_from
[self]
end
|
#leaf? ⇒ Boolean
117
118
119
|
# File 'lib/rukawa/job.rb', line 117
def leaf?
out_goings.select { |edge| edge.cluster == @parent_job_net }.empty?
end
|
#resource_count ⇒ Object
170
171
172
|
# File 'lib/rukawa/job.rb', line 170
def resource_count
[self.class.resource_count, Rukawa.config.concurrency].min
end
|
#root? ⇒ Boolean
113
114
115
|
# File 'lib/rukawa/job.rb', line 113
def root?
in_comings.select { |edge| edge.cluster == @parent_job_net }.empty?
end
|
#run ⇒ Object
133
134
|
# File 'lib/rukawa/job.rb', line 133
def run
end
|
#set_state(name) ⇒ Object
109
110
111
|
# File 'lib/rukawa/job.rb', line 109
def set_state(name)
@state = Rukawa::State.get(name)
end
|
#to_dot_def ⇒ Object
162
163
164
165
166
167
168
|
# File 'lib/rukawa/job.rb', line 162
def to_dot_def
if state == Rukawa::State::Waiting
"\"#{name}\";\n"
else
"\"#{name}\" [style = filled,fillcolor = #{state.color}];\n"
end
end
|