Module: HPC::Orchestration

Included in:
LSF, SLURM
Defined in:
lib/rbbt/hpc/orchestrate.rb,
lib/rbbt/hpc/orchestrate.old.rb,
lib/rbbt/hpc/orchestrate/rules.rb,
lib/rbbt/hpc/orchestrate/chains.rb,
lib/rbbt/hpc/orchestrate/batches.rb

Class Method Summary

Instance Method Summary

Class Method Details

.accumulate_rules(current, new) ⇒ Object



34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
# File 'lib/rbbt/hpc/orchestrate/rules.rb', line 34

# Combine the rules of two jobs that will share one batch.
#
# Unlike merge_rules (where +current+ simply wins), some keys are resolved
# by looking at both sides:
# * "config_keys" - concatenated through add_config_keys;
# * "cpus"        - the larger of the two values (compared with #to_i);
# * "time"        - both spans are added up (Misc.timespan) and re-formatted
#                   with Misc.format_seconds;
# * "skip"        - kept only when both sides are truthy; if +new+ carries a
#                   falsy/absent-in-current "skip", it is removed. A "skip" only
#                   present in +current+ is left untouched.
# Any other key keeps the value from +current+ when present.
#
# @param current [Hash, nil] rules accumulated so far
# @param new [Hash, nil] rules to fold in
# @return [IndiferentHash] a new hash; neither argument is modified
def self.accumulate_rules(current, new)
  new_blank = new.nil? || new.empty?
  current_blank = current.nil? || current.empty?

  return IndiferentHash.setup({}) if new_blank && current_blank
  return IndiferentHash.setup(current.dup) if new_blank
  return IndiferentHash.setup(new.dup) if current_blank

  accumulated = IndiferentHash.setup(current.dup)
  new.each do |key, value|
    rule = key.to_s
    if rule == "config_keys"
      accumulated[key] = add_config_keys(accumulated["config_keys"], value)
    elsif rule == "cpus"
      accumulated[key] = [accumulated[key], value].compact.sort_by(&:to_i).last
    elsif rule == "time"
      total_seconds = [accumulated[key], value].compact.inject(0) { |sum, span| sum + Misc.timespan(span) }
      accumulated[key] = Misc.format_seconds(total_seconds)
    elsif rule == "skip"
      accumulated.delete(key) unless accumulated[key] && value
    elsif !accumulated.include?(key)
      accumulated[key] = value
    end
  end

  accumulated
end

.add_batch_deps(batches) ⇒ Object



78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
# File 'lib/rbbt/hpc/orchestrate/batches.rb', line 78

# Record, for every batch, which other batches it has to wait for.
#
# For each batch: collect the pending dependencies (job_dependencies) of its
# jobs that are not themselves in the batch, drop those that are a direct
# dependency of another such dependency (waiting on the latter already
# implies waiting on the former), then map the survivors to the other
# batches whose jobs contain a job with the same path.
#
# @param batches [Array<Hash>] batches with a :jobs array; each one gets a
#   :deps array of other batch hashes (mutated in place)
# @return [Array<Hash>] the same +batches+ array
def self.add_batch_deps(batches)
  batches.each do |batch|
    jobs = batch[:jobs]
    external_deps = jobs.collect { |job| job_dependencies(job) }.flatten.uniq - jobs

    # Only one level is pruned: the direct dependencies of each candidate.
    minimal_deps = external_deps.inject(external_deps) do |remaining, dep|
      remaining - job_dependencies(dep)
    end

    # Loop invariant for this batch (was recomputed once per dependency).
    other_batches = batches - [batch]
    batch[:deps] = minimal_deps.collect do |dep|
      other_batches.select { |other| other[:jobs].collect(&:path).include?(dep.path) }
    end.flatten.uniq
  end

  batches
end

.add_config_keys(current, new) ⇒ Object



3
4
5
6
7
8
9
# File 'lib/rbbt/hpc/orchestrate/rules.rb', line 3

# Combine two comma-separated config key lists.
#
# The keys of +new+ come first, then those of +current+; spaces after commas
# are removed and duplicates are collapsed keeping the LAST occurrence of
# each key (hence the reverse/uniq/reverse).
#
# Either side may be nil. Previously a nil +new+ raised NoMethodError
# (nil + ',' or nil.gsub), although callers forward rule values that can be
# nil.
#
# @param current [String, nil] keys already accumulated
# @param new [String, nil] keys being added
# @return [String, nil] the combined list, or nil when both sides are nil
def self.add_config_keys(current, new)
  parts = [new, current].compact
  return nil if parts.empty?

  parts.join(',').gsub(/,\s*/, ',').split(',').reverse.uniq.reverse * ','
end

.add_rules_and_consolidate(rules, batches) ⇒ Object



99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
# File 'lib/rbbt/hpc/orchestrate/batches.rb', line 99

# Attach scheduling rules to every batch, then fold batches whose rules say
# :skip into one of the batches they depend on.
#
# Phase 1: the task_specific_rules of every job in a batch are combined with
# accumulate_rules. For batches built from a chain, the chain's own rules are
# merged on top with merge_rules(chain_rules, job_rules), so keys already set
# by the chain are kept (config_keys are combined).
#
# Phase 2: each batch with rules[:skip] (and a top-level job that is not
# overriden?) is merged into a dependency batch that (a) has a top-level job
# in this batch's top-level rec_dependencies and (b) already depends on all of
# this batch's other deps. The merged batch gets :target pointing to the batch
# that absorbed it. The :skip flag is deleted when a batch is examined, which
# bounds the TryAgain/retry loop.
#
# @param rules [Hash] orchestration rules
# @param batches [Array<Hash>] batches with :jobs, :top_level and optional
#   :chain/:deps; mutated in place
# @return [Array<Hash>] +batches+ without the batches merged into others
def self.add_rules_and_consolidate(rules, batches)
  chain_rules = parse_chains(rules)

  batches.each do |batch|
    # Fold together the rules of every job in the batch.
    job_rules = batch[:jobs].inject(nil) do |acc,job|
      workflow = job.original_workflow || job.workflow
      task_name = job.original_task_name || job.task_name
      task_rules = task_specific_rules(rules, workflow, task_name)
      acc = accumulate_rules(acc, task_rules.dup)
    end

    # Chain rules go first so merge_rules keeps them over the job rules.
    if chain = batch[:chain]
      batch[:rules] = merge_rules(chain_rules[chain][:rules].dup, job_rules)
    else
      batch[:rules] = job_rules
    end
  end

  begin
    # Redirect dependencies on batches merged in a previous pass to the batch
    # that absorbed them.
    batches.each do |batch|
      batch[:deps] = batch[:deps].collect do |dep|
        dep[:target] || dep
      end if batch[:deps]
    end

    batches.each do |batch|
      next if batch[:top_level].overriden?
      next unless batch[:rules][:skip]
      # Consume the flag: each batch is considered for merging only once.
      batch[:rules].delete :skip
      next if batch[:deps].nil?

      if batch[:deps].any?
        batch_dep_jobs = batch[:top_level].rec_dependencies
        # First dependency batch that already covers all the other deps.
        target = batch[:deps].select do |target|
          batch_dep_jobs.include?(target[:top_level]) && # Don't piggyback batches that are an input dependency, only real dependencies
            (batch[:deps] - [target] - target[:deps]).empty?
        end.first
        next if target.nil?
        # Move this batch's jobs, deps and rules into the target batch.
        target[:jobs] = batch[:jobs] + target[:jobs]
        target[:deps] = (target[:deps] + batch[:deps]).uniq - [target]
        target[:top_level] = batch[:top_level]
        target[:rules] = accumulate_rules(target[:rules], batch[:rules])
        batch[:target] = target
      end
      # Restart both passes (including dependency redirection) from scratch.
      raise TryAgain
    end
  rescue TryAgain
    retry
  end

  # Drop the batches that were merged into another one.
  batches.delete_if{|b| b[:target] } 

  batches
end

.chain_batches(rules, chains, workload) ⇒ Object



45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
# File 'lib/rbbt/hpc/orchestrate/batches.rb', line 45

# Group the pending +workload+ into batches, greedily packing jobs into the
# best matching chain.
#
# Jobs are popped from the end of +workload+. When a job belongs to one or
# more chains, the chain absorbing the most jobs is used as the batch (ties go
# to the chain declaring fewer distinct tasks) and its jobs leave the
# workload. Otherwise the job becomes a single-job batch. After each batch,
# chains rooted at an already batched job are discarded, the remaining chains
# lose the batched jobs, and chains left with fewer than two jobs are dropped.
#
# The previous score, num_jobs.to_f + 1/total_tasks, used integer division.
# The "bonus" was 0 for any multi-task chain and a full extra job for
# single-task chains, so ties were not broken as intended (and a chain with no
# tasks raised ZeroDivisionError). A lexicographic key expresses the intent.
#
# @param rules [Hash] orchestration rules (only the chain definitions are used)
# @param chains [Array<Array(String, Hash)>] [name, info] pairs as produced by
#   job_chains; the list and the info hashes are mutated
# @param workload [Array] pending jobs; consumed by #pop
# @return [Array<Hash>] batches with :jobs and :top_level (and :chain when
#   built from a chain)
def self.chain_batches(rules, chains, workload)
  chain_rules = parse_chains(rules)

  batches = []
  while (job = workload.pop)
    matches = chains.select { |_chain_name, chain_info| chain_info[:jobs].include?(job) }
    if matches.any?
      chosen_name, chosen_info = matches.max_by do |candidate_name, candidate_info|
        total_tasks = chain_rules[candidate_name][:tasks].values.flatten.uniq.length
        [candidate_info[:jobs].length, -total_tasks]
      end
      workload -= chosen_info[:jobs]
      chosen_info[:chain] = chosen_name
      batch = chosen_info
    else
      batch = {:jobs => [job], :top_level => job}
    end

    # A chain headed by a job that is now batched can no longer be used.
    chains.delete_if { |_chain_name, chain_info| batch[:jobs].include?(chain_info[:top_level]) }

    chains.each do |_chain_name, chain_info|
      chain_info[:jobs] = chain_info[:jobs] - batch[:jobs]
    end

    # A chain with a single job left is not worth a batch of its own.
    chains.delete_if { |_chain_name, chain_info| chain_info[:jobs].length < 2 }

    batches << batch
  end

  batches
end

.check_chains(chains, job) ⇒ Object



3
4
5
6
7
8
9
10
11
12
13
14
# File 'lib/rbbt/hpc/orchestrate/chains.rb', line 3

# Names of the chains whose task list includes +job+'s task.
#
# The workflow and task used are the original ones when present
# (original_workflow / original_task_name). Jobs whose +overriden+ value is
# a Symbol never match any chain.
#
# @param chains [Hash] chain name => {:tasks => {workflow => [task names]}}
# @param job the job to test
# @return [Array] matching chain names, in +chains+ order
def self.check_chains(chains, job)
  return [] if Symbol === job.overriden

  workflow_name = (job.original_workflow || job.workflow).to_s
  task = (job.original_task_name || job.task_name).to_s

  chains.select { |_name, chain|
    tasks_by_workflow = chain[:tasks]
    tasks_by_workflow.include?(workflow_name) && tasks_by_workflow[workflow_name].include?(task)
  }.collect(&:first)
end

.job_batches(rules, job) ⇒ Object



154
155
156
157
158
159
160
161
162
# File 'lib/rbbt/hpc/orchestrate/batches.rb', line 154

# Plan the scheduler batches needed to run +job+.
#
# Pipeline: find the chains rooted at +job+ (job_chains), list the pending
# jobs (job_workload), pack them into batches (chain_batches), wire the
# inter-batch dependencies (add_batch_deps), then attach rules and merge
# "skip" batches (add_rules_and_consolidate).
#
# @param rules [Hash] orchestration rules
# @param job the top-level job
# @return [Array<Hash>] the final batches
def self.job_batches(rules, job)
  chains = self.job_chains(rules, job)
  pending = job_workload(job).uniq
  planned = chain_batches(rules, chains, pending)
  add_rules_and_consolidate(rules, add_batch_deps(planned))
end

.job_chains(rules, job) ⇒ Object



42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
# File 'lib/rbbt/hpc/orchestrate/chains.rb', line 42

# Chains that +job+ can head, combined (recursively) with the chains found for
# its pending dependencies.
#
# Returns an Array of [chain_name, info] pairs, where info[:jobs] lists the
# jobs grouped together and info[:top_level] is the job heading them. For each
# dependency:
# * chains matched by both +job+ and the dependency, for which the dependency
#   already has chain info, are extended: the dependency's jobs are collected
#   and +job+ is prepended at the end;
# * chains matched by both, without existing info, start as [job, dep];
# * the dependency's other chains are passed through unchanged.
#
# Results are memoized in @@job_chains, keyed by a digest of
# [rules, job.path].inspect.
# NOTE(review): the memoized info hashes are shared by reference (dependency
# entries are appended as-is), and chain_batches later reassigns info[:jobs]
# and sets info[:chain] on them, so cached entries can be altered between
# lookups. Confirm this is intended.
def self.job_chains(rules, job)
  @@job_chains ||= {}
  @@job_chains[Misc.digest([rules, job.path].inspect)] ||= 
    begin
      chains = self.parse_chains(rules)

      # Chains whose task list includes +job+ itself.
      matches = check_chains(chains, job)

      dependencies = job_dependencies(job)

      job_chains = []
      # Chains shared with a dependency that already has info for them.
      new_job_chains = {}
      dependencies.each do |dep|
        dep_matches = check_chains(chains, dep)
        common = matches & dep_matches

        # Recursive method call (parentheses + arguments make this the
        # method, not the local array of the same name).
        dep_chains = job_chains(rules, dep)
        found = []
        dep_chains.each do |match,info|
          if common.include?(match)
            found << match
            new_info = new_job_chains[match] ||= {}
            new_info[:jobs] ||= []
            new_info[:jobs].concat info[:jobs]
            new_info[:top_level] = job
          else
            # Chain +job+ is not part of: keep the dependency's entry.
            job_chains << [match, info]
          end
        end

        # Shared chains the dependency had no info for: start a new pair.
        (common - found).each do |match|
          info = {}
          info[:jobs] = [job, dep]
          info[:top_level] = job
          job_chains << [match, info]
        end
      end

      # +job+ heads every extended chain.
      new_job_chains.each do |match,info|
        info[:jobs].prepend job
        job_chains << [match, info]
      end

      job_chains
    end
end

.job_dependencies(job) ⇒ Object



38
39
40
# File 'lib/rbbt/hpc/orchestrate/chains.rb', line 38

# Direct dependencies of +job+ (regular and input ones, deduplicated) that
# still need work: those not done, or done but dirty.
#
# @param job the job whose dependencies are inspected
# @return [Array] pending dependency jobs, in first-seen order
def self.job_dependencies(job)
  candidates = (job.dependencies + job.input_dependencies).uniq
  candidates.reject { |dep| dep.done? && !dep.dirty? }
end

.job_workload(job) ⇒ Object



19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
# File 'lib/rbbt/hpc/orchestrate/batches.rb', line 19

# All jobs that still have to run for +job+, found by a depth-first walk over
# job_dependencies. Paths are popped LIFO from a deduplicated stack; jobs that
# are already done are not listed and their dependencies are not followed.
#
# Each path is now expanded at most once. Before, a job reachable through
# several parents was re-pushed and re-expanded, together with its whole
# sub-graph, every time it was reached. That is exponential on diamond-shaped
# dependency graphs, and the duplicates were only removed by the final uniq.
# Re-expansions only ever re-pushed paths that were already expanded or still
# pending, so skipping them yields the same result.
#
# @param job the top-level job
# @return [Array] pending jobs, +job+ first when it is not done
def self.job_workload(job)
  workload = []
  path_jobs = { job.path => job }
  expanded = {}

  heap = [job.path]
  while (job_path = heap.pop)
    next if expanded[job_path]
    expanded[job_path] = true

    current = path_jobs[job_path]
    next if current.done?
    workload << current

    deps = job_dependencies(current)
    deps.each { |dep| path_jobs[dep.path] ||= dep }

    heap.concat(deps.collect(&:path))
    heap.uniq!
  end

  workload.uniq
end

.merge_rules(current, new) ⇒ Object



17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
# File 'lib/rbbt/hpc/orchestrate/rules.rb', line 17

# Overlay +new+ rules under +current+ ones: a key already present in +current+
# keeps its value, except "config_keys", whose lists are combined through
# add_config_keys. Keys only present in +new+ are copied over.
#
# @param current [Hash, nil] rules with precedence
# @param new [Hash, nil] fallback rules
# @return [IndiferentHash] a new hash; neither argument is modified
def self.merge_rules(current, new)
  new_blank = new.nil? || new.empty?
  current_blank = current.nil? || current.empty?

  return IndiferentHash.setup({}) if new_blank && current_blank
  return IndiferentHash.setup(current.dup) if new_blank
  return IndiferentHash.setup(new.dup) if current_blank

  new.each_with_object(IndiferentHash.setup(current.dup)) do |(key, value), merged|
    if key.to_s == "config_keys"
      merged[key] = add_config_keys(merged["config_keys"], value)
    elsif !merged.include?(key)
      merged[key] = value
    end
  end
end

.parse_chains(rules) ⇒ Object



16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
# File 'lib/rbbt/hpc/orchestrate/chains.rb', line 16

# Parse the "chains" section of the orchestration rules.
#
# Each chain spec has :tasks, given as a comma-separated String (or an Array)
# of "Workflow#task" or bare "task" entries, where a bare task belongs to the
# spec's :workflow. It also has any other keys, which become the chain's rules.
#
# Fixes compared to the previous version:
# * a chain without tasks (or with a nil spec) no longer crashes on nil.split;
#   it simply yields no entry, as an empty task string already did;
# * Array task lists from YAML are accepted; entries are stripped and empty
#   ones ignored;
# * the dead `or chain_tasks.empty?` test (always false inside the loop) and
#   the block parameter shadowing the +rules+ argument are gone.
#
# @param rules [Hash] orchestration rules
# @return [Hash] chain name => {:tasks => {workflow => [task]}, :rules => Hash}
def self.parse_chains(rules)
  return {} if rules["chains"].nil?

  chains = IndiferentHash.setup({})
  rules["chains"].each do |name, chain_spec|
    next if chain_spec.nil?

    chain_rules = IndiferentHash.setup(chain_spec.dup)
    tasks = chain_rules.delete(:tasks)
    workflow = chain_rules.delete(:workflow)

    task_list = case tasks
                when nil then []
                when Array then tasks.collect { |task| task.to_s.strip }
                else tasks.to_s.split(/,\s*/).collect(&:strip)
                end
    task_list = task_list.reject(&:empty?)

    task_list.each do |task|
      chain_workflow, chain_task = task.split("#")
      # A bare "task" belongs to the chain's own workflow.
      chain_task, chain_workflow = chain_workflow, workflow if chain_task.nil?

      chains[name] ||= IndiferentHash.setup({:tasks => {}, :rules => chain_rules})
      chains[name][:tasks][chain_workflow] ||= []
      chains[name][:tasks][chain_workflow] << chain_task
    end
  end

  chains
end

.pb(batch) ⇒ Object



7
8
9
10
11
12
13
14
15
16
17
# File 'lib/rbbt/hpc/orchestrate/batches.rb', line 7

# Debug helper: print a batch, or an Array of batches, with iii/iif.
#
# Dependencies are shown as their :top_level job rather than the full nested
# batch hashes; the printed hash is a shallow copy, so +batch+ is unchanged.
#
# @param batch [Hash, Array<Hash>] batch(es) to print
def self.pb(batch)
  unless Array === batch
    printable = batch.dup
    printable[:deps] = printable[:deps].collect { |dep| dep[:top_level] } if printable[:deps]
    return iif(printable)
  end

  iii :BATCHES
  batch.each { |single| pb single }
  iii :END_BATCHES
end

.task_specific_rules(rules, workflow, task) ⇒ Object



58
59
60
61
62
63
64
65
66
# File 'lib/rbbt/hpc/orchestrate/rules.rb', line 58

# Effective rules for +task+ of +workflow+.
#
# Precedence (highest first): rules[workflow][task], the workflow's
# "defaults" section (workflow_rules), then the global rules[:defaults].
#
# Every branch now returns a fresh IndiferentHash. Previously, when the
# workflow had no section, the shared rules[:defaults] object itself (or a
# plain {}) was returned, so callers could mutate the loaded rules.
#
# @param rules [Hash] orchestration rules
# @param workflow [#to_s] workflow (name)
# @param task [#to_s] task name
# @return [IndiferentHash]
def self.task_specific_rules(rules, workflow, task)
  defaults = rules[:defaults] || {}
  workflow = workflow.to_s
  task = task.to_s
  return IndiferentHash.setup(defaults.dup) if rules[workflow].nil?

  wf_rules = merge_rules(workflow_rules(rules, workflow), defaults)
  return IndiferentHash.setup(wf_rules.dup) if rules[workflow][task].nil?

  merge_rules(rules[workflow][task], wf_rules)
end

.workflow_rules(rules, workflow) ⇒ Object



11
12
13
14
15
# File 'lib/rbbt/hpc/orchestrate/rules.rb', line 11

# The "defaults" section of +workflow+'s rules, or {} when the workflow has
# no section or no defaults. The section hash itself is returned, set up as an
# IndiferentHash, not a copy.
#
# @param rules [Hash] orchestration rules
# @param workflow [String] workflow name
# @return [Hash]
def self.workflow_rules(rules, workflow)
  workflow_section = rules[workflow]
  return {} if workflow_section.nil?

  section_defaults = workflow_section["defaults"]
  section_defaults.nil? ? {} : IndiferentHash.setup(section_defaults)
end

Instance Method Details

#get_chains(job, rules, chains = {}) ⇒ Object



74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
# File 'lib/rbbt/hpc/orchestrate.old.rb', line 74

# Legacy (orchestrate.old.rb): build a map job => [jobs chained under it].
#
# Recurses into every direct dependency first. Then each direct dependency is
# recorded under +job+ when either:
# * both +job+ and the dependency appear in the "chain_tasks" that job_rules
#   computed for +job+; or
# * the dependency is done, is not an input dependency found below +job+, and
#   its own job_rules ask for "skip".
# The chained dependency's own chain (chains[dep]) is appended as well.
#
# @param job the job to analyse
# @param rules [Hash] orchestration rules
# @param chains [Hash] accumulator (job => [jobs]); mutated and returned
# @return [Hash] +chains+
def get_chains(job, rules, chains = {})
  job_rules = self.job_rules(rules, job)
  job_deps = get_job_dependencies(job)

  # Input dependencies used anywhere below +job+.
  input_deps = []
  job.rec_dependencies.each do |dep|
    input_deps.concat dep.input_dependencies
  end

  # Recurse first so chains[dep] is filled before it is merged below.
  job_deps.each do |dep|
    input_deps.concat dep.input_dependencies
    get_chains(dep, rules, chains)
  end

  job_deps.select do |dep|
    chained = job_rules["chain_tasks"] &&
      job_rules["chain_tasks"][job.workflow.to_s] && job_rules["chain_tasks"][job.workflow.to_s].include?(job.task_name.to_s)  &&
      job_rules["chain_tasks"][dep.workflow.to_s] && job_rules["chain_tasks"][dep.workflow.to_s].include?(dep.task_name.to_s) 

    dep_skip = dep.done? && ! input_deps.include?(dep) && self.job_rules(rules, dep)["skip"] 
    chained || dep_skip
  end.each do |dep|
    chains[job] ||= [] 
    chains[job] << dep 
    chains[job].concat chains[dep] if chains[dep]
    chains[job].uniq!
  end

  chains
end

#get_job_dependencies(job, job_rules = nil) ⇒ Object



55
56
57
58
59
# File 'lib/rbbt/hpc/orchestrate.old.rb', line 55

# Legacy: direct dependencies of +job+, regular ones followed by input ones.
# Either list may be nil. A new Array is always returned.
#
# @param job the job to inspect
# @param job_rules unused; kept for signature compatibility
# @return [Array]
def get_job_dependencies(job, job_rules = nil)
  regular = job.dependencies || []
  inputs = job.input_dependencies || []
  regular + inputs
end

#get_recursive_job_dependencies(job) ⇒ Object



61
62
63
64
# File 'lib/rbbt/hpc/orchestrate.old.rb', line 61

# Legacy: every dependency reachable from +job+, as a flat Array.
#
# Direct dependencies come first, followed by each one's recursive
# dependencies in order. Jobs reachable through several paths appear several
# times; no deduplication is done here.
#
# @param job the job to inspect
# @return [Array]
def get_recursive_job_dependencies(job)
  direct = get_job_dependencies(job)
  nested = direct.map { |dep| get_recursive_job_dependencies(dep) }
  [direct, nested].flatten
end

#job_rules(rules, job) ⇒ Object



4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
# File 'lib/rbbt/hpc/orchestrate.old.rb', line 4

# Legacy (orchestrate.old.rb): compute the effective rules for +job+.
#
# 1. Start from rules["defaults"], overridden by rules[workflow]["defaults"].
# 2. Walk rules["chains"], building job_rules["chain_tasks"]
#    (workflow => [tasks]). For the chain entry matching this job's task,
#    merge the chain settings in. Stop at the first chain that contains the
#    job; otherwise "chain_tasks" is discarded before the next chain.
# 3. Merge rules[workflow][task_name] on top.
# 4. Set "skip" when the task is listed in rules["skip"][workflow].
# In steps 2 and 3, config_keys accumulated before the merge are placed in
# front of those introduced by it.
#
# The task name used is job.overriden when that is a Symbol.
# NOTE(review): IndiferentHash.setup(info) and config_keys.gsub! modify
# structures from the loaded rules in place.
# NOTE(review): a chain whose tasks string is empty leaves "chain_tasks" unset,
# so the check after its loop raises NoMethodError on nil.
def job_rules(rules, job)
  workflow = job.workflow.to_s
  task_name = job.task_name.to_s
  task_name = job.overriden.to_s if Symbol === job.overriden

  # Global defaults, overridden by the workflow's defaults.
  defaults = rules["defaults"] || {}
  defaults = defaults.merge(rules[workflow]["defaults"] || {}) if rules[workflow]

  job_rules = IndiferentHash.setup(defaults.dup)

  rules["chains"].each do |name,info|
    IndiferentHash.setup(info)
    chain_tasks = info[:tasks].split(/,\s*/)

    chain_tasks.each do |task|
      # "Workflow#task", or a bare "task" of the chain's :workflow.
      task_workflow, chain_task = task.split("#")
      chain_task, task_workflow = task_workflow, info[:workflow] if chain_task.nil? or chain_tasks.empty?
      job_rules["chain_tasks"] ||= {}
      job_rules["chain_tasks"][task_workflow] ||= []
      job_rules["chain_tasks"][task_workflow]  << chain_task
      next unless task_name == chain_task.to_s && workflow == task_workflow.to_s
      # This job is part of the chain: take the chain's settings.
      config_keys = job_rules.delete :config_keys
      job_rules = IndiferentHash.setup(job_rules.merge(info)) 
      if config_keys
        config_keys.gsub!(/,\s+/,',') 
        job_rules[:config_keys] = job_rules[:config_keys] ? config_keys + "," + job_rules[:config_keys] : config_keys
      end
    end

    # Keep the first chain that includes the job; otherwise forget its tasks.
    if job_rules["chain_tasks"][workflow] && job_rules["chain_tasks"][workflow].include?(task_name)
      break
    else
      job_rules.delete "chain_tasks" 
    end
  end if rules["chains"]

  # Task-specific section, overriding everything above.
  config_keys = job_rules.delete :config_keys
  job_rules = IndiferentHash.setup(job_rules.merge(rules[workflow][task_name])) if rules[workflow] && rules[workflow][task_name]

  if config_keys
    config_keys.gsub!(/,\s+/,',') 
    job_rules[:config_keys] = job_rules[:config_keys] ? config_keys + "," + job_rules[:config_keys] : config_keys
  end

  if rules["skip"] && rules["skip"][workflow]
    job_rules["skip"] = true if rules["skip"][workflow].split(/,\s*/).include? task_name
  end

  job_rules
end

#orchestrate_job(job, options) ⇒ Object



25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
# File 'lib/rbbt/hpc/orchestrate.rb', line 25

# Plan +job+ into scheduler batches and submit them in dependency order.
#
# The orchestration rules come from options[:orchestration_rules] when given,
# else from Rbbt.etc.slurm["default.yaml"] when that exists, else none. Each
# batch is submitted with run_job once all the batches it depends on have
# ids. With options[:dry_run], the plan is printed instead and task
# signatures stand in for the ids.
#
# Several local CLI keys are removed from +options+ (the caller's hash is
# modified).
#
# @param job the top-level job
# @param options [Hash] scheduler options
# @return [Array] [id, dir] returned by the last run_job call ([nil, nil] on a
#   dry run)
def orchestrate_job(job, options)
  %w(recursive_clean clean_task clean tail printpath detach jobname load_inputs provenance).each do |local_flag|
    options.delete local_flag
  end

  Log.high "Prepare for exec"
  prepare_for_execution(job)

  rules_file = if options[:orchestration_rules]
                 options[:orchestration_rules]
               elsif Rbbt.etc.slurm["default.yaml"].exists?
                 Rbbt.etc.slurm["default.yaml"]
               end
  rules = rules_file ? YAML.load(Open.read(rules_file)) : {}

  IndiferentHash.setup(rules)

  batches = HPC::Orchestration.job_batches(rules, job)
  Log.high "Compute #{batches.length} batches"

  batch_ids = {}
  last_id = last_dir = nil
  until batches.empty?
    # Next batch whose dependencies have all been given an id.
    ready = batches.find { |candidate| candidate[:deps].nil? || (candidate[:deps] - batch_ids.keys).empty? }
    raise "No batch without unmet dependencies" if ready.nil?
    batches.delete ready

    job_options = HPC::Orchestration.merge_rules(options, ready[:rules])

    # A dependency id is prefixed with "canfail:" when one of this batch's
    # jobs lists the dependency's top-level path among its canfail_paths.
    batch_dependencies = (ready[:deps] || []).collect do |dep_batch|
      dep_path = dep_batch[:top_level].path
      tolerant = ready[:jobs].any? { |member| member.canfail_paths.include?(dep_path) }
      dep_id = batch_ids[dep_batch].to_s
      tolerant ? 'canfail:' + dep_id : dep_id
    end

    job_options.merge!(:batch_dependencies => batch_dependencies)
    job_options.merge!(:manifest => ready[:jobs].collect { |member| member.task_signature })

    if options[:dry_run]
      puts Log.color(:magenta, "Manifest: ") + Log.color(:blue, job_options[:manifest] * ", ") + " - tasks: #{job_options[:task_cpus] || 1} - time: #{job_options[:time]} - config: #{job_options[:config_keys]}"
      puts Log.color(:yellow, "Deps: ") + Log.color(:blue, job_options[:batch_dependencies]*", ")
      puts Log.color(:yellow, "Path: ") + ready[:top_level].path
      puts Log.color(:yellow, "Options: ") + job_options.inspect
      batch_ids[ready] = ready[:top_level].task_signature
    else
      last_id, last_dir = run_job(ready[:top_level], job_options)
      batch_ids[ready] = last_id
    end
  end

  [last_id, last_dir]
end

#piggyback(job, job_rules, job_deps) ⇒ Object



66
67
68
69
70
71
72
# File 'lib/rbbt/hpc/orchestrate.old.rb', line 66

# Legacy: decide whether a "skip" job should ride along with one dependency.
#
# Only applies when job_rules["skip"] is set. Dependencies reachable through
# another dependency are discarded, as are done ones. If exactly one
# dependency remains, it is returned.
#
# @param job unused
# @param job_rules [Hash] rules of +job+
# @param job_deps [Array] direct dependencies of +job+
# @return the single dependency to piggyback on, or false
def piggyback(job, job_rules, job_deps)
  return false unless job_rules["skip"]

  reachable = job_deps.map { |dep| get_recursive_job_dependencies(dep) }.flatten.uniq
  candidates = (job_deps - reachable).reject(&:done?)
  candidates.length == 1 ? candidates.first : false
end

#prepare_for_execution(job) ⇒ Object



8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
# File 'lib/rbbt/hpc/orchestrate.rb', line 8

# Run Step.prepare_for_execution on +job+ and all its recursive dependencies
# (dependencies first). Nothing is done when +job+ has no dependencies.
# A step that raises RbbtException is skipped; preparation is best-effort.
#
# @param job the top-level job
# @return [Array, nil] the prepared steps, or nil when there were none
def prepare_for_execution(job)
  rec_dependencies = job.rec_dependencies(true)
  return if rec_dependencies.empty?

  [*rec_dependencies, job].each do |step|
    begin
      Step.prepare_for_execution(step)
    rescue RbbtException
      nil
    end
  end
end

#workload(job, rules, chains, options, seen = nil) ⇒ Object



105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
# File 'lib/rbbt/hpc/orchestrate.old.rb', line 105

# Legacy (orchestrate.old.rb): recursively submit +job+ and its pending
# dependencies, returning what dependants should wait on.
#
# Dependencies are handled first (depth first); their ids become this job's
# :batch_dependencies, prefixed with "canfail:" when the dependency's path is
# in job.canfail_paths. Dependencies in +job+'s chain are pre-registered in
# +seen+, so they return their own dependency ids instead of being
# submitted, and they end up in this job's manifest. When piggyback selects
# a dependency, +job+ is pushed onto options[:piggyback] for that
# dependency's call, and the batch is submitted under the first piggybacking
# job with all of them in its manifest.
#
# @param seen [Hash, nil] path => ids already computed; nil on the outer call
# @return [] for done jobs; otherwise the result of run_job, or on a dry run
#   the first manifest entry
def workload(job, rules, chains, options, seen = nil)
  return [] if job.done?
  if seen.nil?
    seen = {}
    target_job = true
  end

  job_rules = self.job_rules(rules, job)
  job_deps = get_job_dependencies(job)

  # Chained dependencies still pending and not yet seen.
  chain = chains[job]
  chain = chain.reject{|j| seen.include? j.path} if chain
  chain = chain.reject{|dep| dep.done? } if chain
  piggyback = piggyback(job, job_rules, job_deps)
  dep_ids = job_deps.collect do |dep|
    # Register chained deps so their own call does not submit them.
    seen[dep.path] ||= nil if chain && chain.include?(dep) #&& ! job.input_dependencies.include?(dep) 
    next_options = IndiferentHash.setup(options.dup)
    if piggyback and piggyback == dep
      next_options[:piggyback] ||= []
      next_options[:piggyback].push job
      ids = workload(dep, rules, chains, next_options, seen)
    else
      next_options.delete :piggyback
      ids = workload(dep, rules, chains, next_options, seen)
    end

    ids = [ids].flatten.compact.collect{|id| ['canfail', id] * ":"} if job.canfail_paths.include? dep.path

    seen[dep.path] = ids
    ids
  end.compact.flatten.uniq

  # Already registered (e.g. chained under a parent): do not submit.
  return seen[job.path] || dep_ids if seen.include?(job.path)

  # Submitted together with the dependency it piggybacked on.
  if piggyback and seen[piggyback.path]
    return seen[job.path] = seen[piggyback.path] 
  end

  # Chain bookkeeping keys are not scheduler options.
  job_rules.delete :chain_tasks
  job_rules.delete :tasks
  job_rules.delete :workflow
  

  option_config_keys = options[:config_keys]

  job_options = IndiferentHash.setup(options.merge(job_rules).merge(:batch_dependencies => dep_ids))
  job_options.delete :orchestration_rules

  config_keys = job_rules.delete(:config_keys)
  if config_keys
    config_keys.gsub!(/,\s+/,',') 
    job_options[:config_keys] = job_options[:config_keys] ? config_keys + "," + job_options[:config_keys] : config_keys
  end

  if option_config_keys
    option_config_keys = option_config_keys.gsub(/,\s+/,',') 
    job_options[:config_keys] = job_options[:config_keys] ? job_options[:config_keys] + "," + option_config_keys : option_config_keys
  end

  if options[:piggyback]
    # Submit under the first piggybacking job, bundling all of them.
    manifest = options[:piggyback].uniq
    manifest += [job]
    manifest.concat chain if chain

    job = options[:piggyback].first

    job_rules = self.job_rules(rules, job)
    new_config_keys = self.job_rules(rules, job)[:config_keys]
    if new_config_keys
      new_config_keys = new_config_keys.gsub(/,\s+/,',') 
      job_options[:config_keys] = job_options[:config_keys] ? job_options[:config_keys] + "," + new_config_keys : new_config_keys
    end

    job_options.delete :piggyback
  else
    manifest = [job]
    manifest.concat chain if chain
  end

  manifest.uniq!

  job_options[:manifest] = manifest.collect{|j| j.task_signature }

  job_options[:config_keys] = job_options[:config_keys].split(",").uniq * "," if job_options[:config_keys]

  if options[:dry_run]
    puts Log.color(:magenta, "Manifest: ") + Log.color(:blue, job_options[:manifest] * ", ") + " - tasks: #{job_options[:task_cpus] || 1} - time: #{job_options[:time]} - config: #{job_options[:config_keys]}"
    puts Log.color(:yellow, "Deps: ") + Log.color(:blue, job_options[:batch_dependencies]*", ")
    job_options[:manifest].first
  else
    run_job(job, job_options)
  end
end