Class: Step
- Inherits: Object
  - Object
    - Step
- Defined in:
- lib/scout/workflow/step.rb,
lib/scout/workflow/step/file.rb,
lib/scout/workflow/step/info.rb,
lib/scout/workflow/step/load.rb,
lib/scout/workflow/step/config.rb,
lib/scout/workflow/step/inputs.rb,
lib/scout/workflow/step/status.rb,
lib/scout/workflow/step/archive.rb,
lib/scout/workflow/step/children.rb,
lib/scout/workflow/step/progress.rb,
lib/scout/workflow/step/provenance.rb,
lib/scout/workflow/step/dependencies.rb
Constant Summary
- SERIALIZER =
:marshal
Instance Attribute Summary
-
#compute ⇒ Object
Returns the value of attribute compute.
-
#dependencies ⇒ Object
Returns the value of attribute dependencies.
-
#exec_context ⇒ Object
Returns the value of attribute exec_context.
-
#id ⇒ Object
Returns the value of attribute id.
-
#inputs ⇒ Object
Returns the value of attribute inputs.
-
#non_default_inputs ⇒ Object
Returns the value of attribute non_default_inputs.
-
#overriden ⇒ Object
Returns the value of attribute overriden.
-
#overriden_task ⇒ Object
Returns the value of attribute overriden_task.
-
#overriden_workflow ⇒ Object
Returns the value of attribute overriden_workflow.
-
#path ⇒ Object
Returns the value of attribute path.
-
#provided_inputs ⇒ Object
Returns the value of attribute provided_inputs.
-
#result ⇒ Object
readonly
Returns the value of attribute result.
-
#task ⇒ Object
Returns the value of attribute task.
-
#tee_copies ⇒ Object
Returns the value of attribute tee_copies.
-
#type ⇒ Object
Returns the value of attribute type.
-
#workflow ⇒ Object
Returns the value of attribute workflow.
Class Method Summary
- ._load(path) ⇒ Object
- .clean(file) ⇒ Object
- .job_path?(path) ⇒ Boolean
- .load(path) ⇒ Object
- .load_info(info_file) ⇒ Object
- .prov_indent(step, offset, input_dependencies) ⇒ Object
- .prov_report(step, offset = 0, task = nil, seen = [], expand_repeats = false, input = nil, input_dependencies = nil) ⇒ Object
- .prov_report_msg(status, name, path, info, input = nil) ⇒ Object
- .prov_status_msg(status) ⇒ Object
- .relocate(path) ⇒ Object
- .status_color(status) ⇒ Object
- .wait_for_jobs(jobs) ⇒ Object
Instance Method Summary
-
#_dump(level) ⇒ Object
Marshal Step.
- #abort(exception = nil) ⇒ Object
- #abort_dependencies ⇒ Object
- #aborted? ⇒ Boolean
- #alias? ⇒ Boolean
- #all_dependencies ⇒ Object
- #archive_deps(jobs = nil) ⇒ Object
- #archived_info ⇒ Object
- #archived_inputs ⇒ Object
- #bundle_files ⇒ Object
- #canfail? ⇒ Boolean
- #child(&block) ⇒ Object
- #clean ⇒ Object
- #clean_name ⇒ Object
- #cmd(*args) ⇒ Object
- #config(key, *tokens) ⇒ Object
- #consume_all_streams ⇒ Object
- #copy_linked_files_dir ⇒ Object
- #digest_str ⇒ Object
- #dirty? ⇒ Boolean
- #done? ⇒ Boolean
- #error? ⇒ Boolean
- #exception ⇒ Object
- #exec ⇒ Object
- #file(file = nil) ⇒ Object
- #files ⇒ Object
- #files_dir ⇒ Object
- #fingerprint ⇒ Object
- #fork(noload = false, semaphore = nil) ⇒ Object
- #full_task_name ⇒ Object
- #grace ⇒ Object
- #info ⇒ Object
- #info_file ⇒ Object
- #init_info ⇒ Object
-
#initialize(path = nil, inputs = nil, dependencies = nil, id = nil, non_default_inputs = nil, provided_inputs = nil, compute = nil, exec_context = nil, &task) ⇒ Step
constructor
A new instance of Step.
- #input_dependencies ⇒ Object
- #join ⇒ Object
- #load ⇒ Object
- #load_info ⇒ Object
- #log(status, message = nil, &block) ⇒ Object
- #marshal_load(path) ⇒ Object
- #merge_info(new_info) ⇒ Object
- #messages ⇒ Object
- #name ⇒ Object
- #overriden? ⇒ Boolean
- #overriden_deps ⇒ Object
- #prepare_dependencies ⇒ Object
- #present? ⇒ Boolean
- #produce(with_fork: false) ⇒ Object
- #progress_bar(msg = "Progress", options = nil, &block) ⇒ Object
- #rec_dependencies(connected = false, seen = []) ⇒ Object
- #recoverable_error? ⇒ Boolean
- #recursive_clean ⇒ Object
- #recursive_inputs ⇒ Object
- #report_status(status, message = nil) ⇒ Object
- #reset_info(info = {}) ⇒ Object
- #run(stream = false) ⇒ Object
- #run_dependencies ⇒ Object
- #running? ⇒ Boolean
- #save_info(info = nil) ⇒ Object
- #save_inputs(inputs_dir) ⇒ Object
- #set_info(key, value) ⇒ Object
- #short_path ⇒ Object
- #started? ⇒ Boolean
- #status ⇒ Object
- #step(task_name) ⇒ Object
- #stream ⇒ Object
- #streaming? ⇒ Boolean
- #synchronize(&block) ⇒ Object
- #task_name ⇒ Object
- #task_signature ⇒ Object
- #terminated? ⇒ Boolean
- #tmp_path ⇒ Object
- #updated? ⇒ Boolean
- #waiting? ⇒ Boolean
Constructor Details
#initialize(path = nil, inputs = nil, dependencies = nil, id = nil, non_default_inputs = nil, provided_inputs = nil, compute = nil, exec_context = nil, &task) ⇒ Step
Returns a new instance of Step.
19 20 21 22 23 24 25 26 27 28 29 30 31 |
# File 'lib/scout/workflow/step.rb', line 19 def initialize(path = nil, inputs = nil, dependencies = nil, id = nil, non_default_inputs = nil, provided_inputs = nil, compute = nil, exec_context = nil, &task) @path = path @inputs = inputs @dependencies = dependencies @id = id @non_default_inputs = non_default_inputs @provided_inputs = provided_inputs @compute = compute @task = task @mutex = Mutex.new @tee_copies = 1 @exec_context = exec_context || self end |
Instance Attribute Details
#compute ⇒ Object
Returns the value of attribute compute.
18 19 20 |
# File 'lib/scout/workflow/step.rb', line 18 def compute @compute end |
#dependencies ⇒ Object
Returns the value of attribute dependencies.
18 19 20 |
# File 'lib/scout/workflow/step.rb', line 18 def dependencies @dependencies end |
#exec_context ⇒ Object
Returns the value of attribute exec_context.
18 19 20 |
# File 'lib/scout/workflow/step.rb', line 18 def exec_context @exec_context end |
#id ⇒ Object
Returns the value of attribute id.
18 19 20 |
# File 'lib/scout/workflow/step.rb', line 18 def id @id end |
#inputs ⇒ Object
Returns the value of attribute inputs.
18 19 20 |
# File 'lib/scout/workflow/step.rb', line 18 def inputs @inputs end |
#non_default_inputs ⇒ Object
Returns the value of attribute non_default_inputs.
18 19 20 |
# File 'lib/scout/workflow/step.rb', line 18 def non_default_inputs @non_default_inputs end |
#overriden ⇒ Object
Returns the value of attribute overriden.
18 19 20 |
# File 'lib/scout/workflow/step.rb', line 18 def overriden @overriden end |
#overriden_task ⇒ Object
Returns the value of attribute overriden_task.
18 19 20 |
# File 'lib/scout/workflow/step.rb', line 18 def overriden_task @overriden_task end |
#overriden_workflow ⇒ Object
Returns the value of attribute overriden_workflow.
18 19 20 |
# File 'lib/scout/workflow/step.rb', line 18 def overriden_workflow @overriden_workflow end |
#path ⇒ Object
Returns the value of attribute path.
18 19 20 |
# File 'lib/scout/workflow/step.rb', line 18 def path @path end |
#provided_inputs ⇒ Object
Returns the value of attribute provided_inputs.
18 19 20 |
# File 'lib/scout/workflow/step.rb', line 18 def provided_inputs @provided_inputs end |
#result ⇒ Object (readonly)
Returns the value of attribute result.
139 140 141 |
# File 'lib/scout/workflow/step.rb', line 139 def result @result end |
#task ⇒ Object
Returns the value of attribute task.
18 19 20 |
# File 'lib/scout/workflow/step.rb', line 18 def task @task end |
#tee_copies ⇒ Object
Returns the value of attribute tee_copies.
18 19 20 |
# File 'lib/scout/workflow/step.rb', line 18 def tee_copies @tee_copies end |
#type ⇒ Object
Returns the value of attribute type.
62 63 64 |
# File 'lib/scout/workflow/step.rb', line 62 def type @type end |
#workflow ⇒ Object
Returns the value of attribute workflow.
18 19 20 |
# File 'lib/scout/workflow/step.rb', line 18 def workflow @workflow end |
Class Method Details
._load(path) ⇒ Object
180 181 182 |
# File 'lib/scout/workflow/step/info.rb', line 180 def self._load(path) Step.new path end |
.clean(file) ⇒ Object
46 47 48 |
# File 'lib/scout/workflow/step/status.rb', line 46 def self.clean(file) Step.new(file).clean end |
.job_path?(path) ⇒ Boolean
2 3 4 |
# File 'lib/scout/workflow/step/provenance.rb', line 2 def self.job_path?(path) path.split("/")[-4] == "jobs" end |
.load(path) ⇒ Object
13 14 15 16 17 |
# File 'lib/scout/workflow/step/load.rb', line 13 def self.load(path) path = relocate(path) unless Open.exists?(path) #raise "Could not load #{path}" unless Open.exists?(path) s = Step.new path end |
.load_info(info_file) ⇒ Object
12 13 14 15 16 17 18 19 |
# File 'lib/scout/workflow/step/info.rb', line 12 def self.load_info(info_file) info = begin Persist.load(info_file, SERIALIZER) || {} rescue {status: :noinfo} end IndiferentHash.setup(info) end |
.prov_indent(step, offset, input_dependencies) ⇒ Object
89 90 91 92 93 94 95 96 97 98 99 100 101 102 |
# File 'lib/scout/workflow/step/provenance.rb', line 89 def self.prov_indent(step, offset, input_dependencies) return (" " * (offset)) if step.alias? (" " * offset + "🡵") elsif step.overriden_task (" " * offset + "🡇") elsif input_dependencies.include?(step) (" " * offset + "┝") elsif step.input_dependencies.any? (" " * offset + "╰") else (" " * (offset+1)) end end |
.prov_report(step, offset = 0, task = nil, seen = [], expand_repeats = false, input = nil, input_dependencies = nil) ⇒ Object
104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 |
# File 'lib/scout/workflow/step/provenance.rb', line 104 def self.prov_report(step, offset = 0, task = nil, seen = [], expand_repeats = false, input = nil, input_dependencies = nil) info = step.info || {} info[:task_name] = task path = step.path status = info[:status] || :missing status = status.to_sym if String === status status = :noinfo if status == :missing && Open.exist?(path) status = "remote" if Open.remote?(path) || Open.ssh?(path) name = info[:name] || File.basename(path) status = :unsync if status == :done and not Open.exist?(path) status = :notfound if status == :noinfo and not Open.exist?(path) this_step_msg = prov_report_msg(status, name, path, info, input) input_dependencies ||= {} step.dependencies.each do |dep| if dep.input_dependencies.any? dep.input_dependencies.each do |id| input_name, _dep = dep.recursive_inputs.select{|f,d| d == id || (String === d && d.start_with?(id.files_dir)) || (Array === d && d.include?(id)) }.keys.last if input_name input_dependencies[id] ||= [] input_dependencies[id] << [dep, input_name] end end end end if step.dependencies str = [] indent = prov_indent(step, offset, input_dependencies) str << indent + this_step_msg if ENV["SCOUT_ORIGINAL_STACK"] == 'true' step.dependencies.dup.tap{|l| l.reverse! if ENV["SCOUT_ORIGINAL_STACK"] == 'true' }.each do |dep| path = dep.path new = ! seen.include?(path) if new seen << path str.concat(prov_report(dep, offset + 1, task, seen, expand_repeats, input_dependencies[dep], input_dependencies.dup).split("\n")) else if expand_repeats str << Log.color(Step.status_color(dep.status), Log.uncolor(prov_report(dep, offset+1, task, input_dependencies[dep], input_dependencies.dup))) else info = dep.info || {} status = info[:status] || :missing status = "remote" if Open.remote?(path) || Open.ssh?(path) name = info[:name] || File.basename(path) status = :unsync if status == :done and not Open.exist?(path) status = :notfound if status == :noinfo and not Open.exist?(path) dep_indent = prov_indent(dep, offset+1, input_dependencies) str << dep_indent + Log.color(Step.status_color(status), Log.uncolor(prov_report_msg(status, name, path, info, input_dependencies[dep]))) end end end if step.dependencies str << indent + this_step_msg unless ENV["SCOUT_ORIGINAL_STACK"] == 'true' str * "\n" end |
.prov_report_msg(status, name, path, info, input = nil) ⇒ Object
32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 |
# File 'lib/scout/workflow/step/provenance.rb', line 32 def self.prov_report_msg(status, name, path, info, input = nil) parts = path.sub(/\{.*/,'').split "/" parts.pop task = Log.color(:yellow, parts.pop) workflow = Log.color(:magenta, parts.pop) if ! Step.job_path?(path) task, status, workflow = Log.color(:yellow, info[:task_name]), Log.color(:green, "file"), Log.color(:magenta, "-") end path_mtime = begin Open.mtime(path) rescue Exception nil end if input.nil? || input.empty? input_str = nil else input = input.reject{|dep,name| (input & dep.dependencies.collect{|d| [d,name]}).any? } input = input.reject{|dep,name| (input & dep.input_dependencies.collect{|d| [d,name]}).any? } input_str = Log.color(:magenta, "-> ") + input.collect{|dep,name| Log.color(:yellow, dep.task_name.to_s) + ":" + Log.color(:yellow, name) }.uniq * " " end str = if ! (Open.remote?(path) || Open.ssh?(path)) && (Open.exists?(path) && $main_mtime && path_mtime && ($main_mtime - path_mtime) < -2) prov_status_msg(status.to_s) << " " << [workflow, task, path, input_str].compact * " " << " (#{Log.color(:red, "Mtime out of sync") })" else prov_status_msg(status.to_s) << " " << [workflow, task, path, input_str].compact * " " end if $inputs and $inputs.any? job_inputs = Workflow.load_step(path).recursive_inputs.to_hash IndiferentHash.setup(job_inputs) $inputs.each do |input| value = job_inputs[input] next if value.nil? value_str = Log.fingerprint(value) str << "\t#{Log.color :magenta, input}=#{value_str}" end end if $info_fields and $info_fields.any? $info_fields.each do |field| IndiferentHash.setup(info) value = info[field] next if value.nil? value_str = Log.fingerprint(value) str << "\t#{Log.color :magenta, field}=#{value_str}" end end str end |
.prov_status_msg(status) ⇒ Object
27 28 29 30 |
# File 'lib/scout/workflow/step/provenance.rb', line 27 def self.prov_status_msg(status) color = status_color(status) Log.color(color, status.to_s) end |
.relocate(path) ⇒ Object
2 3 4 5 6 7 8 9 10 11 |
# File 'lib/scout/workflow/step/load.rb', line 2 def self.relocate(path) return path if Open.exists?(path) Path.setup(path) unless Path === path relocated = path.relocate return relocated if Open.exists?(relocated) subpath = path.split("/")[-3..-1] * "/" relocated = Path.setup("var/jobs")[subpath] return relocated if Open.exists?(relocated) path end |
.status_color(status) ⇒ Object
6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 |
# File 'lib/scout/workflow/step/provenance.rb', line 6 def self.status_color(status) case status.to_sym when :error, :aborted, :dead, :unsync :red when :streaming, :started :cyan when :done, :noinfo :green when :dependencies, :waiting, :setup :yellow when :notfound, :cleaned, :missing :blue else if status.to_s.index ">" :cyan else :cyan end end end |
.wait_for_jobs(jobs) ⇒ Object
98 99 100 101 102 103 104 105 106 107 108 109 |
# File 'lib/scout/workflow/step/dependencies.rb', line 98 def self.wait_for_jobs(jobs) threads = [] jobs.each do |job| threads << Thread.new do Thread.current.report_on_exception = false job.join end end threads.each do |t| t.join end end |
Instance Method Details
#_dump(level) ⇒ Object
Marshal Step
176 177 178 |
# File 'lib/scout/workflow/step/info.rb', line 176 def _dump(level) @path end |
#abort(exception = nil) ⇒ Object
2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 |
# File 'lib/scout/workflow/step/status.rb', line 2 def abort(exception = nil) if (pid = info[:pid]) && pid != Process.pid && Misc.pid_alive?(pid) Log.debug "Kill process #{pid} to abort step #{Log.fingerprint self}" begin s = Misc.abort_child pid, true Log.medium "Aborted pid #{path} #{s}" rescue Log.debug("Aborted job #{pid} was not killed: #{$!.message}") end else while @result && streaming? && stream = self.stream stream.abort(exception) end @take_stream.abort(exception) if streaming? end end |
#abort_dependencies ⇒ Object
94 95 96 |
# File 'lib/scout/workflow/step/dependencies.rb', line 94 def abort_dependencies all_dependencies.each{|dep| dep.abort if dep.running? } end |
#aborted? ⇒ Boolean
154 155 156 |
# File 'lib/scout/workflow/step/info.rb', line 154 def aborted? status == :aborted || status == 'aborted' end |
#alias? ⇒ Boolean
379 380 381 |
# File 'lib/scout/workflow/step.rb', line 379 def alias? task.alias? if task end |
#all_dependencies ⇒ Object
63 64 65 66 67 68 69 70 |
# File 'lib/scout/workflow/step/dependencies.rb', line 63 def all_dependencies @all_dependencies ||= begin all_dependencies = [] all_dependencies += dependencies if dependencies all_dependencies += input_dependencies if input_dependencies all_dependencies end end |
#archive_deps(jobs = nil) ⇒ Object
29 30 31 32 33 34 35 36 37 38 39 40 41 |
# File 'lib/scout/workflow/step/archive.rb', line 29 def archive_deps(jobs = nil) jobs = dependencies if jobs.nil? archived_info = jobs.inject({}) do |acc,dep| next acc unless Open.exists?(dep.info_file) acc[dep.path] = dep.info acc.merge!(dep.archived_info) acc end self.set_info :archived_info, archived_info self.set_info :archived_dependencies, info[:dependencies] end |
#archived_info ⇒ Object
2 3 4 5 |
# File 'lib/scout/workflow/step/archive.rb', line 2 def archived_info return {} unless Open.exists?(info_file) info[:archived_info] || {} end |
#archived_inputs ⇒ Object
7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 |
# File 'lib/scout/workflow/step/archive.rb', line 7 def archived_inputs return [] unless info[:archived_dependencies] archived_info = self.archived_info all_inputs = NamedArray.setup([],[]) deps = info[:archived_dependencies].dup seen = [] while path = deps.pop dep_info = archived_info[path] if Hash === dep_info dep_inputs = dep_info[:inputs] NamedArray.setup(dep_inputs, dep_info[:input_names]) all_inputs.concat(dep_inputs) deps.concat(dep_info[:dependencies].collect{|p| p.last } - seen) if dep_info[:dependencies] deps.concat(dep_info[:archived_dependencies].collect{|p| p.last } - seen) if dep_info[:archived_dependencies] end seen << path end all_inputs end |
#bundle_files ⇒ Object
28 29 30 |
# File 'lib/scout/workflow/step/file.rb', line 28 def bundle_files [path, info_file, Dir.glob(File.join(files_dir,"**/*"))].flatten.select{|f| Open.exist?(f) } end |
#canfail? ⇒ Boolean
58 59 60 |
# File 'lib/scout/workflow/step/status.rb', line 58 def canfail? @compute && @compute.include?(:canfail) end |
#child(&block) ⇒ Object
2 3 4 5 6 7 8 9 10 11 12 |
# File 'lib/scout/workflow/step/children.rb', line 2 def child(&block) child_pid = Process.fork &block children_pids = info[:children_pids] if children_pids.nil? children_pids = [child_pid] else children_pids << child_pid end set_info :children_pids, children_pids child_pid end |
#clean ⇒ Object
33 34 35 36 37 38 39 40 41 42 43 44 |
# File 'lib/scout/workflow/step/status.rb', line 33 def clean Log.debug "Cleaning job files: #{path}" @take_stream = nil @result = nil @info = nil @info_load_time = nil Open.rm path if Open.exist_or_link?(path) Open.rm tmp_path if Open.exist_or_link?(tmp_path) Open.rm info_file if Open.exist_or_link?(info_file) Open.rm_rf files_dir if Open.exist_or_link?(files_dir) self end |
#clean_name ⇒ Object
75 76 77 78 79 80 81 82 |
# File 'lib/scout/workflow/step.rb', line 75 def clean_name return @id if @id return info[:clean_name] if info.include? :clean_name if m = name.match(/(.*?)(?:_[a-z0-9]{32})?(?:\..*)?/) return m[1] end return name.split(".").first end |
#cmd(*args) ⇒ Object
14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 |
# File 'lib/scout/workflow/step/children.rb', line 14 def cmd(*args) all_args = *args all_args << {} unless Hash === all_args.last level = all_args.last[:log] || 0 level = 0 if TrueClass === level level = 10 if FalseClass === level level = level.to_i all_args.last[:log] = true all_args.last[:pipe] = true io = CMD.cmd(*all_args) child_pid = io.pids.first children_pids = info[:children_pids] if children_pids.nil? children_pids = [child_pid] else children_pids << child_pid end set_info :children_pids, children_pids while c = io.getc STDERR << c if Log.severity <= level if c == "\n" Log.logn "STDOUT [#{child_pid}]: ", level end end io.join nil end |
#config(key, *tokens) ⇒ Object
4 5 6 7 8 9 10 11 12 13 14 15 16 17 |
# File 'lib/scout/workflow/step/config.rb', line 4 def config(key, *tokens) options = tokens.pop if Hash === tokens.last options ||= {} new_tokens = [] if workflow workflow_name = workflow.name new_tokens << ("workflow:" << workflow_name) new_tokens << ("task:" << workflow_name << "#" << task_name.to_s) end new_tokens << ("task:" << task_name.to_s) Scout::Config.get(key, tokens + new_tokens, options) end |
#consume_all_streams ⇒ Object
284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 |
# File 'lib/scout/workflow/step.rb', line 284 def consume_all_streams threads = [] while @result && streaming? && stream = self.stream threads << Open.consume_stream(stream, true) end threads.compact! threads.each do |t| begin t.join rescue Exception threads.compact.each{|t| t.raise(Aborted); t.join } raise $! end end end |
#copy_linked_files_dir ⇒ Object
32 33 34 35 36 37 38 39 40 41 42 |
# File 'lib/scout/workflow/step/file.rb', line 32 def copy_linked_files_dir if File.symlink?(self.files_dir) begin realpath = Open.realpath(self.files_dir) Open.rm self.files_dir Open.cp realpath, self.files_dir rescue Log.warn "Copy files_dir for #{self.workflow_short_path} failed: " + $!.message end end end |
#digest_str ⇒ Object
366 367 368 |
# File 'lib/scout/workflow/step.rb', line 366 def digest_str "Step: " + short_path end |
#dirty? ⇒ Boolean
74 75 76 |
# File 'lib/scout/workflow/step/status.rb', line 74 def dirty? done? && ! updated? end |
#done? ⇒ Boolean
249 250 251 |
# File 'lib/scout/workflow/step.rb', line 249 def done? Open.exist?(path) end |
#error? ⇒ Boolean
150 151 152 |
# File 'lib/scout/workflow/step/info.rb', line 150 def error? status == :error || status == 'error' end |
#exception ⇒ Object
171 172 173 |
# File 'lib/scout/workflow/step/info.rb', line 171 def exception info[:exception] end |
#exec ⇒ Object
102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 |
# File 'lib/scout/workflow/step.rb', line 102 def exec if inputs if Task === task types = task.inputs.collect{|name,type| type } new_inputs = inputs.zip(types).collect{|input,info| type, desc, default, options = info next input unless Step === input input.join if input.streaming? Task.format_input(input.join.path, type, options) } else new_inputs = inputs.collect{|input| Step === input ? input.load : input } end inputs = new_inputs end @result = begin @in_exec = true @exec_context.instance_exec(*inputs, &task) ensure @in_exec = false end end |
#file(file = nil) ⇒ Object
15 16 17 18 19 20 |
# File 'lib/scout/workflow/step/file.rb', line 15 def file(file = nil) dir = files_dir Path.setup(dir) unless Path === dir return dir if file.nil? dir[file] end |
#files ⇒ Object
22 23 24 25 26 |
# File 'lib/scout/workflow/step/file.rb', line 22 def files Dir.glob(File.join(files_dir, '**', '*')).reject{|path| File.directory? path }.collect do |path| Misc.path_relative_to(files_dir, path) end end |
#files_dir ⇒ Object
2 3 4 5 6 7 8 9 10 11 12 13 |
# File 'lib/scout/workflow/step/file.rb', line 2 def files_dir @files_dir ||= begin dir = @path + ".files" if Path === @path @path.annotate(dir) else Path.setup(dir) end dir.pkgdir = self dir end end |
#fingerprint ⇒ Object
370 371 372 |
# File 'lib/scout/workflow/step.rb', line 370 def fingerprint digest_str end |
#fork(noload = false, semaphore = nil) ⇒ Object
229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 |
# File 'lib/scout/workflow/step.rb', line 229 def fork(noload = false, semaphore = nil) pid = Process.fork do Signal.trap(:TERM) do raise Aborted, "Recieved TERM Signal on forked process #{Process.pid}" end reset_info status: :queue, pid: Process.pid unless present? if semaphore ScoutSemaphore.synchronize(semaphore) do run(noload) end else run(noload) end join end Process.detach pid grace self end |
#full_task_name ⇒ Object
96 97 98 99 100 |
# File 'lib/scout/workflow/step.rb', line 96 def full_task_name return nil if task_name.nil? return task_name.to_s if workflow.nil? [workflow, task_name] * "#" end |
#grace ⇒ Object
308 309 310 311 312 313 |
# File 'lib/scout/workflow/step.rb', line 308 def grace while ! present? sleep 0.1 end self end |
#info ⇒ Object
39 40 41 42 43 44 45 46 47 48 49 50 51 |
# File 'lib/scout/workflow/step/info.rb', line 39 def info outdated = begin @info_load_time && (mtime = Open.mtime(info_file)) && mtime > @info_load_time rescue true end if @info.nil? || outdated load_info end @info end |
#info_file ⇒ Object
3 4 5 6 7 8 9 10 |
# File 'lib/scout/workflow/step/info.rb', line 3 def info_file return nil if @path.nil? @info_file ||= begin info_file = @path + ".info" @path.annotate info_file if Path === @path info_file end end |
#init_info ⇒ Object
35 36 37 |
# File 'lib/scout/workflow/step/info.rb', line 35 def init_info log :waiting unless info_file.nil? || Open.exists?(info_file) end |
#input_dependencies ⇒ Object
21 22 23 24 25 26 27 28 29 30 31 32 |
# File 'lib/scout/workflow/step/dependencies.rb', line 21 def input_dependencies return [] unless inputs inputs.collect do |d| if Step === d d elsif (Path === d) && (Step === d.pkgdir) d.pkgdir else nil end end.compact.uniq end |
#join ⇒ Object
319 320 321 322 323 324 325 326 327 328 329 330 331 332 |
# File 'lib/scout/workflow/step.rb', line 319 def join consume_all_streams while @result.nil? && (present? && ! (terminated? || done?)) sleep 0.1 end Misc.wait_child info[:pid] if info[:pid] raise self.exception if self.exception raise "Error in job #{self.path}" if self.error? or self.aborted? self end |
#load ⇒ Object
345 346 347 348 349 |
# File 'lib/scout/workflow/step.rb', line 345 def load return @result unless @result.nil? || streaming? join done? ? Persist.load(path, type) : exec end |
#load_info ⇒ Object
21 22 23 24 |
# File 'lib/scout/workflow/step/info.rb', line 21 def load_info @info = Step.load_info(info_file) @info_load_time = Time.now end |
#log(status, message = nil, &block) ⇒ Object
128 129 130 131 132 133 134 135 136 137 138 139 140 |
# File 'lib/scout/workflow/step/info.rb', line 128 def log(status, message = nil, &block) if block_given? time = Misc.exec_time &block time_str = Misc.format_seconds_short time message = message.nil? ? Log.color(:time, time_str) : "#{Log.color :time, time_str} - #{message}" end if message merge_info :status => status, :message => message else merge_info :status => status end end |
#marshal_load(path) ⇒ Object
184 185 186 |
# File 'lib/scout/workflow/step/info.rb', line 184 def marshal_load(path) Step.new path end |
#merge_info(new_info) ⇒ Object
53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 |
# File 'lib/scout/workflow/step/info.rb', line 53 def merge_info(new_info) info = self.info new_info.each do |key,value| value = Annotation.purge(value) if key == :status message = new_info[:message] if message.nil? && (value == :done || value == :error || value == :aborted) issued = info[:issued] start = info[:start] eend = new_info[:end] if start && eend time = eend - start total_time = eend - issued if total_time - time > 1 time_str = "#{Misc.format_seconds_short(time)} (#{Misc.format_seconds_short(total_time)})" else time_str = Misc.format_seconds_short(time) end info[:time_elapsed] = time info[:total_time_elapsed] = total_time message = Log.color(:time, time_str) end end report_status value, message end if key == :message messages = info[:messages] || [] messages << value info[:messages] = messages next end if Exception === value begin Marshal.dump(value) rescue TypeError if ScoutException === value new = ScoutException.new value.message else new = Exception.new value.message end new.set_backtrace(value.backtrace) value = new end end if info.include?(key) case info[key] when Array info[key].concat Array === value ? value : [value] when Hash info[key].merge! value else info[key] = value end else info[key] = value end end save_info(info) end |
#messages ⇒ Object
142 143 144 |
# File 'lib/scout/workflow/step/info.rb', line 142 def messages info[:messages] end |
#name ⇒ Object
67 68 69 |
# File 'lib/scout/workflow/step.rb', line 67 def name @name ||= File.basename(@path) end |
#overriden? ⇒ Boolean
162 163 164 165 |
# File 'lib/scout/workflow/step/info.rb', line 162 def overriden? @overriden = overriden_task || overriden_workflow || overriden_deps.any? if @overriden.nil? @overriden end |
#overriden_deps ⇒ Object
167 168 169 |
# File 'lib/scout/workflow/step/info.rb', line 167 def overriden_deps rec_dependencies.select{|d| d.overriden? } end |
#prepare_dependencies ⇒ Object
34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 |
# File 'lib/scout/workflow/step/dependencies.rb', line 34 def prepare_dependencies inverse_dep = {} dependencies.each do |dep| if dep.present? && ! dep.updated? Log.medium "Clean outdated #{dep.path}" dep.clean end next if dep.done? if dep.dependencies dep.dependencies.each do |d| inverse_dep[d] ||= [] inverse_dep[d] << dep end end input_dependencies.each do |d| inverse_dep[d] ||= [] inverse_dep[d] << dep end end if dependencies inverse_dep.each do |dep,list| dep.tee_copies = list.length end end |
#present? ⇒ Boolean
302 303 304 305 306 |
# File 'lib/scout/workflow/step.rb', line 302 def present? Open.exist?(path) || Open.exist?(info_file) || Open.exist?(files_dir) end |
#produce(with_fork: false) ⇒ Object
334 335 336 337 338 339 340 341 342 343 |
# File 'lib/scout/workflow/step.rb', line 334 def produce(with_fork: false) clean if error? && recoverable_error? if with_fork self.fork self.join else run(:no_load) end self end |
#progress_bar(msg = "Progress", options = nil, &block) ⇒ Object
2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 |
# File 'lib/scout/workflow/step/progress.rb', line 2 def progress_bar(msg = "Progress", options = nil, &block) if Hash === msg and options.nil? options = msg msg = nil end options = {} if options.nil? max = options[:max] Open.mkdir files_dir bar = Log::ProgressBar.new_bar(max, {:desc => msg, :file => (@exec ? nil : file(:progress))}.merge(options)) if block_given? bar.init res = yield bar bar.remove res else bar end end |
#rec_dependencies(connected = false, seen = []) ⇒ Object
2 3 4 5 6 7 8 9 10 11 |
# File 'lib/scout/workflow/step/dependencies.rb', line 2 def rec_dependencies(connected = false, seen = []) direct_deps = [] dependencies.each do |dep| next if seen.include? dep.path next if connected && dep.done? && dep.updated? direct_deps << dep end if dependencies seen.concat direct_deps.collect{|d| d.path } direct_deps.inject(direct_deps){|acc,d| acc.concat(d.rec_dependencies(connected, seen)); acc } end |
#recoverable_error? ⇒ Boolean
19 20 21 |
# File 'lib/scout/workflow/step/status.rb', line 19 def recoverable_error? self.error? && ! (ScoutException === self.exception) end |
#recursive_clean ⇒ Object
51 52 53 54 55 56 |
# File 'lib/scout/workflow/step/status.rb', line 51 def recursive_clean dependencies.each do |dep| dep.recursive_clean end clean end |
#recursive_inputs ⇒ Object
13 14 15 16 17 18 19 |
# File 'lib/scout/workflow/step/dependencies.rb', line 13 def recursive_inputs recursive_inputs = NamedArray === inputs ? inputs.to_hash : {} return recursive_inputs if dependencies.nil? dependencies.inject(recursive_inputs) do |acc,dep| acc.merge(dep.recursive_inputs) end end |
#report_status(status, message = nil) ⇒ Object
120 121 122 123 124 125 126 |
# File 'lib/scout/workflow/step/info.rb', line 120 def report_status(status, message = nil) if message.nil? Log.info [Log.color(:status, status, true), Log.color(:task, task_name, true), Log.color(:path, path)] * " " else Log.info [Log.color(:status, status, true), Log.color(:task, task_name, true), message, Log.color(:path, path)] * " " end end |
#reset_info(info = {}) ⇒ Object
31 32 33 |
# File 'lib/scout/workflow/step/info.rb', line 31 def reset_info(info = {}) save_info(@info = info) end |
#run(stream = false) ⇒ Object
140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 |
# File 'lib/scout/workflow/step.rb', line 140
# Runs the step, or returns its stored result when it is already done.
#
# Dependencies are prepared, then the job body runs inside Persist.persist
# for +path+: info is reset to :setup, dependencies are run, +exec+ is called,
# and stream results are either handed back or written to +path+.
# On any exception the error is recorded in info, dependencies are aborted
# and the exception is re-raised. Otherwise the status becomes :done. For a
# streaming result, the :done / :aborted / :error update is attached to the
# stream instead.
#
# @param stream [Boolean, Symbol] true or :stream to request a streamed
#   result; :no_load to run without loading the result; anything else runs
#   normally
# @return the persisted result; nil for :no_load, and nil for a streamed run
#   whose result is not an IO
def run(stream = false)
  return @result || self.load if done?
  prepare_dependencies
  begin
    # Translate the caller's +stream+ request into Persist's no_load option.
    no_load = case stream
              when TrueClass, :stream then :stream
              when :no_load then true
              else false
              end

    @result = Persist.persist(name, type, :path => path, :tee_copies => tee_copies, no_load: no_load) do
      input_names = (task.respond_to?(:inputs) && task.inputs) ? task.inputs.collect{|input_name,_| input_name } : []
      reset_info :status => :setup, :issued => Time.now,
        :pid => Process.pid, :pid_hostname => Misc.hostname,
        :task_name => task_name, :workflow => workflow.to_s,
        :inputs => Annotation.purge(inputs), :input_names => input_names, :type => type,
        :dependencies => (dependencies || []).collect{|d| d.path }

      run_dependencies

      set_info :start, Time.now
      log :start
      @exec_result = exec

      if @exec_result.nil? && File.exist?(self.tmp_path) && ! File.exist?(self.path)
        # exec returned nothing but left its output at tmp_path: move it into place.
        Open.mv self.tmp_path, self.path
      else
        @exec_result = @exec_result.stream if @exec_result.respond_to?(:stream) && ! (TSV === @exec_result)
      end

      if (IO === @exec_result || StringIO === @exec_result) && (ENV["SCOUT_NO_STREAM"] == "true" || ! stream)
        # Streaming was not requested (or is disabled): drain the stream into path.
        Open.sensible_write(self.path, @exec_result)
        @exec_result = nil
      else
        @exec_result
      end
    end

    if TrueClass === no_load
      consume_all_streams if streaming?
      @result = nil
    elsif no_load && ! (IO === @result)
      @result = nil
    end

    @result
  rescue Exception => e
    # Any failure, including interrupts, is recorded and dependencies are
    # aborted; the exception is always re-raised.
    merge_info :status => :error, :exception => e, :end => Time.now, :backtrace => e.backtrace, :message => "#{e.class}: #{e.message}"
    begin
      abort_dependencies
    ensure
      raise e
    end
  ensure
    if ! (error? || aborted?)
      if @result && streaming?
        # NOTE(review): presumably the block runs once the stream has been fully consumed; confirm in ConcurrentStream.
        ConcurrentStream.setup(@result) do
          merge_info :status => :done, :end => Time.now
        end

        @result.abort_callback = proc do |exception|
          if exception.nil? || Aborted === exception || Interrupt === exception
            merge_info :status => :aborted, :end => Time.now
          else
            begin
              merge_info :status => :error, :exception => exception, :end => Time.now
            rescue Exception
              Log.exception $!
            end
          end
        end

        log :streaming
      else
        merge_info :status => :done, :end => Time.now
      end
    end
  end
end
#run_dependencies ⇒ Object
72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 |
# File 'lib/scout/workflow/step/dependencies.rb', line 72
# Runs every dependency in all_dependencies that is neither running nor done.
#
# Per-dependency options are read from compute[dep.path] when compute is set:
# - :stream  -> request streaming; only consulted when SCOUT_EXPLICIT_STREAMING
#               is 'true', otherwise streaming is always requested
# - :produce -> run with :no_load
# - :canfail -> a ScoutException from this dependency is logged and ignored
# Any other ScoutException propagates.
def run_dependencies
  all_dependencies.each do |dep|
    next if dep.running? || dep.done?

    options = compute[dep.path] if compute
    options = [] if options.nil?

    stream = options.include?(:stream)
    stream = true unless ENV["SCOUT_EXPLICIT_STREAMING"] == 'true'
    stream = :no_load if options.include?(:produce)

    begin
      dep.run(stream)
    rescue ScoutException
      raise unless options.include?(:canfail)
      Log.medium "Allow failing of #{dep.path}"
    end
  end
end
#running? ⇒ Boolean
158 159 160 |
# File 'lib/scout/workflow/step/info.rb', line 158
# Tells whether the step's recorded process is still alive.
#
# A step that is done with status :done is never running. Otherwise the
# answer depends on the pid stored in info.
#
# @return false when done with :done; nil when no pid is recorded; otherwise
#   the result of Misc.pid_alive?
def running?
  return false if done? && status == :done
  info[:pid] && Misc.pid_alive?(info[:pid])
end
#save_info(info = nil) ⇒ Object
26 27 28 29 |
# File 'lib/scout/workflow/step/info.rb', line 26
# Persists +info+ to info_file with the class SERIALIZER and records when it
# was written.
#
# @param info [Hash, nil] the info to store
# @return [Time] the new @info_load_time
def save_info(info = nil)
  target = info_file
  Persist.save(info, target, SERIALIZER)
  @info_load_time = Time.now
end
#save_inputs(inputs_dir) ⇒ Object
2 3 4 |
# File 'lib/scout/workflow/step/inputs.rb', line 2
# Saves the step's provided inputs into +inputs_dir+ by delegating to the task.
#
# @param inputs_dir directory to write the inputs to
# @return whatever the task's save_inputs returns
def save_inputs(inputs_dir)
  job_task = task
  job_task.save_inputs(inputs_dir, provided_inputs)
end
#set_info(key, value) ⇒ Object
116 117 118 |
# File 'lib/scout/workflow/step/info.rb', line 116 def set_info(key, value) merge_info(key => value) end |
#short_path ⇒ Object
71 72 73 |
# File 'lib/scout/workflow/step.rb', line 71
# Returns a compact "workflow/task/name" identifier for the step.
#
# @return [String]
def short_path
  parts = [workflow.to_s, task_name, name]
  parts.join("/")
end
#started? ⇒ Boolean
62 63 64 65 66 67 68 |
# File 'lib/scout/workflow/step/status.rb', line 62
# Tells whether the step has started.
#
# A done step has always started. Otherwise the info file must exist and
# record a pid whose process is still alive.
#
# @return [Boolean, Object] true when done; false without an info file or
#   pid; otherwise the result of Misc.pid_alive?
def started?
  return true if done?

  if Open.exist?(info_file)
    pid = info[:pid]
    pid ? Misc.pid_alive?(pid) : false
  else
    false
  end
end
#status ⇒ Object
146 147 148 |
# File 'lib/scout/workflow/step/info.rb', line 146
# Returns the step status recorded in info, as a Symbol.
#
# @return [Symbol, nil] info[:status] converted with to_sym, or nil when no
#   status is recorded
def status
  # The previous `.tap { |s| ... s.to_sym }` never converted anything:
  # Object#tap returns its receiver, not the block's value, so String
  # statuses made comparisons such as `status == :done` fail.
  current = info[:status]
  current.nil? ? nil : current.to_sym
end
#step(task_name) ⇒ Object
351 352 353 354 355 356 357 358 359 360 |
# File 'lib/scout/workflow/step.rb', line 351
# Finds a dependency (at any depth) whose task name or overriden task matches.
#
# The search is depth-first. Each dependency is checked before its own
# dependencies are searched, in declaration order.
#
# @param task_name [Symbol, String] the task to look for
# @return [Step, nil] the first matching dependency, or nil if none matches
def step(task_name)
  task_name = task_name.to_sym
  # Leaf steps have nil dependencies; without this guard the recursive
  # dep.step call raised NoMethodError as soon as it reached one.
  (dependencies || []).each do |dep|
    return dep if dep.task_name && dep.task_name.to_sym == task_name
    return dep if dep.overriden_task && dep.overriden_task.to_sym == task_name
    rec_dep = dep.step(task_name)
    return rec_dep if rec_dep
  end
  nil
end
#stream ⇒ Object
257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 |
# File 'lib/scout/workflow/step.rb', line 257 def stream synchronize do if streaming? && ! @result.nil? if @result.next Log.debug "Taking result #{Log.fingerprint @result} next #{Log.fingerprint @result.next}" else Log.debug "Taking result #{Log.fingerprint @result}" end @take_stream, @result = @result, @result.next return @take_stream end end if done? Open.open(self.path) else if running? || waiting? join Open.open(self.path) else exec end end end |
#streaming? ⇒ Boolean
253 254 255 |
# File 'lib/scout/workflow/step.rb', line 253
# Tells whether the step is producing a stream.
#
# @return the @take_stream value when set; otherwise true if @result is an
#   IO or StringIO, false if not
def streaming?
  @take_stream || [IO, StringIO].any? { |klass| klass === @result }
end
#synchronize(&block) ⇒ Object
33 34 35 |
# File 'lib/scout/workflow/step.rb', line 33
# Runs the given block while holding the step's @mutex.
#
# @return the block's value
def synchronize(&block)
  lock = @mutex
  lock.synchronize(&block)
end
#task_name ⇒ Object
84 85 86 87 88 |
# File 'lib/scout/workflow/step.rb', line 84
# Returns (and memoizes) the name of the task this step runs.
#
# Resolution order:
# 1. the task's own name, when @task responds to :name;
# 2. info[:task_name], when the info file exists;
# 3. the second-to-last component of the step path.
#
# @return [Symbol, String]
def task_name
  # Return the memoized name first. Previously every call re-ran the
  # Open.exist?(info_file) filesystem check even after the name was known.
  return @task_name if @task_name

  @task_name ||= @task.name if @task.respond_to?(:name)
  @task_name ||= info[:task_name] if Open.exist?(info_file)
  @task_name ||= path.split("/")[-2]
end
#task_signature ⇒ Object
374 375 376 377 |
# File 'lib/scout/workflow/step.rb', line 374
# Returns a "Workflow#task" signature for this step.
#
# @return [String] the workflow name (the workflow itself when it is a
#   String, otherwise workflow.name) and the task name, joined with "#"
def task_signature
  workflow_name = String === workflow ? workflow : workflow.name
  # Use the resolved name; the previous code computed workflow_name but
  # joined the raw workflow object, relying on its to_s instead.
  [workflow_name, task_name].join("#")
end
#terminated? ⇒ Boolean
315 316 317 |
# File 'lib/scout/workflow/step.rb', line 315
# Tells whether the step has reached a final state (done, error or aborted)
# and is not currently inside exec (@in_exec).
#
# @return false while @in_exec is set; otherwise done? || error? || aborted?
def terminated?
  return false if @in_exec
  done? || error? || aborted?
end
#tmp_path ⇒ Object
129 130 131 132 133 134 135 136 137 |
# File 'lib/scout/workflow/step.rb', line 129
# Returns (and memoizes) the temporary path for this step: a hidden sibling
# of @path, in the same directory with a '.' prefixed to the basename.
#
# When @path is a Path, @path.setup is called with the new path.
#
# @return [String]
def tmp_path
  return @tmp_path if @tmp_path

  hidden = File.join(File.dirname(@path), ".#{File.basename(@path)}")
  @path.setup(hidden) if Path === @path
  @tmp_path = hidden
end
#updated? ⇒ Boolean
23 24 25 26 27 28 29 30 31 |
# File 'lib/scout/workflow/step/status.rb', line 23
# Tells whether the step's result is up to date with its dependencies.
#
# - A step in a recoverable error is never up to date.
# - A done step is up to date unless SCOUT_UPDATE is set in the environment.
# - Otherwise it is up to date when no dependency (from rec_dependencies or
#   input_dependencies) satisfies Path.newer?(path, dep.path).
#   NOTE(review): presumably Path.newer? is truthy when dep's file is more
#   recent than this step's; confirm.
#
# @return [Boolean]
def updated?
  return false if error? && recoverable_error?
  return true if done? && !ENV["SCOUT_UPDATE"]

  newer = rec_dependencies.select { |dep| Path.newer?(path, dep.path) }
  newer.concat(input_dependencies.select { |dep| Path.newer?(path, dep.path) })
  Log.low "Newer deps found for #{Log.fingerprint self}: #{Log.fingerprint newer}" if newer.any?
  newer.empty?
end
#waiting? ⇒ Boolean
70 71 72 |
# File 'lib/scout/workflow/step/status.rb', line 70
# Tells whether the step is present but has not started.
#
# @return the (falsy) present? value when not present; otherwise the
#   negation of started?
def waiting?
  present? && !started?
end