Class: Task::Job
- Inherits:
-
Object
- Object
- Task::Job
- Defined in:
- lib/rbbt/util/task/job.rb
Constant Summary collapse
- IDSEP =
"_"
Instance Attribute Summary collapse
-
#id ⇒ Object
Returns the value of attribute id.
-
#input(name = nil) ⇒ Object
Returns the value of attribute input.
-
#name ⇒ Object
Returns the value of attribute name.
-
#options ⇒ Object
Returns the value of attribute options.
-
#path ⇒ Object
Returns the value of attribute path.
-
#pid ⇒ Object
Returns the value of attribute pid.
-
#previous_jobs ⇒ Object
Returns the value of attribute previous_jobs.
-
#required_files ⇒ Object
Returns the value of attribute required_files.
-
#task ⇒ Object
Returns the value of attribute task.
Class Method Summary collapse
Instance Method Summary collapse
- #abort ⇒ Object
- #aborted? ⇒ Boolean
- #all_inputs ⇒ Object
- #arguments ⇒ Object
- #block ⇒ Object
- #clean ⇒ Object
- #done? ⇒ Boolean
- #error? ⇒ Boolean
- #files(file = nil, data = nil) ⇒ Object
- #fork ⇒ Object
- #info ⇒ Object
- #info_file ⇒ Object
-
#initialize(task, id, name, options = nil, previous_jobs = nil, required_files = nil, input = nil) ⇒ Job
constructor
A new instance of Job.
- #join ⇒ Object
- #load(*args) ⇒ Object
- #load_dependencies ⇒ Object
- #messages ⇒ Object
- #open ⇒ Object
- #previous_jobs_rec ⇒ Object
- #read ⇒ Object
- #recursive_clean ⇒ Object
- #recursive_done? ⇒ Boolean
- #run ⇒ Object
- #run_dependencies ⇒ Object
- #save_dependencies ⇒ Object
- #save_options(options) ⇒ Object
- #set_info(key, value) ⇒ Object
- #start ⇒ Object
- #step(name = nil, message = nil) ⇒ Object
Constructor Details
#initialize(task, id, name, options = nil, previous_jobs = nil, required_files = nil, input = nil) ⇒ Job
Returns a new instance of Job.
20 21 22 23 24 25 26 27 28 29 30 31 |
# File 'lib/rbbt/util/task/job.rb', line 20 def initialize(task, id, name, = nil, previous_jobs = nil, required_files = nil, input = nil) @task = task @id =id @name = name @options = || {} @previous_jobs = previous_jobs || [] @required_files = required_files || [] @input = input basedir = task.workflow.jobdir unless task.workflow.nil? @path = File.join(basedir || Task.basedir, task.name, id) end |
Instance Attribute Details
#id ⇒ Object
Returns the value of attribute id.
5 6 7 |
# File 'lib/rbbt/util/task/job.rb', line 5 def id @id end |
#input(name = nil) ⇒ Object
Returns the value of attribute input.
5 6 7 |
# File 'lib/rbbt/util/task/job.rb', line 5 def input @input end |
#name ⇒ Object
Returns the value of attribute name.
5 6 7 |
# File 'lib/rbbt/util/task/job.rb', line 5 def name @name end |
#options ⇒ Object
Returns the value of attribute options.
5 6 7 |
# File 'lib/rbbt/util/task/job.rb', line 5 def @options end |
#path ⇒ Object
Returns the value of attribute path.
5 6 7 |
# File 'lib/rbbt/util/task/job.rb', line 5 def path @path end |
#pid ⇒ Object
Returns the value of attribute pid.
5 6 7 |
# File 'lib/rbbt/util/task/job.rb', line 5 def pid @pid end |
#previous_jobs ⇒ Object
Returns the value of attribute previous_jobs.
5 6 7 |
# File 'lib/rbbt/util/task/job.rb', line 5 def previous_jobs @previous_jobs end |
#required_files ⇒ Object
Returns the value of attribute required_files.
5 6 7 |
# File 'lib/rbbt/util/task/job.rb', line 5 def required_files @required_files end |
#task ⇒ Object
Returns the value of attribute task.
5 6 7 |
# File 'lib/rbbt/util/task/job.rb', line 5 def task @task end |
Class Method Details
.id2name(job_id) ⇒ Object
9 10 11 |
# File 'lib/rbbt/util/task/job.rb', line 9 def self.id2name(job_id) job_id.split(IDSEP) end |
.load(task, id) ⇒ Object
13 14 15 16 17 18 |
# File 'lib/rbbt/util/task/job.rb', line 13 def self.load(task, id) name, hash = id2name(id) job = self.new task, id, name, nil, nil job.load_dependencies job end |
Instance Method Details
#abort ⇒ Object
122 123 124 125 126 |
# File 'lib/rbbt/util/task/job.rb', line 122 def abort if @pid Process.kill("INT", @pid) end end |
#aborted? ⇒ Boolean
136 137 138 |
# File 'lib/rbbt/util/task/job.rb', line 136 def aborted? step == :aborted end |
#all_inputs ⇒ Object
44 45 46 47 48 49 50 51 52 53 |
# File 'lib/rbbt/util/task/job.rb', line 44 def all_inputs if true or not defined? @all_inputs @all_inputs = {} previous_jobs_rec.each do |job| @all_inputs[job.task.name] = job end @all_inputs.extend IndiferentHash @all_inputs else @all_inputs end end |
#arguments ⇒ Object
140 141 142 |
# File 'lib/rbbt/util/task/job.rb', line 140 def arguments .values_at *task. end |
#block ⇒ Object
144 145 146 |
# File 'lib/rbbt/util/task/job.rb', line 144 def block task.block end |
#clean ⇒ Object
295 296 297 298 299 300 |
# File 'lib/rbbt/util/task/job.rb', line 295 def clean FileUtils.rm path if File.exist? path FileUtils.rm info_file if File.exist? info_file FileUtils.rm_rf path + '.files' if File.exist? path + '.files' self end |
#done? ⇒ Boolean
128 129 130 |
# File 'lib/rbbt/util/task/job.rb', line 128 def done? [:done, :error, :aborted].include? info[:step] end |
#error? ⇒ Boolean
132 133 134 |
# File 'lib/rbbt/util/task/job.rb', line 132 def error? step == :error or step == :aborted end |
#files(file = nil, data = nil) ⇒ Object
111 112 113 114 115 116 117 118 119 120 |
# File 'lib/rbbt/util/task/job.rb', line 111 def files(file = nil, data = nil) return Dir.glob(File.join(path + '.files/*')).collect{|f| File.basename(f)} if file.nil? filename = Resource::Path.path(File.join(path + '.files', file.to_s)) if data.nil? filename else Open.write(filename, data) end end |
#fork ⇒ Object
237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 |
# File 'lib/rbbt/util/task/job.rb', line 237 def fork return self if recursive_done? @pid = Process.fork do begin step(:started) start step(:done) rescue Exception Log.debug $!. Log.debug $!.backtrace * "\n" step(:error, "#{$!.class}: #{$!.}") end exit end self end |
#info ⇒ Object
79 80 81 82 83 |
# File 'lib/rbbt/util/task/job.rb', line 79 def info return {} if not File.exist?(info_file) info = YAML.load(File.open(info_file)) || {} info.extend IndiferentHash end |
#info_file ⇒ Object
75 76 77 |
# File 'lib/rbbt/util/task/job.rb', line 75 def info_file path + '.info' end |
#join ⇒ Object
255 256 257 258 259 260 261 262 263 264 265 266 |
# File 'lib/rbbt/util/task/job.rb', line 255 def join if @pid.nil? while not done? do Log.debug "Waiting: #{info[:step]}" sleep 5 end else Process.waitpid @pid end self end |
#load(*args) ⇒ Object
276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 |
# File 'lib/rbbt/util/task/job.rb', line 276 def load(*args) case task.persistence when :float Open.read(path).to_f when :integer Open.read(path).to_i when :string Open.read(path) when :tsv TSV.new(path, *args) when :marshal Marshal.load(Open.read(path)) when :yaml YAML.load(Open.read(path)) when nil nil end end |
#load_dependencies ⇒ Object
164 165 166 167 168 169 170 |
# File 'lib/rbbt/util/task/job.rb', line 164 def load_dependencies @previous_jobs = info[:previous_jobs].collect do |job_string| job_string =~ /JOB:(.*)\/(.*)/ task.workflow.load_job($1, $2) end if info[:previous_jobs] @required_files = info[:required_files] if info[:required_files] end |
#messages ⇒ Object
107 108 109 |
# File 'lib/rbbt/util/task/job.rb', line 107 def info[:messages] || [] end |
#open ⇒ Object
268 269 270 |
# File 'lib/rbbt/util/task/job.rb', line 268 def open File.open(path) end |
#previous_jobs_rec ⇒ Object
33 34 35 36 37 |
# File 'lib/rbbt/util/task/job.rb', line 33 def previous_jobs_rec return [] if previous_jobs.nil? prev = previous_jobs + previous_jobs.collect{|job| job.previous_jobs_rec}.flatten NamedArray.name prev, prev.collect{|job| job.task.name} end |
#read ⇒ Object
272 273 274 |
# File 'lib/rbbt/util/task/job.rb', line 272 def read File.open(path) do |f| f.read end end |
#recursive_clean ⇒ Object
302 303 304 305 |
# File 'lib/rbbt/util/task/job.rb', line 302 def recursive_clean previous_jobs.each do |job| job.recursive_clean end unless previous_jobs.nil? clean end |
#recursive_done? ⇒ Boolean
218 219 220 |
# File 'lib/rbbt/util/task/job.rb', line 218 def recursive_done? (previous_jobs || []).inject(true){|acc,j| acc and j.recursive_done?} and done? and not error? end |
#run ⇒ Object
222 223 224 225 226 227 228 229 230 231 232 233 234 235 |
# File 'lib/rbbt/util/task/job.rb', line 222 def run return self if recursive_done? begin FileUtils.rm info_file if File.exist? info_file step(:started) start step(:done) rescue Exception Log.debug $!. Log.debug $!.backtrace * "\n" step(:error, "#{$!.class}: #{$!.}") end self end |
#run_dependencies ⇒ Object
148 149 150 151 152 153 154 155 156 157 |
# File 'lib/rbbt/util/task/job.rb', line 148 def run_dependencies required_files.each do |file| file.produce unless File.exist? file end unless required_files.nil? previous_jobs.each do |job| if not job.recursive_done? job.clean if job.error? job.start job.step :done unless job.step == :error or job.step == :aborted end end unless previous_jobs.nil? end |
#save_dependencies ⇒ Object
159 160 161 162 |
# File 'lib/rbbt/util/task/job.rb', line 159 def save_dependencies set_info :previous_jobs, @previous_jobs.collect{|job| "JOB:#{job.task.name}/#{job.id}"} unless @previous_jobs.nil? set_info :required_files, @required_files.collect{|file| file.responds_to? :find ? file.find : file} if @required_files.nil? end |
#save_options(options) ⇒ Object
205 206 207 208 209 210 211 212 213 214 215 216 |
# File 'lib/rbbt/util/task/job.rb', line 205 def () = {} .each do |key, value| case when TSV === value [key] = value.to_s else [key] = value end end set_info(:options, ) end |
#set_info(key, value) ⇒ Object
85 86 87 88 89 90 |
# File 'lib/rbbt/util/task/job.rb', line 85 def set_info(key, value) Misc.lock(info_file, key, value) do |info_file, key, value| i = self.info new_info = i.merge(key => value) Open.write(info_file, new_info.to_yaml) end end |
#start ⇒ Object
172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 |
# File 'lib/rbbt/util/task/job.rb', line 172 def start begin run_dependencies Log.medium("[#{task.name}] Starting Job '#{ name }'. Path: '#{ path }'") set_info(:start_time, Time.now) () save_dependencies extend task.scope unless task.scope.nil? or Object == task.scope.class result = instance_exec *arguments, &block if not result.nil? case task.persistence when nil, :string, :tsv, :integer Open.write(path, result.to_s) when :marshal Open.write(path, Marshal.dump(result)) when :yaml Open.write(path, YAML.dump(result)) end end set_info(:end_time, Time.now) Log.medium("[#{task.name}] Finished Job '#{ name }'. Path: '#{ path }'") rescue Exception set_info(:exception_backtrace, $!.backtrace) step(:error, "#{$!.class}: #{$!.}") raise $! end end |
#step(name = nil, message = nil) ⇒ Object
92 93 94 95 96 97 98 99 100 101 102 103 104 105 |
# File 'lib/rbbt/util/task/job.rb', line 92 def step(name = nil, = nil) @previous_jobs if name.nil? info[:step] else set_info(:step, name) if .nil? Log.info "[#{task.name}] Step '#{name}'" else Log.info "[#{task.name}] Step '#{name}': #{.chomp}" set_info(:messages, info[:messages] || [] << ) if not .nil? end end end |