Class: Rukawa::Job

Inherits:
AbstractJob show all
Includes:
ActiveSupport::Callbacks
Defined in:
lib/rukawa/job.rb

Direct Known Subclasses

Builtins::Base

Instance Attribute Summary collapse

Attributes inherited from AbstractJob

#parent_job_net

Class Method Summary collapse

Instance Method Summary collapse

Methods inherited from AbstractJob

add_skip_rule, description, #elapsed_time_from, #formatted_elapsed_time_from, #inspect, #name, set_description, #skip?

Constructor Details

#initialize(variables: {}, context: Context.new, parent_job_net: nil) ⇒ Job

Returns a new instance of Job.



98
99
100
101
102
103
104
105
106
107
# File 'lib/rukawa/job.rb', line 98

def initialize(variables: {}, context: Context.new, parent_job_net: nil)
  @parent_job_net = parent_job_net
  @variables = variables
  @context = context
  @in_comings = Set.new
  @out_goings = Set.new
  @retry_count = 0
  @retry_wait = 1
  set_state(:waiting)
end

Instance Attribute Details

#finished_atObject (readonly)

Returns the value of attribute finished_at.



24
25
26
# File 'lib/rukawa/job.rb', line 24

def finished_at
  @finished_at
end

#in_comingsObject

Returns the value of attribute in_comings.



23
24
25
# File 'lib/rukawa/job.rb', line 23

def in_comings
  @in_comings
end

#out_goingsObject

Returns the value of attribute out_goings.



23
24
25
# File 'lib/rukawa/job.rb', line 23

def out_goings
  @out_goings
end

#started_atObject (readonly)

Returns the value of attribute started_at.



24
25
26
# File 'lib/rukawa/job.rb', line 24

def started_at
  @started_at
end

#stateObject (readonly)

Returns the value of attribute state.



24
25
26
# File 'lib/rukawa/job.rb', line 24

def state
  @state
end

#variablesObject (readonly)

Returns the value of attribute variables.



24
25
26
# File 'lib/rukawa/job.rb', line 24

def variables
  @variables
end

Class Method Details

.after_fail(*args, **options, &block) ⇒ Object



61
62
63
64
65
66
67
68
# File 'lib/rukawa/job.rb', line 61

def after_fail(*args, **options, &block)
  options[:prepend] = true
  conditional = ActiveSupport::Callbacks::Conditionals::Value.new { |v|
    v != false
  }
  options[:if] = Array(options[:if]) << conditional
  set_callback :fail, :after, *args, **options, &block
end

.after_run(*args, **options, &block) ⇒ Object



52
53
54
55
56
57
58
59
# File 'lib/rukawa/job.rb', line 52

def after_run(*args, **options, &block)
  options[:prepend] = true
  conditional = ActiveSupport::Callbacks::Conditionals::Value.new { |v|
    v != false
  }
  options[:if] = Array(options[:if]) << conditional
  set_callback :run, :after, *args, **options, &block
end

.around_run(*args, **options, &block) ⇒ Object



70
71
72
# File 'lib/rukawa/job.rb', line 70

def around_run(*args, **options, &block)
  set_callback :run, :around, *args, **options, &block
end

.before_run(*args, **options, &block) ⇒ Object



48
49
50
# File 'lib/rukawa/job.rb', line 48

def before_run(*args, **options, &block)
  set_callback :run, :before, *args, **options, &block
end

.set_dependency_type(name) ⇒ Object



40
41
42
# File 'lib/rukawa/job.rb', line 40

def set_dependency_type(name)
  self.dependency_type = Rukawa::Dependency.get(name)
end

.set_resource_count(count) ⇒ Object



44
45
46
# File 'lib/rukawa/job.rb', line 44

def set_resource_count(count)
  self.resource_count = count
end

.set_retryable(limit: 8, type: nil, wait: nil) ⇒ Object



33
34
35
36
37
38
# File 'lib/rukawa/job.rb', line 33

def set_retryable(limit: 8, type: nil, wait: nil)
  self.retryable = true
  self.retry_limit = limit
  self.retry_exception_type = type
  self.retry_wait = wait
end

.wrapper_for(*classes) ⇒ Object



78
79
80
81
82
83
# File 'lib/rukawa/job.rb', line 78

def wrapper_for(*classes)
  classes.each do |c|
    raise "Wrapper for #{c} is already defined" if wrappers[c]
    wrappers[c] = self
  end
end

.wrappersObject



74
75
76
# File 'lib/rukawa/job.rb', line 74

def wrappers
  @@wrappers ||= {}
end

Instance Method Details

#dataflowObject



121
122
123
124
125
126
127
128
129
130
131
# File 'lib/rukawa/job.rb', line 121

def dataflow
  return @dataflow if @dataflow
  return @dataflow = bypass_dataflow if @state.bypassed?

  @dataflow = Concurrent.dataflow_with(@context.executor, *depend_dataflows) do |*results|
    acquire_resource do
      do_run(*results)
      @state
    end
  end
end

#jobs_as_fromObject Also known as: jobs_as_to



157
158
159
# File 'lib/rukawa/job.rb', line 157

def jobs_as_from
  [self]
end

#leaf?Boolean

Returns:

  • (Boolean)


117
118
119
# File 'lib/rukawa/job.rb', line 117

def leaf?
  out_goings.select { |edge| edge.cluster == @parent_job_net }.empty?
end

#resource_countObject



170
171
172
# File 'lib/rukawa/job.rb', line 170

def resource_count
  [self.class.resource_count, Rukawa.config.concurrency].min
end

#root?Boolean

Returns:

  • (Boolean)


113
114
115
# File 'lib/rukawa/job.rb', line 113

def root?
  in_comings.select { |edge| edge.cluster == @parent_job_net }.empty?
end

#runObject



133
134
# File 'lib/rukawa/job.rb', line 133

def run
end

#set_state(name) ⇒ Object



109
110
111
# File 'lib/rukawa/job.rb', line 109

def set_state(name)
  @state = Rukawa::State.get(name)
end

#to_dot_defObject



162
163
164
165
166
167
168
# File 'lib/rukawa/job.rb', line 162

def to_dot_def
  if state == Rukawa::State::Waiting
    "\"#{name}\";\n"
  else
    "\"#{name}\" [style = filled,fillcolor = #{state.color}];\n"
  end
end