Module: HPC::Orchestration
- Defined in:
- lib/rbbt/hpc/orchestrate.rb,
lib/rbbt/hpc/orchestrate.old.rb,
lib/rbbt/hpc/orchestrate/rules.rb,
lib/rbbt/hpc/orchestrate/chains.rb,
lib/rbbt/hpc/orchestrate/batches.rb
Class Method Summary collapse
- .accumulate_rules(current, new) ⇒ Object
- .add_batch_deps(batches) ⇒ Object
- .add_config_keys(current, new) ⇒ Object
- .add_rules_and_consolidate(rules, batches) ⇒ Object
- .chain_batches(rules, chains, workload) ⇒ Object
- .check_chains(chains, job) ⇒ Object
- .job_batches(rules, job) ⇒ Object
- .job_chains(rules, job, computed = {}) ⇒ Object
- .job_dependencies(job) ⇒ Object
- .job_rules(rules, job) ⇒ Object
- .job_workload(job) ⇒ Object
- .merge_rules(current, new) ⇒ Object
- .orchestration_rules(orchestration_rules_file = nil) ⇒ Object
- .parse_chains(rules) ⇒ Object
- .pb(batch) ⇒ Object
- .task_specific_rules(rules, workflow, task) ⇒ Object
- .workflow_rules(rules, workflow) ⇒ Object
Instance Method Summary collapse
- #get_chains(job, rules, chains = {}) ⇒ Object
- #get_job_dependencies(job, job_rules = nil) ⇒ Object
- #get_recursive_job_dependencies(job) ⇒ Object
- #job_rules(rules, job) ⇒ Object
- #orchestrate_job(job, options) ⇒ Object
- #piggyback(job, job_rules, job_deps) ⇒ Object
- #prepare_for_execution(job) ⇒ Object
- #workload(job, rules, chains, options, seen = nil) ⇒ Object
Class Method Details
.accumulate_rules(current, new) ⇒ Object
34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 |
# File 'lib/rbbt/hpc/orchestrate/rules.rb', line 34
#
# Combines two rule sets into a new IndiferentHash without modifying either
# argument.
#
# When either side is nil/empty a copy of the other one (or an empty hash) is
# returned. Otherwise keys from +new+ are folded into a copy of +current+:
#
# * "config_keys" - joined through add_config_keys
# * "cpus"        - the entry with the largest +to_i+ value is kept
# * "time"        - the time spans are added up and re-formatted
# * "skip"        - dropped unless both sides are truthy
# * anything else - only copied when +current+ does not define it
def self.accumulate_rules(current, new)
  new_blank = new.nil? || new.empty?
  current_blank = current.nil? || current.empty?

  return IndiferentHash.setup({}) if new_blank && current_blank
  return IndiferentHash.setup(current.dup) if new_blank
  return IndiferentHash.setup(new.dup) if current_blank

  merged = IndiferentHash.setup(current.dup)

  new.each do |key, value|
    name = key.to_s

    if name == "config_keys"
      merged[key] = add_config_keys merged["config_keys"], value
    elsif name == "cpus"
      candidates = [merged[key], value].compact
      merged[key] = candidates.sort_by { |v| v.to_i }.last
    elsif name == "time"
      total = [merged[key], value].compact.inject(0) { |sum, span| sum + Misc.timespan(span) }
      merged[key] = Misc.format_seconds total
    elsif name == "skip"
      # "skip" only survives when both rule sets ask for it
      merged.delete key unless merged[key] && value
    else
      merged[key] = value unless merged.include?(key)
    end
  end

  merged
end
.add_batch_deps(batches) ⇒ Object
78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 |
# File 'lib/rbbt/hpc/orchestrate/batches.rb', line 78
#
# Fills in batch[:deps] for every batch and returns +batches+.
#
# For each batch the dependencies of its jobs (as reported by
# job_dependencies) that are not part of the batch itself are collected.
# Dependencies that are themselves dependencies of another collected
# dependency are removed, and the remaining ones are mapped to the other
# batches whose jobs include a job with the same path.
def self.add_batch_deps(batches)
  batches.each do |batch|
    members = batch[:jobs]
    outside = members.collect { |member| job_dependencies(member) }.flatten.uniq - members

    # Keep only the dependencies that are not reachable through another one
    direct = outside
    outside.each { |dep| direct -= job_dependencies(dep) }

    providers = direct.collect do |dep|
      (batches - [batch]).select do |candidate|
        candidate[:jobs].collect(&:path).include? dep.path
      end
    end

    batch[:deps] = providers.flatten.uniq
  end

  batches
end
.add_config_keys(current, new) ⇒ Object
3 4 5 6 7 8 9 |
# File 'lib/rbbt/hpc/orchestrate/rules.rb', line 3
#
# Joins two comma-separated lists of config keys into one list without
# duplicates.
#
# Keys from +new+ are placed before the keys from +current+; when a key is
# repeated only its last occurrence is kept. Either argument may be nil,
# surrounding whitespace is ignored and empty entries are dropped.
#
# Returns a String with the keys joined by ",".
def self.add_config_keys(current, new)
  keys = [new, current].compact.flat_map { |list| list.to_s.split(",") }
  keys = keys.map(&:strip).reject(&:empty?)

  # reverse/uniq/reverse keeps the LAST occurrence of each key
  keys.reverse.uniq.reverse * ","
end
.add_rules_and_consolidate(rules, batches) ⇒ Object
99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 |
# File 'lib/rbbt/hpc/orchestrate/batches.rb', line 99
#
# Attaches rules to every batch and merges "skip" batches into one of the
# batches they depend on.
#
# First pass: the rules of all jobs in a batch are accumulated and, for
# batches that belong to a chain, merged under the rules of that chain.
#
# Second pass (repeated from the start after every batch it processes):
# dependencies that point at an already merged batch are redirected to the
# batch that absorbed it. A batch flagged :skip loses the flag and, when a
# suitable dependency batch exists, hands its jobs, dependencies, top level
# job and rules over to it.
#
# Merged batches are removed at the end. Returns +batches+.
def self.add_rules_and_consolidate(rules, batches)
  chain_rules = parse_chains(rules)

  batches.each do |batch|
    combined = batch[:jobs].inject(nil) do |acc, job|
      workflow = job.original_workflow || job.workflow
      task_name = job.original_task_name || job.task_name
      accumulate_rules(acc, task_specific_rules(rules, workflow, task_name).dup)
    end

    chain = batch[:chain]
    batch[:rules] = chain ? merge_rules(chain_rules[chain][:rules].dup, combined) : combined
  end

  restart = true
  while restart
    restart = false

    # Follow dependencies on merged batches to the batch that absorbed them
    batches.each do |batch|
      next unless batch[:deps]
      batch[:deps] = batch[:deps].collect { |dep| dep[:target] || dep }
    end

    batches.each do |batch|
      next if batch[:top_level].overriden?
      next unless batch[:rules][:skip]
      batch[:rules].delete :skip
      next if batch[:deps].nil?

      if batch[:deps].any?
        upstream_jobs = batch[:top_level].rec_dependencies
        host = batch[:deps].select do |candidate|
          # Don't piggyback batches that are an input dependency, only real dependencies
          upstream_jobs.include?(candidate[:top_level]) &&
            (batch[:deps] - [candidate] - candidate[:deps]).empty?
        end.first
        next if host.nil?

        host[:jobs] = batch[:jobs] + host[:jobs]
        host[:deps] = (host[:deps] + batch[:deps]).uniq - [host]
        host[:top_level] = batch[:top_level]
        host[:rules] = accumulate_rules(host[:rules], batch[:rules])
        batch[:target] = host
      end

      # One batch was processed: start both passes over again
      restart = true
      break
    end
  end

  batches.delete_if { |b| b[:target] }

  batches
end
.chain_batches(rules, chains, workload) ⇒ Object
45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 |
# File 'lib/rbbt/hpc/orchestrate/batches.rb', line 45
#
# Groups the jobs of +workload+ into batches.
#
# Jobs are taken from the end of +workload+ (the array passed in is popped
# from). A job that belongs to one or more of +chains+ becomes a batch made
# of the best matching chain: the one with most jobs and, between chains with
# the same number of jobs, the one whose rules list fewer tasks. Any other
# job becomes a single-job batch.
#
# After each batch the remaining chains lose the jobs just batched; chains
# whose top level job was batched, or that are left with fewer than two
# jobs, are discarded. +chains+ is modified in place.
#
# Returns the Array of batches (Hashes with :jobs, :top_level and, for
# chains, :chain).
def self.chain_batches(rules, chains, workload)
  chain_rules = parse_chains(rules)

  batches = []
  while (job = workload.pop)
    matches = chains.select { |_name, info| info[:jobs].include? job }

    if matches.any?
      name, info = matches.sort_by do |chain_name, chain_info|
        total_tasks = chain_rules[chain_name][:tasks].values.flatten.uniq.length
        # Float division: with Integer division the fraction was always 0 for
        # chains of more than one task, so it never broke a tie.
        chain_info[:jobs].length + 1.0 / total_tasks
      end.last

      workload = workload - info[:jobs]
      info[:chain] = name
      batch = info
    else
      batch = {:jobs => [job], :top_level => job}
    end

    chains.delete_if { |_name, info| batch[:jobs].include? info[:top_level] }
    chains.each do |_name, info|
      info[:jobs] = info[:jobs] - batch[:jobs]
    end
    chains.delete_if { |_name, info| info[:jobs].length < 2 }

    batches << batch
  end

  batches
end
.check_chains(chains, job) ⇒ Object
3 4 5 6 7 8 9 10 11 12 13 14 |
# File 'lib/rbbt/hpc/orchestrate/chains.rb', line 3
#
# Lists the names of the chains whose task table includes the task of +job+.
#
# The original workflow and task name of the job are preferred over the
# current ones when present. Jobs whose +overriden+ value is a Symbol never
# match any chain.
#
# Returns an Array of chain names.
def self.check_chains(chains, job)
  return [] if Symbol === job.overriden

  chains.each_with_object([]) do |(name, chain), matches|
    workflow = (job.original_workflow || job.workflow).to_s
    task_name = (job.original_task_name || job.task_name).to_s
    tasks = chain[:tasks]

    matches << name if tasks.include?(workflow) && tasks[workflow].include?(task_name)
  end
end
.job_batches(rules, job) ⇒ Object
154 155 156 157 158 159 160 161 162 |
# File 'lib/rbbt/hpc/orchestrate/batches.rb', line 154
#
# Builds the batches for +job+: the chains and the workload of the job are
# computed, the workload is grouped into batches, dependencies between
# batches are added and finally rules are attached and batches consolidated.
#
# Returns the result of add_rules_and_consolidate.
def self.job_batches(rules, job)
  chains = self.job_chains(rules, job).dup
  pending = job_workload(job).uniq

  grouped = chain_batches(rules, chains, pending)
  add_rules_and_consolidate(rules, add_batch_deps(grouped))
end
.job_chains(rules, job, computed = {}) ⇒ Object
42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 |
# File 'lib/rbbt/hpc/orchestrate/chains.rb', line 42
#
# Computes the chains found in the dependency tree of +job+.
#
# The result is an Array of [chain_name, info] pairs where info holds the
# :jobs of the chain and its :top_level job. Results are memoized in
# +computed+ under a fingerprint of the rules, the job path and the job
# object id, so a job reached through several paths is processed once.
#
# For every dependency, chains that both the job and the dependency match
# are extended upwards to include the job; chains of the dependency that the
# job does not take part in are passed through untouched.
def self.job_chains(rules, job, computed = {})
  cache_key = Misc.fingerprint([rules, job.path, job.object_id])

  computed[cache_key] ||= begin
    chains = self.parse_chains(rules)
    own_matches = check_chains(chains, job)

    collected = []
    extended = {}

    job_dependencies(job).each do |dep|
      shared = own_matches & check_chains(chains, dep)
      continued = []

      job_chains(rules, dep, computed).each do |chain_name, info|
        if shared.include?(chain_name)
          continued << chain_name
          entry = extended[chain_name] ||= {}
          entry[:jobs] ||= []
          entry[:jobs].concat info[:jobs]
          entry[:top_level] = job
        else
          collected << [chain_name, info]
        end
      end

      # Shared chains the dependency had not started yet begin here
      (shared - continued).each do |chain_name|
        collected << [chain_name, { :jobs => [job, dep], :top_level => job }]
      end
    end

    extended.each do |chain_name, info|
      info[:jobs].prepend job
      collected << [chain_name, info]
    end

    collected
  end
end
.job_dependencies(job) ⇒ Object
38 39 40 |
# File 'lib/rbbt/hpc/orchestrate/chains.rb', line 38
#
# Returns the dependencies and input dependencies of +job+, without
# duplicates, that are either not done or dirty.
#
# A nil list of dependencies or input dependencies is treated as empty, the
# same way get_job_dependencies does.
def self.job_dependencies(job)
  candidates = (job.dependencies || []) + (job.input_dependencies || [])
  candidates.uniq.select { |dep| !dep.done? || dep.dirty? }
end
.job_rules(rules, job) ⇒ Object
68 69 70 71 72 73 74 75 |
# File 'lib/rbbt/hpc/orchestrate/rules.rb', line 68
#
# Rules for +job+ accumulated with the rules of its dependencies,
# recursively. Jobs that are done or in error contribute an empty Hash.
def self.job_rules(rules, job)
  return {} if job.done? || job.error?

  own_rules = task_specific_rules(rules, job.workflow.to_s, job.task_name.to_s)

  job.dependencies.inject(own_rules) do |acc, dep|
    accumulate_rules(acc, job_rules(rules, dep))
  end
end
.job_workload(job) ⇒ Object
19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 |
# File 'lib/rbbt/hpc/orchestrate/batches.rb', line 19
#
# Lists the jobs that have to run for +job+: the job itself and, through
# job_dependencies, every dependency reachable from it that is not done.
#
# Jobs are identified by path, and each path is expanded only once. Without
# that, a dependency shared by several jobs was expanded again every time it
# was popped, which grows exponentially on diamond-shaped dependency graphs.
#
# Returns an Array of jobs in the order they were first visited.
def self.job_workload(job)
  workload = []
  path_jobs = { job.path => job }
  visited = {}

  heap = [job.path]
  while (job_path = heap.pop)
    next if visited[job_path]
    visited[job_path] = true

    current = path_jobs[job_path]
    next if current.done?

    workload << current

    deps = job_dependencies(current)
    deps.each { |dep| path_jobs[dep.path] ||= dep }

    heap.concat deps.collect(&:path)
    heap.uniq!
  end

  workload
end
.merge_rules(current, new) ⇒ Object
17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 |
# File 'lib/rbbt/hpc/orchestrate/rules.rb', line 17
#
# Merges +new+ into a copy of +current+, giving priority to +current+.
#
# When either side is nil/empty a copy of the other one (or an empty hash) is
# returned. Otherwise "config_keys" are joined through add_config_keys and
# every other key of +new+ is only copied when +current+ lacks it.
#
# Returns a new IndiferentHash; the arguments are not modified.
def self.merge_rules(current, new)
  new_blank = new.nil? || new.empty?
  current_blank = current.nil? || current.empty?

  return IndiferentHash.setup({}) if new_blank && current_blank
  return IndiferentHash.setup(current.dup) if new_blank
  return IndiferentHash.setup(new.dup) if current_blank

  merged = IndiferentHash.setup(current.dup)

  new.each do |key, value|
    if key.to_s == "config_keys"
      merged[key] = add_config_keys merged["config_keys"], value
    elsif !merged.include?(key)
      merged[key] = value
    end
  end

  merged
end
.orchestration_rules(orchestration_rules_file = nil) ⇒ Object
25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 |
# File 'lib/rbbt/hpc/orchestrate.rb', line 25
#
# Loads the orchestration rules as an IndiferentHash.
#
# When +orchestration_rules_file+ is given it is looked up, in this order, as
# a file that exists as is, as an entry of Rbbt.etc.batch, and as an entry of
# Rbbt.etc.batch with a '.yaml' suffix; a RuntimeError is raised if none
# exists. Without a file name Rbbt.etc.batch["default.yaml"] is used when it
# exists, and otherwise the rules are empty.
def self.orchestration_rules(orchestration_rules_file = nil)
  source =
    if orchestration_rules_file
      if Open.exists?(orchestration_rules_file)
        orchestration_rules_file
      elsif Rbbt.etc.batch[orchestration_rules_file].exists?
        Rbbt.etc.batch[orchestration_rules_file]
      elsif Rbbt.etc.batch[orchestration_rules_file + '.yaml'].exists?
        Rbbt.etc.batch[orchestration_rules_file + '.yaml']
      else
        raise "Orchestration rules file not found: #{orchestration_rules_file}"
      end
    elsif Rbbt.etc.batch["default.yaml"].exists?
      Rbbt.etc.batch["default.yaml"]
    end

  rules = source ? Misc.load_yaml(source) : {}

  IndiferentHash.setup(rules)
end
.parse_chains(rules) ⇒ Object
16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 |
# File 'lib/rbbt/hpc/orchestrate/chains.rb', line 16
#
# Parses the "chains" section of +rules+.
#
# Each chain lists its tasks as a comma separated String; a task is written
# either "Workflow#task" or just "task", in which case the :workflow entry of
# the chain is used. Chains that define no tasks are left out.
#
# Returns a Hash from chain name to {:tasks => {workflow => [task, ...]},
# :rules => the remaining entries of the chain}. The chain definitions inside
# +rules+ are copied before :tasks and :workflow are removed from them.
def self.parse_chains(rules)
  return {} if rules["chains"].nil?

  chains = IndiferentHash.setup({})

  rules["chains"].each do |name, definition|
    chain_rules = IndiferentHash.setup(definition.dup)
    task_list = chain_rules.delete(:tasks)
    default_workflow = chain_rules.delete(:workflow)
    next if task_list.nil?

    task_list.split(/,\s*/).each do |task|
      chain_workflow, chain_task = task.split("#")
      # No "#": the only piece is the task name, take the chain's workflow
      chain_task, chain_workflow = chain_workflow, default_workflow if chain_task.nil?

      chains[name] ||= IndiferentHash.setup({:tasks => {}, :rules => chain_rules })
      chains[name][:tasks][chain_workflow] ||= []
      chains[name][:tasks][chain_workflow] << chain_task
    end
  end

  chains
end
.pb(batch) ⇒ Object
7 8 9 10 11 12 13 14 15 16 17 |
# File 'lib/rbbt/hpc/orchestrate/batches.rb', line 7
#
# Debugging helper that prints a batch, or an Array of batches between
# :BATCHES and :END_BATCHES markers, through the iii/iif helpers. The :deps
# of a batch are shown as the top level job of each dependency batch; the
# batch itself is not modified.
def self.pb(batch)
  unless Array === batch
    summary = batch.dup
    summary[:deps] = summary[:deps].collect { |dep| dep[:top_level] } if summary[:deps]
    return iif(summary)
  end

  iii :BATCHES
  batch.each { |item| pb item }
  iii :END_BATCHES
end
.task_specific_rules(rules, workflow, task) ⇒ Object
58 59 60 61 62 63 64 65 66 |
# File 'lib/rbbt/hpc/orchestrate/rules.rb', line 58
#
# Rules that apply to +task+ of +workflow+: the task's own rules first, then
# the defaults of the workflow, then the global defaults (earlier sources
# take priority, see merge_rules).
#
# Always returns a fresh IndiferentHash, also when the workflow has no rules
# at all, so callers can modify the result without touching +rules+.
def self.task_specific_rules(rules, workflow, task)
  defaults = rules[:defaults] || {}
  workflow = workflow.to_s
  task = task.to_s

  return IndiferentHash.setup(defaults.dup) if rules[workflow].nil?

  workflow_level = merge_rules(workflow_rules(rules, workflow), defaults)
  return IndiferentHash.setup(workflow_level.dup) if rules[workflow][task].nil?

  merge_rules(rules[workflow][task], workflow_level)
end
.workflow_rules(rules, workflow) ⇒ Object
11 12 13 14 15 |
# File 'lib/rbbt/hpc/orchestrate/rules.rb', line 11
#
# The "defaults" section of the rules of +workflow+, set up as an
# IndiferentHash, or an empty Hash when the workflow or its defaults are
# missing.
def self.workflow_rules(rules, workflow)
  section = rules[workflow]
  defaults = section.nil? ? nil : section["defaults"]
  return {} if defaults.nil?

  IndiferentHash.setup(defaults)
end
Instance Method Details
#get_chains(job, rules, chains = {}) ⇒ Object
74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 |
# File 'lib/rbbt/hpc/orchestrate.old.rb', line 74
#
# Fills +chains+ (job => Array of jobs chained to it) for +job+ and,
# recursively, for its dependencies, and returns it.
#
# A dependency is chained to the job when both tasks are listed in the
# "chain_tasks" of the job rules, or when the dependency is done, is not an
# input dependency of any job below and its own rules ask for "skip". The
# chain of a chained dependency is added to the chain of the job.
def get_chains(job, rules, chains = {})
  rules_for_job = self.job_rules(rules, job)
  job_deps = get_job_dependencies(job)

  input_deps = job.rec_dependencies.inject([]) do |acc, dep|
    acc.concat dep.input_dependencies
  end

  job_deps.each do |dep|
    input_deps.concat dep.input_dependencies
    get_chains(dep, rules, chains)
  end

  linked = job_deps.select do |dep|
    tasks = rules_for_job["chain_tasks"]
    chained = tasks &&
      tasks[job.workflow.to_s] && tasks[job.workflow.to_s].include?(job.task_name.to_s) &&
      tasks[dep.workflow.to_s] && tasks[dep.workflow.to_s].include?(dep.task_name.to_s)

    dep_skip = dep.done? && ! input_deps.include?(dep) && self.job_rules(rules, dep)["skip"]

    chained || dep_skip
  end

  linked.each do |dep|
    chains[job] ||= []
    chains[job] << dep
    chains[job].concat chains[dep] if chains[dep]
    chains[job].uniq!
  end

  chains
end
#get_job_dependencies(job, job_rules = nil) ⇒ Object
55 56 57 58 59 |
# File 'lib/rbbt/hpc/orchestrate.old.rb', line 55
#
# Dependencies of +job+ followed by its input dependencies, as a new Array.
# A nil list counts as empty. +job_rules+ is accepted but not used.
def get_job_dependencies(job, job_rules = nil)
  direct = job.dependencies || []
  inputs = job.input_dependencies || []
  direct + inputs
end
#get_recursive_job_dependencies(job) ⇒ Object
61 62 63 64 |
# File 'lib/rbbt/hpc/orchestrate.old.rb', line 61
#
# All dependencies of +job+ (as given by get_job_dependencies) followed by
# the recursive dependencies of each of them, as one flat Array. Jobs reached
# through several paths appear more than once.
def get_recursive_job_dependencies(job)
  direct = get_job_dependencies(job)
  nested = direct.collect { |dep| get_recursive_job_dependencies(dep) }
  [direct, nested].flatten
end
#job_rules(rules, job) ⇒ Object
4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 |
# File 'lib/rbbt/hpc/orchestrate.old.rb', line 4
#
# Computes the rules that apply to +job+ (legacy implementation).
#
# Starting from the global defaults overlaid with the defaults of the
# workflow, the chains are visited in order: the tasks of every chain are
# recorded under "chain_tasks" and the rules of a chain that lists the task
# of the job are merged in. Visiting stops at the first chain that includes
# the job; "chain_tasks" is discarded after any chain that does not. The
# task specific rules are merged last, and "skip" is set when the task is
# listed in the "skip" section of the rules.
#
# config_keys found at each level are kept and placed in front of the ones
# coming from the rules merged on top. Note that the config_keys Strings are
# normalised in place with gsub!.
#
# When the job is overriden with a Symbol, that Symbol is used as the task
# name.
def job_rules(rules, job)
  workflow = job.workflow.to_s
  task_name = job.task_name.to_s
  task_name = job.overriden.to_s if Symbol === job.overriden

  defaults = rules["defaults"] || {}
  defaults = defaults.merge(rules[workflow]["defaults"] || {}) if rules[workflow]

  merged = IndiferentHash.setup(defaults.dup)

  if rules["chains"]
    rules["chains"].each do |_name, info|
      IndiferentHash.setup(info)
      chain_tasks = info[:tasks].split(/,\s*/)

      chain_tasks.each do |task|
        task_workflow, chain_task = task.split("#")
        chain_task, task_workflow = task_workflow, info[:workflow] if chain_task.nil? or chain_tasks.empty?

        merged["chain_tasks"] ||= {}
        merged["chain_tasks"][task_workflow] ||= []
        merged["chain_tasks"][task_workflow] << chain_task

        next unless task_name == chain_task.to_s && workflow == task_workflow.to_s

        inherited_keys = merged.delete :config_keys
        merged = IndiferentHash.setup(merged.merge(info))

        if inherited_keys
          inherited_keys.gsub!(/,\s+/,',')
          merged[:config_keys] = merged[:config_keys] ? inherited_keys + "," + merged[:config_keys] : inherited_keys
        end
      end

      break if merged["chain_tasks"][workflow] && merged["chain_tasks"][workflow].include?(task_name)

      merged.delete "chain_tasks"
    end
  end

  config_keys = merged.delete :config_keys
  if rules[workflow] && rules[workflow][task_name]
    merged = IndiferentHash.setup(merged.merge(rules[workflow][task_name]))
  end

  if config_keys
    config_keys.gsub!(/,\s+/,',')
    merged[:config_keys] = merged[:config_keys] ? config_keys + "," + merged[:config_keys] : config_keys
  end

  if rules["skip"] && rules["skip"][workflow]
    merged["skip"] = true if rules["skip"][workflow].split(/,\s*/).include? task_name
  end

  merged
end
#orchestrate_job(job, options) ⇒ Object
44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 |
# File 'lib/rbbt/hpc/orchestrate.rb', line 44
#
# Splits +job+ into batches and submits them in dependency order.
#
# Entries of +options+ that only concern the command line ("clean", "tail",
# "detach", ...) are removed from it. The rules are loaded from
# options[:orchestration_rules] and the batches computed with
# HPC::Orchestration.job_batches. A batch is submitted once every batch it
# depends on has an id; its options are +options+ completed with the rules of
# the batch, its :batch_dependencies (prefixed with "canfail:" when a job of
# the batch lists the dependency in its canfail_paths) and its :manifest.
#
# With options[:dry_run] nothing is submitted: a description of each batch is
# printed and the task signature of its top level job is used as its id.
#
# Returns [id, dir] of the last batch submitted with run_job (both nil on a
# dry run). Raises a RuntimeError when no remaining batch has all its
# dependencies submitted.
#
# NOTE(review): the listing this was taken from had lost every identifier
# containing "options". The parameter name comes from the method signature;
# the per-batch +job_options+ local is a reconstruction - confirm against
# lib/rbbt/hpc/orchestrate.rb.
def orchestrate_job(job, options)
  options.delete "recursive_clean"
  options.delete "clean_task"
  options.delete "clean"
  options.delete "tail"
  options.delete "printpath"
  options.delete "detach"
  options.delete "jobname"
  options.delete "load_inputs"
  options.delete "provenance"

  Log.high "Prepare for exec"
  prepare_for_execution(job)

  rules = HPC::Orchestration.orchestration_rules(options[:orchestration_rules])

  batches = HPC::Orchestration.job_batches(rules, job)
  Log.high "Compute #{batches.length} batches"

  batch_ids = {}
  last_id = nil
  last_dir = nil

  while batches.any?
    top = batches.select { |b| b[:deps].nil? || (b[:deps] - batch_ids.keys).empty? }.first
    raise "No batch without unmet dependencies" if top.nil?
    batches.delete top

    # Separate Hash per batch, so the rules of one batch do not reach the next
    job_options = HPC::Orchestration.merge_rules(options, top[:rules])

    if top[:deps].nil?
      batch_dependencies = []
    else
      top_jobs = top[:jobs]
      batch_dependencies = top[:deps].collect do |dep|
        target = dep[:top_level]
        canfail = top_jobs.any? { |batch_job| batch_job.canfail_paths.include?(target.path) }

        canfail ? 'canfail:' + batch_ids[dep].to_s : batch_ids[dep].to_s
      end
    end

    job_options.merge!(:batch_dependencies => batch_dependencies )
    job_options.merge!(:manifest => top[:jobs].collect { |d| d.task_signature })

    if options[:dry_run]
      puts Log.color(:magenta, "Manifest: ") + Log.color(:blue, job_options[:manifest] * ", ") + " - tasks: #{job_options[:task_cpus] || 1} - time: #{job_options[:time]} - config: #{job_options[:config_keys]}"
      puts Log.color(:yellow, "Deps: ") + Log.color(:blue, job_options[:batch_dependencies]*", ")
      puts Log.color(:yellow, "Path: ") + top[:top_level].path
      puts Log.color(:yellow, "Options: ") + job_options.inspect
      batch_ids[top] = top[:top_level].task_signature
    else
      id, dir = run_job(top[:top_level], job_options)
      last_id = batch_ids[top] = id
      last_dir = dir
    end
  end

  [last_id, last_dir]
end
#piggyback(job, job_rules, job_deps) ⇒ Object
66 67 68 69 70 71 72 |
# File 'lib/rbbt/hpc/orchestrate.old.rb', line 66
#
# Finds the dependency a "skip" job can be attached to.
#
# Returns false unless +job_rules+ has "skip". Otherwise the dependencies in
# +job_deps+ that are not a recursive dependency of another one and are not
# done are considered; the dependency is returned when exactly one remains,
# false in any other case.
def piggyback(job, job_rules, job_deps)
  return false unless job_rules["skip"]

  inherited = job_deps.collect { |dep| get_recursive_job_dependencies(dep) }.flatten.uniq
  pending = (job_deps - inherited).reject { |dep| dep.done? }

  pending.length == 1 ? pending.first : false
end
#prepare_for_execution(job) ⇒ Object
8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 |
# File 'lib/rbbt/hpc/orchestrate.rb', line 8
#
# Calls Step.prepare_for_execution on every recursive dependency of +job+
# and then on the job itself. A RbbtException raised for one of them is
# ignored and the next one is tried.
#
# Nothing is done (and nil is returned) when the job has no recursive
# dependencies.
def prepare_for_execution(job)
  upstream = job.rec_dependencies(true)
  return if upstream.empty?

  (upstream + [job]).each do |step|
    begin
      Step.prepare_for_execution(step)
    rescue RbbtException
      next
    end
  end
end
#workload(job, rules, chains, options, seen = nil) ⇒ Object
105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 |
# File 'lib/rbbt/hpc/orchestrate.old.rb', line 105
#
# Submits +job+ after its dependencies (legacy implementation) and returns
# the id(s) its dependants have to wait for.
#
# Dependencies are processed first, each with its own copy of +options+.
# +seen+ maps job paths to the ids already obtained, so a job reached through
# several paths is submitted once. Dependencies that are part of the chain
# of the job are registered in +seen+ without id so they are not submitted
# on their own, and a job that piggybacks on a dependency (see #piggyback)
# is handed down in options[:piggyback] and reuses the ids of that
# dependency. Ids of dependencies listed in the canfail_paths of the job are
# prefixed with "canfail:".
#
# The options of the submission are +options+ overlaid with the rules of the
# job, plus :batch_dependencies, :manifest and the combined :config_keys.
# With options[:dry_run] the submission is only printed and the first
# signature of the manifest is returned instead of calling run_job.
#
# Returns [] for jobs that are already done.
#
# NOTE(review): the listing this was taken from had lost every identifier
# containing "options". The parameter name comes from the method signature;
# the +next_options+ and +job_options+ locals are reconstructions - confirm
# against lib/rbbt/hpc/orchestrate.old.rb.
def workload(job, rules, chains, options, seen = nil)
  return [] if job.done?

  seen = {} if seen.nil?

  job_rules = self.job_rules(rules, job)
  job_deps = get_job_dependencies(job)

  chain = chains[job]
  chain = chain.reject { |j| seen.include? j.path } if chain
  chain = chain.reject { |dep| dep.done? } if chain

  piggyback = piggyback(job, job_rules, job_deps)

  dep_ids = job_deps.collect do |dep|
    seen[dep.path] ||= nil if chain && chain.include?(dep) #&& ! job.input_dependencies.include?(dep)

    # Each dependency gets its own copy so :piggyback does not leak to siblings
    next_options = IndiferentHash.setup(options.dup)
    if piggyback and piggyback == dep
      next_options[:piggyback] ||= []
      next_options[:piggyback].push job
      ids = workload(dep, rules, chains, next_options, seen)
    else
      next_options.delete :piggyback
      ids = workload(dep, rules, chains, next_options, seen)
    end

    ids = [ids].flatten.compact.collect { |id| ['canfail', id] * ":" } if job.canfail_paths.include? dep.path

    seen[dep.path] = ids
    ids
  end.compact.flatten.uniq

  return seen[job.path] || dep_ids if seen.include?(job.path)

  if piggyback and seen[piggyback.path]
    return seen[job.path] = seen[piggyback.path]
  end

  job_rules.delete :chain_tasks
  job_rules.delete :tasks
  job_rules.delete :workflow

  option_config_keys = options[:config_keys]

  job_options = IndiferentHash.setup(options.merge(job_rules).merge(:batch_dependencies => dep_ids))
  job_options.delete :orchestration_rules

  config_keys = job_rules.delete(:config_keys)
  if config_keys
    config_keys.gsub!(/,\s+/,',')
    job_options[:config_keys] = job_options[:config_keys] ? config_keys + "," + job_options[:config_keys] : config_keys
  end

  if option_config_keys
    option_config_keys = option_config_keys.gsub(/,\s+/,',')
    job_options[:config_keys] = job_options[:config_keys] ? job_options[:config_keys] + "," + option_config_keys : option_config_keys
  end

  if options[:piggyback]
    manifest = options[:piggyback].uniq
    manifest += [job]
    manifest.concat chain if chain

    # The submission is made on behalf of the first job piggybacking on this one
    job = options[:piggyback].first

    job_rules = self.job_rules(rules, job)
    new_config_keys = self.job_rules(rules, job)[:config_keys]
    if new_config_keys
      new_config_keys = new_config_keys.gsub(/,\s+/,',')
      job_options[:config_keys] = job_options[:config_keys] ? job_options[:config_keys] + "," + new_config_keys : new_config_keys
    end

    job_options.delete :piggyback
  else
    manifest = [job]
    manifest.concat chain if chain
  end

  manifest.uniq!

  job_options[:manifest] = manifest.collect { |j| j.task_signature }

  job_options[:config_keys] = job_options[:config_keys].split(",").uniq * "," if job_options[:config_keys]

  if options[:dry_run]
    puts Log.color(:magenta, "Manifest: ") + Log.color(:blue, job_options[:manifest] * ", ") + " - tasks: #{job_options[:task_cpus] || 1} - time: #{job_options[:time]} - config: #{job_options[:config_keys]}"
    puts Log.color(:yellow, "Deps: ") + Log.color(:blue, job_options[:batch_dependencies]*", ")
    job_options[:manifest].first
  else
    run_job(job, job_options)
  end
end