Module: Workflow
- Includes:
- InputModule
- Included in:
- RemoteWorkflow
- Defined in:
- lib/rbbt/workflow.rb,
lib/rbbt/workflow/doc.rb,
lib/rbbt/workflow/usage.rb,
lib/rbbt/workflow/accessor.rb,
lib/rbbt/workflow/examples.rb,
lib/rbbt/workflow/definition.rb,
lib/rbbt/workflow/util/trace.rb,
lib/rbbt/workflow/util/orchestrator.rb,
lib/rbbt/workflow/integration/cromwell.rb,
lib/rbbt/workflow/integration/nextflow.rb
Defined Under Namespace
Modules: DependencyBlock Classes: Orchestrator, TaskNotFoundException
Constant Summary
- DEFAULT_NAME =
{{{ JOB MANAGEMENT
"Default"
- TAG =
ENV["RBBT_INPUT_JOBNAME"] == "true" ? :inputs : :hash
- DEBUG_JOB_HASH =
ENV["RBBT_DEBUG_JOB_HASH"] == 'true'
- FORGET_DEP_TASKS =
ENV["RBBT_FORGET_DEP_TASKS"] == "true"
- REMOVE_DEP_TASKS =
ENV["RBBT_REMOVE_DEP_TASKS"] == "true"
Class Attribute Summary
-
.autoinstall ⇒ Object
Returns the value of attribute autoinstall.
-
.workflow_dir ⇒ Object
Returns the value of attribute workflow_dir.
-
.workflows ⇒ Object
Returns the value of attribute workflows.
Instance Attribute Summary
-
#asynchronous_exports ⇒ Object
Returns the value of attribute asynchronous_exports.
-
#description ⇒ Object
Returns the value of attribute description.
-
#documentation ⇒ Object
Returns the value of attribute documentation.
-
#example_dir ⇒ Object
Returns the value of attribute example_dir.
-
#exec_exports ⇒ Object
Returns the value of attribute exec_exports.
-
#helpers ⇒ Object
Returns the value of attribute helpers.
-
#last_task ⇒ Object
Returns the value of attribute last_task.
-
#libdir ⇒ Object
Returns the value of attribute libdir.
-
#load_step_cache ⇒ Object
Returns the value of attribute load_step_cache.
-
#remote_tasks ⇒ Object
Returns the value of attribute remote_tasks.
-
#step_cache ⇒ Object
Returns the value of attribute step_cache.
-
#stream_exports ⇒ Object
Returns the value of attribute stream_exports.
-
#synchronous_exports ⇒ Object
Returns the value of attribute synchronous_exports.
-
#task_dependencies ⇒ Object
Returns the value of attribute task_dependencies.
-
#task_description ⇒ Object
Returns the value of attribute task_description.
-
#tasks ⇒ Object
Returns the value of attribute tasks.
-
#workdir ⇒ Object
Returns the value of attribute workdir.
Class Method Summary
- .__load_step(path) ⇒ Object
- ._load_step(path) ⇒ Object
- .doc_parse_chunks(str, pattern) ⇒ Object
- .doc_parse_first_line(str) ⇒ Object
- .doc_parse_up_to(str, pattern, keep = false) ⇒ Object
- .extended(base) ⇒ Object
- .fast_load_step(path) ⇒ Object
- .get_SOPT(workflow, task) ⇒ Object
- .init_remote_tasks ⇒ Object
- .installed_workflows ⇒ Object
- .job_path?(path) ⇒ Boolean
- .load_inputs(dir, input_names, input_types) ⇒ Object
- .load_remote_tasks(filename) ⇒ Object
- .load_step(path) ⇒ Object
- .load_step_cache ⇒ Object
- .load_workflow_file(filename) ⇒ Object
- .load_workflow_libdir(filename) ⇒ Object
- .local_workflow_filename(wf_name) ⇒ Object
- .parse_workflow_doc(doc) ⇒ Object
- .process_remote_tasks(remote_tasks) ⇒ Object
- .relocate(real, other) ⇒ Object
- .relocate_array(real, list) ⇒ Object
- .relocate_dependency(main, dep) ⇒ Object
- .require_local_workflow(wf_name) ⇒ Object
- .require_remote_workflow(wf_name, url) ⇒ Object
- .require_workflow(wf_name, force_local = false) ⇒ Object
- .resolve_locals(inputs) ⇒ Object
- .trace(seed_jobs, options = {}) ⇒ Object
- .transplant(listed, real, other) ⇒ Object
- .workdir ⇒ Object
-
.workdir=(path) ⇒ Object
{{{ ATTR DEFAULTS.
- .workflow_for(path) ⇒ Object
Instance Method Summary
- #__job(taskname, jobname = nil, inputs = {}) ⇒ Object
- #_job(taskname, jobname = nil, inputs = {}) ⇒ Object
- #_prov_tasks(tree) ⇒ Object
- #add_remote_tasks(remote_tasks) ⇒ Object
- #all_exports ⇒ Object
- #assign_dep_inputs(_inputs, options, all_d, task_info) ⇒ Object
- #dep(*dependency, &block) ⇒ Object
- #dep_task(name, workflow, oname, *rest, &block) ⇒ Object
- #dep_tree(name) ⇒ Object
- #desc(description) ⇒ Object
- #doc(task = nil, abridge = false) ⇒ Object
- #documentation_markdown ⇒ Object
- #example(task_name, example) ⇒ Object
- #example_inputs(task_name, example) ⇒ Object
- #example_step(task_name, example, new_inputs = {}) ⇒ Object
- #examples ⇒ Object
- #export_asynchronous(*names) ⇒ Object (also: #export)
- #export_exec(*names) ⇒ Object
- #export_stream(*names) ⇒ Object
- #export_synchronous(*names) ⇒ Object
- #extension(extension) ⇒ Object
- #fast_load_id(id) ⇒ Object
-
#get_job_step(step_path, task = nil, input_values = nil, dependencies = nil) ⇒ Object
{{{ LOAD FROM FILE.
- #get_SOPT(task) ⇒ Object
- #helper(name, &block) ⇒ Object
- #id_for(path) ⇒ Object
- #import(source, *args) ⇒ Object
- #import_task(workflow, orig, new) ⇒ Object
- #job(taskname, jobname = nil, inputs = {}) ⇒ Object
-
#jobs(taskname, query = nil) ⇒ Object
}}} LOAD FROM FILE.
- #load_cromwell(file) ⇒ Object
- #load_documentation ⇒ Object
- #load_id(id) ⇒ Object
- #load_name(task, name) ⇒ Object
- #load_step(path) ⇒ Object
-
#local_persist_setup ⇒ Object
{{{ Make workflow resources local.
- #local_workdir_setup ⇒ Object
- #log(status, message = nil, &block) ⇒ Object
- #make_local ⇒ Object
- #nextflow(path) ⇒ Object
- #nextflow_dir(path) ⇒ Object
- #nextflow_file(file, name = nil) ⇒ Object
- #override_dependencies(inputs) ⇒ Object
- #prov_string(tree) ⇒ Object
- #prov_tree(tree, offset = 0, seen = []) ⇒ Object
- #real_dependencies(task, orig_jobname, inputs, dependencies) ⇒ Object
- #rec_dependencies(taskname) ⇒ Object
- #rec_input_defaults(taskname) ⇒ Object
- #rec_input_descriptions(taskname) ⇒ Object
- #rec_input_options(taskname) ⇒ Object
- #rec_input_types(taskname) ⇒ Object
- #rec_input_use(taskname) ⇒ Object
-
#rec_inputs(taskname) ⇒ Object
def rec_inputs(taskname) [taskname].concat(rec_dependencies(taskname)).inject([]){|acc, tn| acc.concat(task_from_dep(tn).inputs) }.uniq end.
- #resumable ⇒ Object
- #returns(description) ⇒ Object
- #set_step_dependencies(step) ⇒ Object
- #setup_override_dependency(dep, workflow, task_name) ⇒ Object
- #SOPT_str(task) ⇒ Object
- #step_module ⇒ Object
- #step_path(taskname, jobname, inputs, dependencies, extension = nil) ⇒ Object
- #task(name, &block) ⇒ Object
- #task_exports ⇒ Object
- #task_for(path) ⇒ Object
- #task_from_dep(dep) ⇒ Object
- #task_info(name) ⇒ Object
- #unexport(*names) ⇒ Object
- #with_workdir(workdir) ⇒ Object
Methods included from InputModule
Class Attribute Details
.autoinstall ⇒ Object
Returns the value of attribute autoinstall.
25 26 27 |
# File 'lib/rbbt/workflow.rb', line 25
# Reader for the +autoinstall+ class attribute.
#
# require_workflow consults this flag: when truthy, a workflow that cannot be
# found locally is installed with `rbbt workflow install` before giving up.
#
# @return [Object] the value of +@autoinstall+ (nil when never assigned)
def autoinstall
  @autoinstall
end
.workflow_dir ⇒ Object
Returns the value of attribute workflow_dir.
25 26 27 |
# File 'lib/rbbt/workflow.rb', line 25
# Reader for the +workflow_dir+ class attribute.
#
# installed_workflows and local_workflow_filename index into this value to
# locate installed workflows (e.g. workflow_dir[name]['workflow.rb']).
#
# @return [Object] the value of +@workflow_dir+ (nil when never assigned)
def workflow_dir
  @workflow_dir
end
.workflows ⇒ Object
Returns the value of attribute workflows.
25 26 27 |
# File 'lib/rbbt/workflow.rb', line 25
# Reader for the +workflows+ class attribute.
#
# Workflow.extended appends every module that extends Workflow to this list.
#
# @return [Object] the value of +@workflows+ (nil when never assigned)
def workflows
  @workflows
end
Instance Attribute Details
#asynchronous_exports ⇒ Object
Returns the value of attribute asynchronous_exports.
207 208 209 |
# File 'lib/rbbt/workflow.rb', line 207
# Reader for the +asynchronous_exports+ attribute: the tasks exported
# asynchronously. It is one of the four lists concatenated by all_exports.
#
# @return [Object] the value of +@asynchronous_exports+
def asynchronous_exports
  @asynchronous_exports
end
#description ⇒ Object
Returns the value of attribute description.
203 204 205 |
# File 'lib/rbbt/workflow.rb', line 203
# Reader for the +description+ attribute.
#
# @return [Object] the value of +@description+ (nil when never assigned)
def description
  @description
end
#documentation ⇒ Object
Returns the value of attribute documentation.
55 56 57 |
# File 'lib/rbbt/workflow/doc.rb', line 55
# Reader for the +documentation+ attribute.
#
# @return [Object] the value of +@documentation+ (nil when never assigned)
def documentation
  @documentation
end
#example_dir ⇒ Object
Returns the value of attribute example_dir.
2 3 4 |
# File 'lib/rbbt/workflow/examples.rb', line 2
# Reader for the +example_dir+ attribute.
#
# @return [Object] the value of +@example_dir+ (nil when never assigned)
def example_dir
  @example_dir
end
#exec_exports ⇒ Object
Returns the value of attribute exec_exports.
207 208 209 |
# File 'lib/rbbt/workflow.rb', line 207
# Reader for the +exec_exports+ attribute: the tasks exported for execution.
# It is one of the four lists concatenated by all_exports.
#
# @return [Object] the value of +@exec_exports+
def exec_exports
  @exec_exports
end
#helpers ⇒ Object
Returns the value of attribute helpers.
205 206 207 |
# File 'lib/rbbt/workflow.rb', line 205
# Reader for the +helpers+ attribute.
#
# @return [Object] the value of +@helpers+ (nil when never assigned)
def helpers
  @helpers
end
#last_task ⇒ Object
Returns the value of attribute last_task.
206 207 208 |
# File 'lib/rbbt/workflow.rb', line 206
# Reader for the +last_task+ attribute.
#
# @return [Object] the value of +@last_task+ (nil when never assigned)
def last_task
  @last_task
end
#libdir ⇒ Object
Returns the value of attribute libdir.
204 205 206 |
# File 'lib/rbbt/workflow.rb', line 204
# Reader for the +libdir+ attribute.
#
# Workflow.extended assigns it from Path.caller_lib_dir when that is non-nil.
#
# @return [Object] the value of +@libdir+ (nil when never assigned)
def libdir
  @libdir
end
#load_step_cache ⇒ Object
Returns the value of attribute load_step_cache.
209 210 211 |
# File 'lib/rbbt/workflow.rb', line 209
# Reader for the per-workflow +load_step_cache+ attribute.
#
# @return [Object] the value of +@load_step_cache+ (nil when never assigned)
def load_step_cache
  @load_step_cache
end
#remote_tasks ⇒ Object
Returns the value of attribute remote_tasks.
210 211 212 |
# File 'lib/rbbt/workflow.rb', line 210
# Reader for the +remote_tasks+ attribute.
#
# Maps task names (Symbols) to the remote workflow that runs them.
# __job delegates jobs for these tasks to the mapped object.
#
# @return [Object] the value of +@remote_tasks+ (nil when never assigned)
def remote_tasks
  @remote_tasks
end
#step_cache ⇒ Object
Returns the value of attribute step_cache.
208 209 210 |
# File 'lib/rbbt/workflow.rb', line 208
# Reader for the +step_cache+ attribute.
#
# _job passes it as the +:repo+ of Persist.memory when memoizing jobs.
#
# @return [Object] the value of +@step_cache+ (nil when never assigned)
def step_cache
  @step_cache
end
#stream_exports ⇒ Object
Returns the value of attribute stream_exports.
207 208 209 |
# File 'lib/rbbt/workflow.rb', line 207
# Reader for the +stream_exports+ attribute: the tasks exported as streams.
# It is one of the four lists concatenated by all_exports.
#
# @return [Object] the value of +@stream_exports+
def stream_exports
  @stream_exports
end
#synchronous_exports ⇒ Object
Returns the value of attribute synchronous_exports.
207 208 209 |
# File 'lib/rbbt/workflow.rb', line 207
# Reader for the +synchronous_exports+ attribute: the tasks exported
# synchronously. It is one of the four lists concatenated by all_exports.
#
# @return [Object] the value of +@synchronous_exports+
def synchronous_exports
  @synchronous_exports
end
#task_dependencies ⇒ Object
Returns the value of attribute task_dependencies.
206 207 208 |
# File 'lib/rbbt/workflow.rb', line 206
# Reader for the +task_dependencies+ attribute.
#
# __job indexes it by task name (falling back to [] when the entry is nil).
#
# @return [Object] the value of +@task_dependencies+
def task_dependencies
  @task_dependencies
end
#task_description ⇒ Object
Returns the value of attribute task_description.
206 207 208 |
# File 'lib/rbbt/workflow.rb', line 206
# Reader for the +task_description+ attribute.
#
# @return [Object] the value of +@task_description+ (nil when never assigned)
def task_description
  @task_description
end
#tasks ⇒ Object
Returns the value of attribute tasks.
205 206 207 |
# File 'lib/rbbt/workflow.rb', line 205
# Reader for the +tasks+ attribute.
#
# Maps task names to task objects; __job looks tasks up here with a Symbol key.
#
# @return [Object] the value of +@tasks+ (nil when never assigned)
def tasks
  @tasks
end
#workdir ⇒ Object
Returns the value of attribute workdir.
204 205 206 |
# File 'lib/rbbt/workflow.rb', line 204
# Reader for the per-workflow +workdir+ attribute.
#
# @return [Object] the value of +@workdir+ (nil when never assigned)
def workdir
  @workdir
end
Class Method Details
.__load_step(path) ⇒ Object
540 541 542 543 544 545 546 547 548 549 550 551 552 553 554 555 556 557 558 559 |
# File 'lib/rbbt/workflow.rb', line 540
# Loads the step stored at +path+ together with its whole dependency tree.
#
# Behavior:
# - Remote and SSH paths are delegated to RemoteWorkflow.load_path.
# - Local steps are built with Step.new.
# - Each dependency recorded in the step's info is loaded through _load_step.
# - A dependency whose recorded path (and its ".info" companion) no longer
#   exists is looked up with Workflow.relocate. The step is marked +relocated+
#   when such a relocated path is found on disk.
#
# @param path [String] path of the step to load
# @return [Step, Object] the loaded step (or the remote object for
#   remote/SSH paths)
def self.__load_step(path)
  if Open.remote?(path) || Open.ssh?(path)
    require 'rbbt/workflow/remote_workflow'
    return RemoteWorkflow.load_path path
  end

  step = Step.new path
  relocated = false
  step.dependencies = (step.info[:dependencies] || []).collect do |_task, _name, dep_path|
    if Open.exists?(dep_path) || Open.exists?(dep_path + '.info') || Open.remote?(dep_path) || Open.ssh?(dep_path)
      Workflow._load_step dep_path
    else
      # The recorded location is gone; try the same job relative to this step.
      new_path = relocate(path, dep_path)
      relocated = true if Open.exists?(new_path) || Open.exists?(new_path + '.info')
      Workflow._load_step new_path
    end
  end
  step.relocated = relocated
  step.load_inputs_from_info
  step
end
._load_step(path) ⇒ Object
602 603 604 605 606 |
# File 'lib/rbbt/workflow.rb', line 602
# Memoized wrapper around __load_step.
#
# Results are cached with Persist.memory under the "STEP" key in the cache
# returned by load_step_cache. A step shared by several dependents is
# therefore loaded only once while that cache lives.
#
# @param path [String] path of the step to load
# @return [Step, Object] the (possibly cached) loaded step
def self._load_step(path)
  Persist.memory("STEP", :path => path, :repo => load_step_cache) do
    __load_step(path)
  end
end
.doc_parse_chunks(str, pattern) ⇒ Object
21 22 23 24 25 26 27 |
# File 'lib/rbbt/workflow/doc.rb', line 21
# Splits +str+ on +pattern+ into a Hash of heading => body.
#
# +pattern+ is expected to contain exactly one capture group (the heading).
# Headings and bodies are stripped, and entries with an empty body are dropped.
#
# @param str [String] text to split
# @param pattern [Regexp] separator with one capture group for the heading
# @return [Hash{String=>String}] heading => body; {} when +pattern+ never matches
def self.doc_parse_chunks(str, pattern)
  # The -1 limit keeps trailing empty fields. Without it, a text ending on a
  # bare heading yields an odd-length list and Hash[] raises ArgumentError.
  parts = str.split(pattern, -1)
  return {} if parts.length < 2
  tasks = Hash[*parts[1..-1].collect{|v| v.strip}]
  tasks.delete_if{|_t, d| d.empty?}
  tasks
end
.doc_parse_first_line(str) ⇒ Object
3 4 5 6 7 8 9 10 |
# File 'lib/rbbt/workflow/doc.rb', line 3
# Extracts the first line of +str+ when it is followed by a blank line.
#
# Side effect: on a match, +str+ is replaced in place with everything after
# the blank line.
#
# @param str [String] mutable document text
# @return [String] the first line, or "" when there is no "line\n\n" prefix
#   (in which case +str+ is left untouched)
def self.doc_parse_first_line(str)
  # Ruby's /m makes "." match newlines. The former /sm also set the Windows-31J
  # regexp encoding, which raised Encoding::CompatibilityError on non-ASCII
  # UTF-8 text.
  match = str.match(/^([^\n]*)\n\n(.*)/m)
  return "" if match.nil?

  title, rest = match[1], match[2]
  str.replace rest
  title
end
.doc_parse_up_to(str, pattern, keep = false) ⇒ Object
12 13 14 15 16 17 18 19 |
# File 'lib/rbbt/workflow/doc.rb', line 12
# Splits +str+ at the first match of +pattern+.
#
# String#partition always returns three strings (the separator is "" when
# there is no match), so this always returns a two-element array. The former
# "no match" else branch was unreachable and has been removed.
#
# @param str [String] text to split
# @param pattern [Regexp, String] separator
# @param keep [Boolean] when true, the matched separator is kept at the start
#   of the second element
# @return [Array(String, String)] [text before the match, text after it
#   (prefixed with the match when +keep+)]; [str, ""] when nothing matches
def self.doc_parse_up_to(str, pattern, keep = false)
  pre, matched, post = str.partition(pattern)
  [pre, keep ? matched + post : post]
end
.extended(base) ⇒ Object
34 35 36 37 38 39 |
# File 'lib/rbbt/workflow.rb', line 34
# Hook run when a module extends Workflow.
#
# Records +base+ in Workflow.workflows. When the caller's lib directory can
# be determined, that directory becomes +base.libdir+, as a Path whose
# resource is +base+.
#
# @param base [Module] the module that just extended Workflow
def self.extended(base)
  self.workflows << base
  libdir = Path.caller_lib_dir
  return if libdir.nil?
  base.libdir = Path.setup(libdir).tap{|p| p.resource = base}
end
.fast_load_step(path) ⇒ Object
561 562 563 564 565 566 567 568 569 570 571 572 573 574 575 576 577 578 579 580 581 582 583 584 585 586 587 588 589 590 591 592 593 594 595 596 597 598 599 600 |
# File 'lib/rbbt/workflow.rb', line 561
# Loads the step at +path+ lazily.
#
# Unlike __load_step, dependencies are only resolved the first time
# +dependencies+ is called. The returned step's singleton also overrides
# +inputs+ (loaded from info on demand), +dirty?+ (always false) and
# +updated?+ (always true). Remote and SSH paths are delegated to
# RemoteWorkflow.load_path.
#
# @param path [String] path of the step to load
# @return [Step, Object] the lazily-loading step (or the remote object)
def self.fast_load_step(path)
  if Open.remote?(path) || Open.ssh?(path)
    require 'rbbt/workflow/remote_workflow'
    return RemoteWorkflow.load_path path
  end

  step = Step.new path
  step.dependencies = nil
  class << step
    def dependencies
      @dependencies ||= (self.info[:dependencies] || []).collect do |_task, _name, dep_path|
        # Previously misspelled "relocate = false", which left +relocated+ nil
        # for dependencies found at their recorded location.
        relocated = false
        dep = if Open.exists?(dep_path) || Open.exists?(dep_path + '.info')
                Workflow.fast_load_step dep_path
              else
                # A +def+ does not close over the outer +path+ local. This
                # is the step's own path (self.path).
                new_path = Workflow.relocate(self.path, dep_path)
                relocated = true if Open.exists?(new_path) || Open.exists?(new_path + '.info')
                Workflow.fast_load_step new_path
              end
        dep.relocated = relocated
        dep
      end
    end

    def inputs
      self.load_inputs_from_info unless @inputs
      @inputs
    end

    def dirty?
      false
    end

    def updated?
      true
    end
  end

  step
end
.get_SOPT(workflow, task) ⇒ Object
276 277 278 279 280 |
# File 'lib/rbbt/workflow/usage.rb', line 276
# Class-level convenience for Workflow#get_SOPT.
#
# A String +workflow+ is resolved with require_workflow. A String or Symbol
# +task+ is looked up in that workflow's tasks.
#
# @param workflow [String, Module] workflow name or workflow module
# @param task [String, Symbol, Object] task name or task object
# @return [Object] whatever +workflow.get_SOPT(task)+ returns
def self.get_SOPT(workflow, task)
  workflow = Workflow.require_workflow workflow if String === workflow
  task = workflow.tasks[task.to_sym] if String === task || Symbol === task
  workflow.get_SOPT(task)
end
.init_remote_tasks ⇒ Object
41 42 43 44 45 |
# File 'lib/rbbt/workflow.rb', line 41
# Loads the remote-task configuration once per process.
#
# After the first call (tracked in @@init_remote_tasks), later calls return
# immediately. The configuration is read from Rbbt.root.etc.remote_tasks
# only if that resource exists.
def self.init_remote_tasks
  return if defined?(@@init_remote_tasks) && @@init_remote_tasks
  @@init_remote_tasks = true
  load_remote_tasks(Rbbt.root.etc.remote_tasks.find) if Rbbt.root.etc.remote_tasks.exists?
end
.installed_workflows ⇒ Object
83 84 85 86 87 |
# File 'lib/rbbt/workflow.rb', line 83
# Lists installed workflows.
#
# Returns the name of each directory under workflow_dir, at any depth, that
# contains a workflow.rb.
#
# @return [Array<String>] workflow directory names
def self.installed_workflows
  self.workflow_dir['**/workflow.rb'].glob_all.collect do |file|
    File.basename(File.dirname(file))
  end
end
.job_path?(path) ⇒ Boolean
19 20 21 |
# File 'lib/rbbt/workflow/accessor.rb', line 19
# Tells whether +path+ looks like ".../jobs/<workflow>/<task>/<name>".
#
# The check is that the fourth path component from the end is "jobs".
#
# @param path [String] file system path
# @return [Boolean]
def self.job_path?(path)
  path.split("/")[-4] == "jobs"
end
.load_inputs(dir, input_names, input_types) ⇒ Object
28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 |
# File 'lib/rbbt/workflow/examples.rb', line 28
# Loads task input values from a directory, or from a tar.gz bundle of one.
#
# For each name in +input_names+, the file <dir>/<input> is used, falling back
# to <dir>/<input>.* (ignoring ".md5" files). It is converted according to
# +input_types[input]+:
# - :file    — YAML-loaded if the name contains ".yaml"; otherwise the symlink
#              target or the real path
# - :text    — file contents
# - :array   — file contents split on "\n"
# - :tsv     — TSV.open
# - :boolean — true iff the stripped content is "true"
# - other    — stripped contents
#
# Files whose name contains "#" are loaded as extra string-keyed entries
# (logged as "override dependency").
#
# @param dir [String] input directory, or a tar.gz file containing one
# @param input_names [Array] names of the inputs to look for
# @param input_types [Hash] input name => type symbol
# @return [Hash] IndiferentHash of the loaded inputs
def self.load_inputs(dir, input_names, input_types)
  inputs = {}
  # File.exist? replaces File.exists?, which was removed in Ruby 3.2.
  if File.exist?(dir) && ! File.directory?(dir)
    Log.debug "Loading inputs from #{dir}, not a directory trying as tar.gz"
    tarfile = dir
    # NOTE(review): the path is interpolated into a shell command inside
    # single quotes; a path containing a quote will break this call.
    digest = CMD.cmd("md5sum '#{tarfile}'").read.split(" ").first
    tmpdir = Rbbt.tmp.input_bundle[digest].find
    # Bundles are unpacked once, into a directory keyed by their MD5 digest.
    Misc.untar(tarfile, tmpdir) unless File.exist? tmpdir
    files = tmpdir.glob("*")
    # Descend into a bundle whose only entry is a directory.
    if files.length == 1 && File.directory?(files.first)
      tmpdir = files.first
    end
    load_inputs(tmpdir, input_names, input_types)
  else
    dir = Path.setup(dir.dup)
    input_names.each do |input|
      file = dir[input].find
      file = dir.glob(input.to_s + ".*").reject{|f| f =~ /\.md5$/}.first if file.nil? or not file.exists?
      Log.debug "Trying #{ input }: #{file}"
      next unless file and file.exists?

      case input_types[input]
      when :file
        Log.debug "Pointing #{ input } to #{file}"
        if file =~ /\.yaml/
          inputs[input.to_sym] = YAML.load(Open.read(file))
        elsif File.symlink?(file)
          inputs[input.to_sym] = File.readlink(file)
        else
          inputs[input.to_sym] = Open.realpath(file)
        end
      when :text
        Log.debug "Reading #{ input } from #{file}"
        inputs[input.to_sym] = Open.read(file)
      when :array
        Log.debug "Reading array #{ input } from #{file}"
        inputs[input.to_sym] = Open.read(file).split("\n")
      when :tsv
        Log.debug "Opening tsv #{ input } from #{file}"
        inputs[input.to_sym] = TSV.open(file)
      when :boolean
        inputs[input.to_sym] = (file.read.strip == 'true')
      else
        Log.debug "Loading #{ input } from #{file}"
        inputs[input.to_sym] = file.read.strip
      end

    end
    inputs = IndiferentHash.setup(inputs)

    dir.glob("*#*").each do |od|
      name = File.basename(od)
      value = Open.read(od)
      Log.debug "Loading override dependency #{ name } as #{value}"
      inputs[name] = value.chomp
    end

    inputs
  end
end
.load_remote_tasks(filename) ⇒ Object
712 713 714 715 716 |
# File 'lib/rbbt/workflow.rb', line 712
# Reads a YAML remote-task configuration file and registers it with
# Workflow.process_remote_tasks.
#
# @param filename [String] path to the YAML file
# @return [Object] the result of Workflow.process_remote_tasks
def self.load_remote_tasks(filename)
  yaml_text = Open.read(filename)
  remote_workflow_tasks = YAML.load(yaml_text)
  Workflow.process_remote_tasks(remote_workflow_tasks)
end
.load_step(path) ⇒ Object
608 609 610 611 612 613 614 615 616 617 |
# File 'lib/rbbt/workflow.rb', line 608
# Loads the step at +path+ with all its dependencies.
#
# +path+ is wrapped in a Path (if it is not one already) and resolved with
# +find+. The memoization cache from load_step_cache is cleared afterwards,
# even if loading raises.
#
# @param path [String, Path] step path
# @return [Step, Object] the loaded step
def self.load_step(path)
  path = Path.setup(path.dup) unless Path === path
  path = path.find

  begin
    _load_step(path)
  ensure
    load_step_cache.clear
  end
end
.load_step_cache ⇒ Object
267 268 269 |
# File 'lib/rbbt/workflow.rb', line 267
# Returns the current thread's step-loading cache.
#
# The cache is a Hash stored in Thread.current[:load_step_cache], so each
# thread gets its own.
#
# @return [Hash]
def self.load_step_cache
  Thread.current[:load_step_cache] ||= {}
end
.load_workflow_file(filename) ⇒ Object
65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 |
# File 'lib/rbbt/workflow.rb', line 65
# Requires a workflow definition file.
#
# Steps:
# 1. Adds the workflow's sibling lib/ directory to $LOAD_PATH
#    (load_workflow_libdir).
# 2. Expands +filename+ and registers it with Rbbt.add_version.
# 3. Requires it.
#
# Any error is logged and then re-raised.
#
# @param filename [String] workflow file path
# @return [true] when the file was required without error
# @raise [Exception] whatever loading the file raised
def self.load_workflow_file(filename)
  begin
    load_workflow_libdir(filename)

    filename = File.expand_path(filename)

    Rbbt.add_version(filename)
    require filename
    Log.debug{"Workflow loaded from: #{ filename }"}
    return true
  rescue Exception
    # Exception (not StandardError) is deliberate: SyntaxError/LoadError are
    # ScriptErrors and should also be logged. The error is re-raised unchanged.
    Log.warn{"Error loading workflow: #{ filename }"}
    raise
  end
end
.load_workflow_libdir(filename) ⇒ Object
57 58 59 60 61 62 63 |
# File 'lib/rbbt/workflow.rb', line 57
# Prepends "<dir of filename>/lib" to $LOAD_PATH when that directory exists.
#
# @param filename [String] path to a workflow file
def self.load_workflow_libdir(filename)
  workflow_lib_dir = File.join(File.dirname(File.expand_path(filename)), 'lib')
  if File.directory? workflow_lib_dir
    Log.debug "Adding workflow lib directory to LOAD_PATH: #{workflow_lib_dir}"
    $LOAD_PATH.unshift(workflow_lib_dir)
  end
end
.local_workflow_filename(wf_name) ⇒ Object
105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 |
# File 'lib/rbbt/workflow.rb', line 105
# Resolves a workflow name or path to the workflow file to require.
#
# Resolution rules:
# - For a Path: the resolved file itself (when it is a file, or when
#   "<name>.rb" exists), or <dir>/workflow.rb for a workflow directory.
# - For a String: the name itself when it (or "<name>.rb") exists, prefixed
#   with "./" if it has no "/". Otherwise workflow_dir[name]['workflow.rb'].
#
# When the result does not exist, the lookup is retried once with the
# snake_case form of the name (if different).
#
# @param wf_name [String, Path] workflow name or path
# @return [String, nil] candidate file name (may not exist)
def self.local_workflow_filename(wf_name)
  filename = nil

  if Path === wf_name
    found = wf_name.find
    case
    # Points to workflow file
    when ((File.exist?(found) and not File.directory?(found)) or File.exist?(found + '.rb'))
      filename = found

    # Points to workflow dir
    when (File.exist?(found) and File.directory?(found) and File.exist?(File.join(found, 'workflow.rb')))
      filename = wf_name['workflow.rb'].find
    end

  else
    if ((File.exist?(wf_name) and not File.directory?(wf_name)) or File.exist?(wf_name + '.rb'))
      # "+" rather than "<<": appending to the "./" literal would raise
      # FrozenError under frozen string literals.
      filename = (wf_name =~ /\.?\//) ? wf_name : "./" + wf_name
    else
      filename = workflow_dir[wf_name]['workflow.rb'].find
    end
  end

  if filename.nil? or not File.exist?(filename)
    wf_name_snake = Misc.snake_case(wf_name)
    return local_workflow_filename(wf_name_snake) if wf_name_snake != wf_name
  end

  filename
end
.parse_workflow_doc(doc) ⇒ Object
29 30 31 32 33 34 35 |
# File 'lib/rbbt/workflow/doc.rb', line 29
# Parses a workflow README-style document into its parts.
#
# Layout read:
# - title line
# - blank line
# - description
# - "# Tasks" (case-insensitive)
# - general task text
# - one "## <task>" section per task
#
# Note: +doc+ is mutated in place (doc_parse_first_line strips the title from it).
#
# @param doc [String] mutable document text
# @return [Hash] :title, :description, :task_description (Strings) and
#   :tasks (Hash of task name => description)
def self.parse_workflow_doc(doc)
  title = doc_parse_first_line doc
  description, task_info = doc_parse_up_to doc, /^# Tasks/i
  task_description, tasks = doc_parse_up_to task_info, /^##/, true
  tasks = doc_parse_chunks tasks, /## (.*)/
  {:title => title.strip, :description => description.strip, :task_description => task_description.strip, :tasks => tasks}
end
.process_remote_tasks(remote_tasks) ⇒ Object
696 697 698 699 700 701 702 703 704 705 706 707 708 709 710 |
# File 'lib/rbbt/workflow.rb', line 696
# Registers remote tasks.
#
# +remote_tasks+ maps workflow name => { remote URL => [task names] }. Each
# workflow is required, and each listed task is mapped (by Symbol) to a
# RemoteWorkflow for that URL in the workflow's +remote_tasks+ hash.
#
# @param remote_tasks [Hash] configuration as described above
def self.process_remote_tasks(remote_tasks)
  require 'rbbt/workflow/remote_workflow'
  remote_tasks.each do |workflow, info|
    wf = Workflow.require_workflow workflow
    wf.remote_tasks ||= {}
    IndiferentHash.setup wf.remote_tasks
    info.each do |remote, tasks|
      remote_wf = RemoteWorkflow.new remote, workflow
      tasks.each do |task|
        Log.debug "Add remote task #{task} in #{wf} using #{remote_wf.url}"
        wf.remote_tasks[task.to_sym] = remote_wf
      end
    end
  end
end
.relocate(real, other) ⇒ Object
519 520 521 522 523 524 525 526 |
# File 'lib/rbbt/workflow.rb', line 519
# Re-roots a job path recorded elsewhere under the job directory of +real+.
#
# The last three components of +other+ (workflow/task/name) are appended to
# +real+ minus its own last three components.
#
# @param real [String] path of an existing job
# @param other [String] recorded path of a related job
# @return [String] the re-rooted path if it (or its ".info") exists; otherwise
#   the same trailing part resolved under Rbbt.var.jobs
def self.relocate(real, other)
  preal = real.split(/\/+/)
  pother = other.split(/\/+/)
  end_part = pother[-3..-1] * "/"
  new_path = preal[0..-4] * "/" << "/" << end_part
  # File.exist? replaces File.exists?, which was removed in Ruby 3.2.
  return new_path if File.exist?(new_path) || File.exist?(new_path + '.info')
  Rbbt.var.jobs[end_part].find
end
.relocate_array(real, list) ⇒ Object
504 505 506 507 508 509 510 511 512 513 514 515 516 517 |
# File 'lib/rbbt/workflow.rb', line 504
# Re-roots each path of +list+ under the job directory of +real+ (see relocate).
#
# Unlike relocate, only the path itself is checked (not "<path>.info").
#
# @param real [String] path of an existing job
# @param list [Array<String>] recorded job paths
# @return [Array<String>] re-rooted paths (Rbbt.var.jobs fallback when missing)
def self.relocate_array(real, list)
  preal = real.split(/\/+/)
  prefix = preal[0..-4] * "/"
  list.collect do |other|
    pother = other.split(/\/+/)
    end_part = pother[-3..-1] * "/"
    new_path = prefix + "/" << end_part
    # File.exist? replaces File.exists?, which was removed in Ruby 3.2.
    if File.exist? new_path
      new_path
    else
      Rbbt.var.jobs[end_part].find
    end
  end
end
.relocate_dependency(main, dep) ⇒ Object
528 529 530 531 532 533 534 535 536 537 538 |
# File 'lib/rbbt/workflow.rb', line 528
# Returns +dep+ if its recorded path still exists; otherwise loads the step
# at the path relocated relative to +main+ (see relocate).
#
# The former local "relocated = true if ..." was never read and has been
# removed. NOTE(review): if the intent was to flag the loaded step as
# relocated, that still needs to be implemented.
#
# @param main [Step] the step depending on +dep+
# @param dep [Step] the dependency
# @return [Step] +dep+ or the relocated step
def self.relocate_dependency(main, dep)
  dep_path = dep.path
  path = main.path
  if Open.exists?(dep_path) || Open.exists?(dep_path + '.info')
    dep
  else
    new_path = relocate(path, dep_path)
    Workflow._load_step new_path
  end
end
.require_local_workflow(wf_name) ⇒ Object
135 136 137 138 139 140 141 142 143 |
# File 'lib/rbbt/workflow.rb', line 135
# Loads a workflow from the local file system.
#
# @param wf_name [String, Path] workflow name or path
# @return [true, false] the result of load_workflow_file, or false when no
#   existing workflow file was found
def self.require_local_workflow(wf_name)
  filename = local_workflow_filename(wf_name)
  return false unless filename && File.exist?(filename)

  load_workflow_file filename
end
.require_remote_workflow(wf_name, url) ⇒ Object
47 48 49 50 |
# File 'lib/rbbt/workflow.rb', line 47
# Defines constant +wf_name+ (possibly namespaced, "A::B") as a
# RemoteWorkflow pointing at +url+.
#
# Uses const_get/const_set instead of the previous string +eval+. That eval
# interpolated +wf_name+ and +url+ into Ruby source, so a crafted URL
# (require_workflow derives +wf_name+ from URLs) could inject code.
#
# @param wf_name [String, Symbol] constant name for the workflow
# @param url [String] remote workflow URL
# @return [RemoteWorkflow] the new remote workflow
def self.require_remote_workflow(wf_name, url)
  require 'rbbt/workflow/remote_workflow'
  *parents, leaf = wf_name.to_s.split("::")
  namespace = parents.inject(Object){|mod, name| mod.const_get(name) }
  namespace.const_set(leaf, RemoteWorkflow.new(url.to_s, wf_name.to_s))
end
.require_workflow(wf_name, force_local = false) ⇒ Object
145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 |
# File 'lib/rbbt/workflow.rb', line 145
# Loads a workflow by name, URL or path and returns it.
#
# Resolution order:
# 1. An already-defined constant named +wf_name+.
# 2. A remote workflow listed in Rbbt.etc.remote_workflows (unless
#    +force_local+).
# 3. A remote/SSH URL given directly.
# 4. "Parent::Sub" names: load Parent, then resolve the constant.
# 5. A local workflow file, optionally auto-installed when
#    Workflow.autoinstall is set.
#
# @param wf_name [String, Path] workflow name, URL or path
# @param force_local [Boolean] skip the remote_workflows configuration
# @return [Module, Object] the workflow (or the last registered workflow, or
#   true, when the constant cannot be resolved after loading)
# @raise [RuntimeError] when the workflow cannot be found or loaded
def self.require_workflow(wf_name, force_local=false)
  Workflow.init_remote_tasks
  # Already loaded
  begin
    workflow = Misc.string2const wf_name
    Log.debug{"Workflow #{ wf_name } already loaded"}
    return workflow
  rescue StandardError
    # Not defined yet; fall through to the loaders below. StandardError
    # (not Exception) so Interrupt/SystemExit are no longer swallowed.
  end

  # Load remotely
  if not force_local and Rbbt.etc.remote_workflows.exists?
    remote_workflows = Rbbt.etc.remote_workflows.yaml
    if Hash === remote_workflows and remote_workflows.include?(wf_name)
      url = remote_workflows[wf_name]
      begin
        return require_remote_workflow(wf_name, url)
      ensure
        Log.debug{"Workflow #{ wf_name } loaded remotely: #{ url }"}
      end
    end
  end

  if Open.remote?(wf_name) or Open.ssh?(wf_name)
    url = wf_name

    if Open.ssh?(wf_name)
      wf_name = File.basename(url.split(":").last)
    else
      wf_name = File.basename(url)
    end

    begin
      return require_remote_workflow(wf_name, url)
    ensure
      Log.debug{"Workflow #{ wf_name } loaded remotely: #{ url }"}
    end
  end

  # Load locally

  if wf_name =~ /::\w+$/
    clean_name = wf_name.sub(/::.*/,'')
    Log.info{"Looking for '#{wf_name}' in '#{clean_name}'"}
    require_workflow clean_name
    return Misc.string2const Misc.camel_case(wf_name)
  end

  Log.high{"Loading workflow #{wf_name}"}
  # NOTE(review): wf_name is interpolated into a shell command here; it should
  # not come from untrusted input. Backticks return the command output (always
  # truthy), so the install result itself is not checked.
  require_local_workflow(wf_name) or
    (Workflow.autoinstall and `rbbt workflow install #{Misc.snake_case(wf_name)} || rbbt workflow install #{wf_name}` and require_local_workflow(wf_name)) or
    raise("Workflow not found or could not be loaded: #{ wf_name }")
  begin
    Misc.string2const Misc.camel_case(wf_name)
  rescue
    Workflow.workflows.last || true
  end
end
.resolve_locals(inputs) ⇒ Object
311 312 313 314 315 316 317 318 319 320 321 322 |
# File 'lib/rbbt/workflow.rb', line 311
# Replaces "local:<task>:<jobname>" input values, in place, with the loaded
# result of that job.
#
# Recognized forms:
# - a String
# - a one-element Array
# - a one-entry TSV whose key has that form
#
# NOTE(review): load_id is listed as an instance method (#load_id). Calling it
# from this module-level method will raise NoMethodError unless a module-level
# load_id also exists. Confirm against the rest of workflow.rb.
#
# @param inputs [Hash] input name => value; modified in place
def self.resolve_locals(inputs)
  inputs.each do |name, value|
    if (String === value and value =~ /^local:(.*?):(.*)/) or
        (Array === value and value.length == 1 and value.first =~ /^local:(.*?):(.*)/) or
        (TSV === value and value.size == 1 and value.keys.first =~ /^local:(.*?):(.*)/)
      task_name = $1
      jobname = $2
      value = load_id(File.join(task_name, jobname)).load
      inputs[name] = value
    end
  end
end
.trace(seed_jobs, options = {}) ⇒ Object
4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 |
# File 'lib/rbbt/workflow/util/trace.rb', line 4
# Builds timing statistics (and optionally a Gantt plot) for a set of jobs.
#
# All +seed_jobs+ and their recursive dependencies that have a :done time are
# considered.
#
# Options:
# - :fix_gap   — shift Start/End seconds left to remove idle gaps between
#                collapsed time ranges
# - :plot      — output file for an R/ggplot2 Gantt chart, sized :width x
#                :height (:svg is forwarded as the second argument of data.R)
# - :plot_data — return the per-job TSV instead of the per-task stats
#
# @param seed_jobs [Array<Step>] jobs to trace
# @param options [Hash] see above
# @return [TSV] per-task stats (Calls, Avg. Time, Total Time, Cpus, Spark,
#   Shard), or per-job timing data when options[:plot_data]
# @raise [RuntimeError] "No jobs to process" when no job has finished
def self.trace(seed_jobs, options = {})
  jobs = []
  seed_jobs.each{|j| jobs << j; jobs += j.rec_dependencies}

  data = TSV.setup({}, "Job~Workflow,Task,Start,End#:type=:list")
  min_start = nil
  max_done = nil
  jobs.each do |job|
    next unless job.info[:done]
    started = job.info[:started]
    ddone = job.info[:done]

    code = [job.workflow, job.task_name].compact.collect{|s| s.to_s} * "."
    code = code + '.' + job.name

    data[code] = [job.workflow.to_s, job.task_name, started, ddone]
    if min_start.nil?
      min_start = started
    else
      min_start = started if started < min_start
    end

    if max_done.nil?
      max_done = ddone
    else
      max_done = ddone if ddone > max_done
    end
  end

  data.add_field "Start.second" do |k,value|
    value["Start"] - min_start
  end

  data.add_field "End.second" do |k,value|
    value["End"] - min_start
  end

  if options[:fix_gap]
    ranges = []
    data.through do |k,values|
      start, eend = values.values_at "Start.second", "End.second"

      ranges << (start..eend)
    end

    # gaps: end of a busy period => length of the idle time that follows it
    gaps = {}
    last = nil
    Misc.collapse_ranges(ranges).each do |range|
      start = range.begin
      eend = range.end
      if last
        gaps[last] = start - last
      end
      last = eend
    end

    # End.second is processed first: both passes measure the gap against the
    # still-unshifted Start.second.
    data.process "End.second" do |value,k,values|
      gap = Misc.sum(gaps.select{|pos,size| pos < values["Start.second"]}.collect{|pos,size| size})
      value - gap
    end

    data.process "Start.second" do |value,k,values|
      gap = Misc.sum(gaps.select{|pos,size| pos < values["Start.second"]}.collect{|pos,size| size})
      value - gap
    end
  end

  tasks_info = {}

  jobs.each do |dep|
    next unless dep.info[:done]
    task = [dep.workflow, dep.task_name].compact.collect{|s| s.to_s} * "#"
    info = tasks_info[task] ||= {}

    time = dep.info[:done] - dep.info[:started]
    info[:time] ||= []
    info[:time] << time

    cpus = nil
    spark = false
    shard = false
    # Jobs without recorded config keys previously crashed on nil.
    (dep.info[:config_keys] || []).each do |kinfo|
      key, value, _tokens = kinfo
      key = key.to_s
      cpus = value if key.include? 'cpu'
      spark = value if key == 'spark'
      shard = value if key == 'shard'
    end

    info[:cpus] = cpus || 1
    info[:spark] = spark
    info[:shard] = shard
  end

  stats = TSV.setup({}, "Task~Calls,Avg. Time,Total Time,Cpus,Spark,Shard#:type=:list")

  tasks_info.each do |task, info|
    time_lists, cpus, spark, shard = info.values_at :time, :cpus, :spark, :shard
    avg_time = Misc.mean(time_lists)
    total_time = Misc.sum(time_lists)
    calls = time_lists.length
    stats[task] = [calls, avg_time, total_time, cpus, spark, shard]
  end

  raise "No jobs to process" if data.size == 0

  start = data.column("Start.second").values.flatten.collect{|v| v.to_f}.min
  eend = data.column("End.second").values.flatten.collect{|v| v.to_f}.max
  total = eend - start
  Log.info "Total time elapsed: #{total} seconds"

  if options[:fix_gap]
    total_gaps = Misc.sum(gaps.collect{|k,v| v})
    Log.info "Total gaps: #{total_gaps} seconds"
  end

  plot, width, height = options.values_at :plot, :width, :height
  if plot
    # NOTE(review): the scraped source showed only "[:svg]" here; "options"
    # was restored as for the other dropped option lookups. Confirm upstream.
    data.R <<-EOF, options[:svg]
      rbbt.require('tidyverse')
      rbbt.require('ggplot2')

      names(data) <- make.names(names(data))
      data$id = rownames(data)
      data$content = data$Task
      data$start = data$Start
      data$end = data$End
      data$Project = data$Workflow

      tasks = data

      #theme_gantt <- function(base_size=11, base_family="Source Sans Pro Light") {
      theme_gantt <- function(base_size=11, base_family="Sans Serif") {
        ret <- theme_bw(base_size, base_family) %+replace%
          theme(panel.background = element_rect(fill="#ffffff", colour=NA),
                axis.title.x=element_text(vjust=-0.2), axis.title.y=element_text(vjust=1.5),
                title=element_text(vjust=1.2, family="Source Sans Pro Semibold"),
                panel.border = element_blank(), axis.line=element_blank(),
                panel.grid.minor=element_blank(),
                panel.grid.major.y = element_blank(),
                panel.grid.major.x = element_line(size=0.5, colour="grey80"),
                axis.ticks=element_blank(),
                legend.position="bottom",
                axis.title=element_text(size=rel(1.2), family="Source Sans Pro Semibold"),
                strip.text=element_text(size=rel(1.5), family="Source Sans Pro Semibold"),
                strip.background=element_rect(fill="#ffffff", colour=NA),
                panel.spacing.y=unit(1.5, "lines"),
                legend.key = element_blank())

        ret
      }

      tasks.long <- tasks %>%
        gather(date.type, task.date, -c(Project, Task, id, Start.second, End.second)) %>%
        arrange(date.type, task.date) %>%
        mutate(id = factor(id, levels=rev(unique(id)), ordered=TRUE))

      x.breaks <- seq(length(tasks$Task) + 0.5 - 3, 0, by=-3)

      timeline <- ggplot(tasks.long, aes(y=id, yend=id, x=Start.second, xend=End.second, colour=Task)) +
        geom_segment() +
        geom_vline(xintercept=x.breaks, colour="grey80", linetype="dotted") +
        guides(colour=guide_legend(title=NULL)) +
        labs(x=NULL, y=NULL) +
        theme_gantt() +
        theme(axis.text.x=element_text(angle=45, hjust=1))

      rbbt.png_plot('#{plot}', 'plot(timeline)', width=#{width}, height=#{height}, pointsize=6)
    EOF
  end

  if options[:plot_data]
    data
  else
    stats
  end
end
.transplant(listed, real, other) ⇒ Object
484 485 486 487 488 489 490 491 492 493 494 495 496 497 498 499 500 501 502 |
# File 'lib/rbbt/workflow.rb', line 484
# Maps +other+ onto the location of +real+, given that +listed+ is the
# recorded counterpart of +real+.
#
# The common leading components of +listed+ and +other+ are dropped. The
# remaining +listed+ components are removed from +real+, and the remaining
# +other+ components are appended.
#
# When +listed+ is nil it defaults to +other+'s prefix joined with +real+'s
# last three components.
#
# @param listed [String, nil] recorded path corresponding to +real+
# @param real [String] actual path
# @param other [String] recorded path to transplant
# @return [String] the transplanted path
def self.transplant(listed, real, other)
  if listed.nil?
    parts = real.split("/")
    other_parts = other.split("/")
    listed = (other_parts[0..-4] + parts[-3..-1]) * "/"
  end

  sl = listed.split("/", -1)
  so = other.split("/", -1)
  sr = real.split("/", -1)
  # Stop when either side runs out. The old loop compared nil == nil forever
  # when +listed+ and +other+ were identical.
  while !sl.empty? && !so.empty? && sl.first == so.first
    sl.shift
    so.shift
  end
  File.join(sr - sl + so)
end
.workdir ⇒ Object
219 220 221 222 223 224 225 |
# File 'lib/rbbt/workflow.rb', line 219
# Default job directory shared by all workflows (@@workdir).
#
# Lazily initialized to Rbbt.var.jobs when Rbbt is defined, otherwise to a
# relative Path 'var/jobs'.
#
# @return [Path]
def self.workdir
  @@workdir ||= if defined? Rbbt
                  Rbbt.var.jobs
                else
                  Path.setup('var/jobs')
                end
end
.workdir=(path) ⇒ Object
{{{ ATTR DEFAULTS
214 215 216 217 |
# File 'lib/rbbt/workflow.rb', line 214
# Sets the shared default job directory (@@workdir).
#
# A non-Path value is duplicated and wrapped with Path.setup.
#
# @param path [String, Path] new job directory
# @return [Path] the stored path
def self.workdir=(path)
  path = Path.setup path.dup unless Path === path
  @@workdir = path
end
.workflow_for(path) ⇒ Object
475 476 477 478 479 480 481 |
# File 'lib/rbbt/workflow/accessor.rb', line 475
# Resolves the workflow constant from a job id of the form
# "<Workflow>/<task>/<name>".
#
# The constant named by the path with its last two components removed is
# looked up. Any error (e.g. an invalid constant name) yields nil.
#
# NOTE(review): for absolute paths the remaining prefix contains "/", so this
# always returns nil. Confirm whether callers only pass relative ids.
#
# @param path [String] job id or path
# @return [Module, nil]
def self.workflow_for(path)
  begin
    Kernel.const_get File.dirname(File.dirname(path))
  rescue
    nil
  end
end
Instance Method Details
#__job(taskname, jobname = nil, inputs = {}) ⇒ Object
337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 |
# File 'lib/rbbt/workflow.rb', line 337
# Builds (without caching) the job step for +taskname+.
#
# Steps:
# 1. Tasks listed in +remote_tasks+ are delegated to the remote workflow.
# 2. Otherwise, "local:" input values are resolved and required inputs are
#    checked.
# 3. The job name is derived from a :jobname input option when none is given.
# 4. Dependencies are computed and the step path is built.
#
# Inputs that equal their defaults (including String/Symbol equivalents) are
# not considered "real" inputs. When there are no real inputs, TAG is not
# :inputs, and no dependency is overriden, the step path ignores input values.
#
# @param taskname [String, Symbol] task name
# @param jobname [String, nil] job name (DEFAULT_NAME when nil or empty)
# @param inputs [Hash] input values (extended in place with IndiferentHash)
# @return [Step] the job
# @raise [RuntimeError] when the task is unknown
# @raise [ParameterException] when required inputs are missing or nil
def __job(taskname, jobname = nil, inputs = {})
  taskname = taskname.to_sym
  return remote_tasks[taskname].job(taskname, jobname, inputs) if remote_tasks and remote_tasks.include? taskname

  task = tasks[taskname]
  raise "Task not found: #{ taskname }" if task.nil?

  inputs = IndiferentHash.setup(inputs)

  Workflow.resolve_locals(inputs)

  task_info = task_info(taskname)
  task_inputs = task_info[:inputs]
  #defaults = IndiferentHash.setup(task_info[:input_defaults]).merge(task.input_defaults)
  all_defaults = IndiferentHash.setup(task_info[:input_defaults])
  defaults = IndiferentHash.setup(task.input_defaults)

  missing_inputs = []
  task.required_inputs.each do |input|
    missing_inputs << input if inputs[input].nil?
  end if task.required_inputs

  if missing_inputs.length == 1
    raise ParameterException, "Input #{missing_inputs.first} is required but was not provided or is nil"
  end

  if missing_inputs.length > 1
    raise ParameterException, "Inputs #{Misc.humanize_list(missing_inputs)} are required but were not provided or are nil"
  end

  # jobname => true sets the value of the input to the name of the job
  if task.input_options
    jobname_input = task.input_options.select{|i,o| o[:jobname]}.collect{|i,o| i }.first
  else
    jobname_input = nil
  end

  if jobname_input && jobname && inputs[jobname_input].nil?
    inputs[jobname_input] = jobname
  end

  real_inputs = {}
  # NOTE: never set to true; the assignment below is commented out upstream.
  has_overriden_inputs = false

  inputs.each do |k,v|
    #has_overriden_inputs = true if String === k and k.include? "#"
    next unless (task_inputs.include?(k.to_sym) or task_inputs.include?(k.to_s))
    default = all_defaults[k]
    next if default == v
    next if (String === default and Symbol === v and v.to_s == default)
    next if (Symbol === default and String === v and v == default.to_s)
    real_inputs[k.to_sym] = v
  end

  jobname_input_value = inputs[jobname_input] || all_defaults[jobname_input]
  if jobname_input && jobname.nil? && String === jobname_input_value && ! jobname_input_value.include?('/')
    jobname = jobname_input_value
  end

  jobname = DEFAULT_NAME if jobname.nil? or jobname.empty?

  dependencies = real_dependencies(task, jobname, defaults.merge(inputs), task_dependencies[taskname] || [])

  overriden = has_overriden_inputs || dependencies.any?{|dep| dep.overriden }

  # The local is named job_path so it no longer shadows the step_path method.
  if real_inputs.empty? && Workflow::TAG != :inputs && ! overriden
    job_path = step_path taskname, jobname, [], [], task.extension
    input_values = task.take_input_values(inputs)
  else
    input_values = task.take_input_values(inputs)
    job_path = step_path taskname, jobname, input_values, dependencies, task.extension
  end

  job = get_job_step job_path, task, input_values, dependencies
  job.workflow = self
  job.clean_name = jobname
  job.overriden = overriden
  job.real_inputs = real_inputs.keys
  job
end
#_job(taskname, jobname = nil, inputs = {}) ⇒ Object
417 418 419 420 421 422 423 424 425 426 427 428 |
# File 'lib/rbbt/workflow.rb', line 417
# Memoized wrapper around __job.
#
# The Step is cached in +step_cache+ under the workflow, task, job name and
# the values of the task inputs plus any "workflow#task" override inputs.
#
# @param taskname [Symbol, String]
# @param jobname [String, nil]
# @param inputs [Hash]
# @return [Step]
def _job(taskname, jobname = nil, inputs = {})
  # Read values through an indifferent copy. Task input names are symbols, so
  # a plain string-keyed hash would yield nils here, and different jobs could
  # share one cache key.
  _inputs = IndiferentHash.setup(inputs.dup)
  task_info = task_info(taskname)
  task_inputs = task_info[:inputs]
  persist_inputs = _inputs.values_at(*task_inputs)
  persist_inputs += _inputs.values_at(*_inputs.keys.select{|k| String === k && k.include?("#") }.sort)
  Persist.memory("STEP", :workflow => self.to_s, :taskname => taskname, :jobname => jobname, :inputs => persist_inputs, :repo => step_cache) do
    __job(taskname, jobname, inputs)
  end
end
#_prov_tasks(tree) ⇒ Object
96 97 98 99 100 101 102 103 104 105 106 107 |
# File 'lib/rbbt/workflow/usage.rb', line 96
# Flatten a nested dependency tree (hash of key => subtree) into the list of
# its keys. Subtrees are processed last-in first-out, so sibling keys are
# listed before the keys of their subtrees.
def _prov_tasks(tree)
  pending = [tree]
  collected = []
  while pending.any?
    pending.pop.each_pair do |key, subtree|
      collected << key
      pending << subtree
    end
  end
  collected
end
#add_remote_tasks(remote_tasks) ⇒ Object
688 689 690 691 692 693 694 |
# File 'lib/rbbt/workflow.rb', line 688
# Register tasks that are provided by remote workflows.
#
# @param remote_tasks [Hash] remote workflow => list of task names it serves
def add_remote_tasks(remote_tasks)
  remote_tasks.each do |remote, tasks|
    tasks.each do |task|
      # Keys must be symbols: __job converts task names with to_sym before
      # checking remote_tasks. The previous to_f raised on Symbols and mapped
      # every String to 0.0.
      self.remote_tasks[task.to_sym] = remote
    end
  end
end
#all_exports ⇒ Object
304 305 306 |
# File 'lib/rbbt/workflow.rb', line 304
# All exported task names (asynchronous, synchronous, exec, then stream),
# memoized on first call.
def all_exports
  @all_exports ||= [asynchronous_exports, synchronous_exports, exec_exports, stream_exports].inject(:+)
end
#assign_dep_inputs(_inputs, options, all_d, task_info) ⇒ Object
252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 |
# File 'lib/rbbt/workflow/accessor.rb', line 252
# Apply a dependency's declared options to the inputs for that dependency.
#
# For every option except +compute+:
# * Symbol value naming an already-resolved dependency task: the input
#   becomes that dependency Step;
# * other Symbol value: the input takes the value of the input with that
#   name, or the Symbol itself when no such input exists. Neither case
#   overwrites an input that is already set;
# * any other value is assigned directly.
#
# @param _inputs [Hash] inputs for the dependency (made indifferent, mutated)
# @param options [Hash, nil] options declared on the dependency
# @param all_d [Array] dependencies resolved so far
# @param task_info [Hash, nil] task_info of the dependency task
# @return [Hash] the updated +_inputs+
def assign_dep_inputs(_inputs, options, all_d, task_info)
  IndiferentHash.setup(_inputs)

  options.each{|i,v|
    next if i == :compute or i == "compute"
    case v
    when :compute
      # marker value only; nothing to assign
    when Symbol
      rec_dependency = all_d.flatten.select{|d| d.task_name.to_sym == v }.first

      if rec_dependency.nil?
        if _inputs.include?(v)
          _inputs[i] = _inputs[v] unless _inputs.include? i
        else
          _inputs[i] = v unless _inputs.include? i
        end
      else
        # task_info may be nil (real_dependencies passes nil when the block
        # result names no task)
        input_options = (task_info && task_info[:input_options] && task_info[:input_options][i]) || {}

        #ToDo why was this always true?
        if input_options[:stream] or true
          _inputs[i] = rec_dependency
        else
          rec_dependency.abort if rec_dependency.streaming? and not rec_dependency.running?
          rec_dependency.clean if rec_dependency.error? or rec_dependency.aborted?
          if rec_dependency.streaming? and rec_dependency.running?
            _inputs[i] = rec_dependency.join.load
          else
            rec_dependency.run(true)
            rec_dependency.join
            _inputs[i] = rec_dependency.load
          end
        end
      end
    else
      _inputs[i] = v
    end
  } if options

  _inputs
end
#dep(*dependency, &block) ⇒ Object
45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 |
# File 'lib/rbbt/workflow/definition.rb', line 45
# Declare a dependency for the task being defined.
#
# * With a block: the block is stored as the dependency. When positional
#   arguments are given too, they are parsed into [workflow, task_name,
#   options] (workflow defaults to self) and attached with
#   DependencyBlock.setup.
# * Without a block or arguments: the last defined task is used.
# * Without a block: if the first element is a workflow (Module or
#   RemoteWorkflow) or the last is an options Hash, the list is stored as one
#   dependency, prefixed with self when no workflow was given. Otherwise each
#   element is stored as a separate dependency.
def dep(*dependency, &block)
  @dependencies ||= []
  dependency = [tasks.keys.last] if dependency.empty? && ! block_given?
  if block_given?
    if dependency.any?
      wf, task_name, options = dependency
      options, task_name = task_name, nil if Hash === task_name
      options, wf = wf, nil if Hash === wf
      task_name, wf = wf, self if task_name.nil?

      DependencyBlock.setup block, [wf, task_name, options]
    end
    @dependencies << block
  else
    names_workflow = Module === dependency.first || (defined?(RemoteWorkflow) && RemoteWorkflow === dependency.first)
    if names_workflow or Hash === dependency.last
      dependency = ([self] + dependency) unless names_workflow
      @dependencies << dependency
    else
      @dependencies.concat dependency
    end
  end
end
#dep_task(name, workflow, oname, *rest, &block) ⇒ Object
74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 |
# File 'lib/rbbt/workflow/definition.rb', line 74 def dep_task(name, workflow, oname, *rest, &block) dep(workflow, oname, *rest, &block) extension workflow.tasks[oname].extension if workflow.tasks.include?(oname) unless @extension task name do raise RbbtException, "dependency not found in dep_task" if dependencies.empty? dep = dependencies.last.join raise dep.get_exception if dep.error? set_info :result_type, dep.info[:result_type] forget = config :forget_dep_tasks, "forget_dep_tasks", "key:forget_dep_tasks", :default => FORGET_DEP_TASKS if forget remove = config :remove_dep_tasks, "remove_dep_tasks", "key:remove_dep_tasks", :default => REMOVE_DEP_TASKS self.archive_deps self.copy_files_dir self.dependencies = self.dependencies - [dep] Open.rm_rf self.files_dir if Open.exist? self.files_dir FileUtils.cp_r dep.files_dir, self.files_dir if Open.exist?(dep.files_dir) Open.ln_h dep.path, self.tmp_path case remove.to_s when 'true' dep.clean when 'recursive' dep.rec_dependencies.each do |d| d.clean unless config(:remove_dep, d.task_signature, d.task_name, d.workflow.to_s, :default => true).to_s == 'false' end dep.clean unless config(:remove_dep, dep.task_signature, dep.task_name, dep.workflow.to_s, :default => true).to_s == 'false' end else if Open.exists?(dep.files_dir) Open.rm_rf self.files_dir Open.link dep.files_dir, self.files_dir end Open.link dep.path, self.path end nil end end |
#dep_tree(name) ⇒ Object
70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 |
# File 'lib/rbbt/workflow/usage.rb', line 70
# Memoized nested hash describing the dependencies of task +name+.
# Keys are [workflow, task] pairs and values are those tasks' own trees.
# DependencyBlocks are described by their declared dependency. Entries that
# are neither Arrays nor task names are skipped.
def dep_tree(name)
  @dep_tree ||= {}
  @dep_tree[name] ||= begin
    tree = {}
    declared = name && self.task_dependencies[name.to_sym]
    if declared
      declared.reverse_each do |entry|
        entry = entry.first if Array === entry && entry.length == 1
        entry = entry.dependency if DependencyBlock === entry

        case entry
        when Array
          child_workflow, child_task = entry.values_at(0, 1)
        when Symbol, String
          child_workflow, child_task = self, entry
        else
          next
        end

        tree[[child_workflow, child_task]] = child_workflow.dep_tree(child_task)
      end
    end
    tree
  end
end
#desc(description) ⇒ Object
29 30 31 |
# File 'lib/rbbt/workflow/definition.rb', line 29
# Store +text+ as the current description (@description).
# @return [Object] the stored value
def desc(text)
  @description = text
end
#doc(task = nil, abridge = false) ⇒ Object
157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 |
# File 'lib/rbbt/workflow/usage.rb', line 157
# Print documentation to stdout.
#
# Without +task+: the workflow title, description and the list of tasks with
# their dependency chains. With +abridge+, only tasks that no other task
# depends on are listed.
# With +task+ (a Task or task name): the task's own doc, an abridged
# dependency graph and any stored examples.
def doc(task = nil, abridge = false)

  if task.nil?
    puts Log.color :magenta, self.to_s
    puts Log.color :magenta, "=" * self.to_s.length

    if self.documentation[:title] and not self.documentation[:title].empty?
      puts
      puts Misc.format_paragraph self.documentation[:title]
    end

    if self.documentation[:description] and not self.documentation[:description].empty?
      puts
      puts Misc.format_paragraph self.documentation[:description]
    end
    puts

    puts Log.color :magenta, "## TASKS"
    if self.documentation[:task_description] and not self.documentation[:task_description].empty?
      puts
      puts Misc.format_paragraph self.documentation[:task_description]
    end
    puts

    # "final" tasks are those no other task depends on
    final = Set.new
    not_final = Set.new
    tasks.each do |name, _task|
      tree = dep_tree(name)
      not_final += tree.keys
      final << name unless not_final.include?(name)
    end

    not_final.each do |p|
      final -= [p.last]
    end

    tasks.each do |name, task|
      description = task.description || ""
      description = description.split("\n\n").first
      next if abridge && ! final.include?(name)
      puts Misc.format_definition_list_item(name.to_s, description, Log.terminal_width, 20, :yellow)

      prov_string = prov_string(dep_tree(name))
      puts Log.color :blue, " ->" + prov_string if prov_string && ! prov_string.empty?
    end

  else

    if Task === task
      task_name = task.name
    else
      task_name = task
      task = self.tasks[task_name]
    end

    task.doc(self, self.rec_dependencies(task_name))

    dep_tree = {[self, task_name] => dep_tree(task_name)}
    prov_tree = prov_tree(dep_tree)
    if prov_tree && ! prov_tree.empty?

      puts Log.color :magenta, "## DEPENDENCY GRAPH (abridged)"
      puts
      prov_tree.split("\n").each do |line|
        next if line.strip.empty?
        if m = line.match(/^( *)(\w+?)#(\w*)/i)
          # Use block-local names: assigning task_name here used to clobber
          # the documented task, so examples were looked up for the last
          # dependency printed instead.
          offset, line_workflow, line_task = m.values_at 1, 2, 3
          puts [offset, Log.color(:magenta, line_workflow), "#", Log.color(:yellow, line_task)] * ""
        else
          puts Log.color :blue, line
        end
      end
      puts
    end

    if self.examples.include? task_name
      self.examples[task_name].each do |example|

        puts Log.color(:magenta, "Example ") << Log.color(:green, example) + " -- " + Log.color(:blue, example_dir[task_name][example])

        inputs = self.example(task_name, example)

        inputs.each do |input, type, file|
          case type
          when :tsv, :array, :text, :file
            # show only the head of file-like inputs
            lines = file.read.split("\n")
            head = lines[0..5].compact * "\n\n"
            head = head[0..500]
            puts Misc.format_definition_list_item(input, head, 1000, -1, :blue).gsub(/\n\s*\n/,"\n")
            puts '...' if lines.length > 6
          else
            puts Misc.format_definition_list_item(input, file.read, Log.terminal_width, 20, :blue)
          end
        end
        puts
      end
    end
  end

end
#documentation_markdown ⇒ Object
37 38 39 40 41 42 43 44 45 |
# File 'lib/rbbt/workflow/doc.rb', line 37
# Contents of workflow.md in the workflow libdir. Returns "" when there is no
# libdir or the file does not exist.
def documentation_markdown
  return "" if @libdir.nil?
  markdown = @libdir['workflow.md'].find
  markdown.exists? ? markdown.read : ""
end
#example(task_name, example) ⇒ Object
21 22 23 24 25 26 |
# File 'lib/rbbt/workflow/examples.rb', line 21
# [input, type, file] triples for the inputs of +task_name+ that have a file
# in the directory of example +example+.
def example(task_name, example)
  input_types = task_info(task_name.to_sym)[:input_types]
  input_types.collect { |input, type|
    input_file = example_dir[task_name][example][input]
    [input, type, input_file.find] if input_file.exists?
  }.compact
end
#example_inputs(task_name, example) ⇒ Object
92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 |
# File 'lib/rbbt/workflow/examples.rb', line 92
# Indifferent input hash for an example.
# * tsv, array and text inputs point to their files;
# * boolean inputs are true when the file content is "true";
# * any other input is read and stripped.
def example_inputs(task_name, example)
  example(task_name, example).each_with_object(IndiferentHash.setup({})) do |(input, type, file), inputs|
    key = input.to_sym
    if [:tsv, :array, :text].include?(type)
      Log.debug "Pointing #{ input } to #{file}"
      inputs[key] = file
    elsif type == :boolean
      inputs[key] = (file.read.strip == 'true')
    else
      Log.debug "Loading #{ input } from #{file}"
      inputs[key] = file.read.strip
    end
  end
end
#example_step(task_name, example, new_inputs = {}) ⇒ Object
112 113 114 115 116 117 118 119 120 121 |
# File 'lib/rbbt/workflow/examples.rb', line 112
# Job for +task_name+, named after +example+, built from the example inputs
# with any +new_inputs+ taking precedence. A non-empty +new_inputs+ is made
# indifferent in place.
def example_step(task_name, example, new_inputs = {})
  inputs = example_inputs(task_name, example)
  if new_inputs && !new_inputs.empty?
    inputs = inputs.merge(IndiferentHash.setup(new_inputs))
  end
  self.job(task_name, example, inputs)
end
#examples ⇒ Object
8 9 10 11 12 13 14 15 16 17 18 19 |
# File 'lib/rbbt/workflow/examples.rb', line 8
# Indifferent map of task name => example names, read from the
# <task>/<example> entries under example_dir. Returns {} when the libdir has
# no examples directory.
def examples
  return {} unless self.libdir.examples.exists?
  found = {}
  example_dir.glob("*/*").each do |path|
    task_name = File.basename(File.dirname(path))
    (found[task_name] ||= []) << File.basename(path)
  end
  IndiferentHash.setup found
  found
end
#export_asynchronous(*names) ⇒ Object Also known as: export
183 184 185 186 187 188 |
# File 'lib/rbbt/workflow/definition.rb', line 183
# Export +names+ as asynchronous tasks. They are first removed from the other
# export lists. Returns the deduplicated asynchronous export list.
def export_asynchronous(*names)
  unexport(*names)
  exports = asynchronous_exports
  exports.concat(names)
  exports.uniq!
  exports
end
#export_exec(*names) ⇒ Object
169 170 171 172 173 174 |
# File 'lib/rbbt/workflow/definition.rb', line 169
# Export +names+ as exec tasks. They are first removed from the other export
# lists. Returns the deduplicated exec export list.
def export_exec(*names)
  unexport(*names)
  exports = exec_exports
  exports.concat(names)
  exports.uniq!
  exports
end
#export_stream(*names) ⇒ Object
190 191 192 193 194 195 |
# File 'lib/rbbt/workflow/definition.rb', line 190
# Export +names+ as stream tasks. They are first removed from the other
# export lists. Returns the deduplicated stream export list.
def export_stream(*names)
  unexport(*names)
  exports = stream_exports
  exports.concat(names)
  exports.uniq!
  exports
end
#export_synchronous(*names) ⇒ Object
176 177 178 179 180 181 |
# File 'lib/rbbt/workflow/definition.rb', line 176
# Export +names+ as synchronous tasks. They are first removed from the other
# export lists. Returns the deduplicated synchronous export list.
def export_synchronous(*names)
  unexport(*names)
  exports = synchronous_exports
  exports.concat(names)
  exports.uniq!
  exports
end
#extension(extension) ⇒ Object
33 34 35 |
# File 'lib/rbbt/workflow/definition.rb', line 33
# Store +ext+ as the current result-file extension (@extension).
# @return [Object] the stored value
def extension(ext)
  @extension = ext
end
#fast_load_id(id) ⇒ Object
631 632 633 634 635 636 637 638 639 640 |
# File 'lib/rbbt/workflow.rb', line 631
# Load the step identified by +id+ (a path relative to workdir) through
# Workflow.fast_load_step. Ids of remote tasks are loaded by their remote
# workflow.
def fast_load_id(id)
  path = Path === workdir ? workdir[id].find : File.join(workdir, id)
  task = task_for path
  if remote_tasks && remote_tasks.include?(task)
    remote_tasks[task].load_id(id)
  else
    Workflow.fast_load_step path
  end
end
#get_job_step(step_path, task = nil, input_values = nil, dependencies = nil) ⇒ Object
{{{ LOAD FROM FILE
456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 |
# File 'lib/rbbt/workflow.rb', line 456
# Instantiate the Step stored at +step_path+ and extend it with this
# workflow's helper module.
#
# @param step_path [String, Path, Proc] step location; a Proc is called to
#   obtain it
# @param task [Task, nil]
# @param input_values [Array, nil]
# @param dependencies [Array, nil] when nil, the dependencies recorded in
#   the step info are loaded via set_step_dependencies
# @return [Step]
def get_job_step(step_path, task = nil, input_values = nil, dependencies = nil)
  step_path = step_path.call if Proc === step_path

  # (unused `persist`/`key` locals removed; `key` also forced a Path#find)
  step = Step.new step_path, task, input_values, dependencies

  set_step_dependencies(step) unless dependencies

  step.extend step_module

  step.task ||= task
  step.inputs ||= input_values
  step.dependencies = dependencies if dependencies and (step.dependencies.nil? or step.dependencies.length < dependencies.length)

  step
end
#get_SOPT(task) ⇒ Object
271 272 273 274 |
# File 'lib/rbbt/workflow/usage.rb', line 271
# Parse the command-line options for +task+ with SOPT, using the option
# string produced by SOPT_str.
def get_SOPT(task)
  SOPT.get(self.SOPT_str(task))
end
#helper(name, &block) ⇒ Object
25 26 27 |
# File 'lib/rbbt/workflow/definition.rb', line 25
# Register +definition+ as helper +name+. Helpers become methods of
# step_module. Returns the block.
def helper(name, &definition)
  helpers[name] = definition
end
#id_for(path) ⇒ Object
466 467 468 469 470 471 472 473 |
# File 'lib/rbbt/workflow/accessor.rb', line 466
# Step id for +path+: the path relative to the (resolved) workdir.
def id_for(path)
  base = workdir.respond_to?(:find) ? workdir.find : workdir
  Misc.path_relative_to base, path
end
#import(source, *args) ⇒ Object
199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 |
# File 'lib/rbbt/workflow/definition.rb', line 199
# Copy tasks (with their dependencies and descriptions) and helpers from
# +source+. With no +args+ every task and helper of +source+ is imported;
# otherwise only the named ones. Existing tasks with the same name are
# replaced and a message is logged.
def import(source, *args)
  names = if args.empty?
            source.tasks.keys + source.helpers.keys
          else
            args.flatten
          end

  names.each do |name|
    key = name.to_sym
    Log.high "Task #{name} from #{source.to_s} is already present in #{self.to_s} and will be cloacked" if self.tasks.include? key
    if source.tasks.include? key
      self.tasks[key] = source.tasks[key]
      self.task_dependencies[key] = source.task_dependencies[key]
      self.task_description[key] = source.task_description[key]
    end
    self.helpers[key] = source.helpers[key] if source.helpers.include? key
  end
end
#import_task(workflow, orig, new) ⇒ Object
227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 |
# File 'lib/rbbt/workflow.rb', line 227
# Copy task +orig+ from +workflow+ into this workflow under the name +new+.
# Singleton attributes of the original task that have a writer (foo/foo=)
# are collected and passed to Task.setup for the copy.
def import_task(workflow, orig, new)
  orig_task = workflow.tasks[orig]
  new_task = orig_task.dup
  options = {}
  orig_task.singleton_methods.
    reject{|method| method.to_s.end_with?("=") }.each{|method|
      if orig_task.respond_to?(method.to_s + "=")
        options[method.to_s] = orig_task.send(method.to_s)
      end
    }

  Task.setup(options, &new_task)
  new_task.workflow = self
  new_task.name = new
  tasks[new] = new_task
  task_dependencies[new] = workflow.task_dependencies[orig]
  task_description[new] = workflow.task_description[orig]
end
#job(taskname, jobname = nil, inputs = {}) ⇒ Object
430 431 432 433 434 435 436 |
# File 'lib/rbbt/workflow.rb', line 430
# Public entry point for building a job. The per-call step cache is always
# cleared afterwards, even when building fails.
def job(taskname, jobname = nil, inputs = {})
  _job(taskname, jobname, inputs)
ensure
  step_cache.clear
end
#jobs(taskname, query = nil) ⇒ Object
}}} LOAD FROM FILE
651 652 653 654 655 656 657 658 659 |
# File 'lib/rbbt/workflow.rb', line 651
# Names of the jobs of +taskname+ found in the workdir (via their info
# files), optionally limited to those whose relative path starts with
# +query+.
def jobs(taskname, query = nil)
  task_dir = File.join(File.expand_path(workdir.find), taskname.to_s)
  pattern = File.join(File.expand_path(task_dir), '**/*')
  job_info_files = Dir.glob(Step.info_file(pattern)).collect{|f| Misc.path_relative_to task_dir, f }
  job_info_files = job_info_files.select{|f| f.index(query) == 0 } if query
  # tasks is keyed by Symbol; callers may pass a String (see taskname.to_s)
  extension = tasks[taskname.to_sym].extension
  job_info_files.collect{|f| Step.job_name_for_info_file(f, extension) }
end
#load_cromwell(file) ⇒ Object
18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 |
# File 'lib/rbbt/workflow/integration/cromwell.rb', line 18
# Define one task per workflow in the WDL +file+. Inputs come from
# `wdltool inputs`. Each task runs the WDL with Cromwell and returns the
# stdout of the first call.
def load_cromwell(file)
  jar = Rbbt.software.opt.jar["wdltool.jar"].produce.find
  inputs = JSON.load(CMD.cmd("java -jar '#{jar}' inputs '#{file}'"))

  # group inputs by workflow; keys look like workflow.task[.input]
  workflow_inputs = {}
  inputs.each do |input,input_type|
    workflow, task, input_name = input.split(".")
    workflow_inputs[workflow] ||= {}
    if input_name.nil?
      input_name = task
    else
      input_name = [task, input_name] * "."
    end
    workflow_inputs[workflow][input_name] = input_type
  end

  workflow_inputs.each do |workflow,input_list|
    input_list.each do |input_name,input_type|

      input_type = input_type.split(" ").last.sub('?','')
      input_type_fixed = case input_type
                         when "File", "file"
                           :file
                         when "Int"
                           :integer
                         when /Array/
                           :array
                         else
                           input_type.downcase.to_sym
                         end

      desc = [workflow, input_name] * "."
      default = nil
      input input_name, input_type_fixed, desc, default, :nofile => true
    end

    task workflow => :string do |*args|
      cromwell = file
      options = {}
      Misc.in_dir(self.files_dir) do

        options["metadata-output"] = file('metadata.json')
        options["inputs"] = file('inputs')

        # self.inputs is the step's inputs; bare `inputs` would be the JSON
        # captured from the enclosing method
        cromwell_inputs = {}
        self.inputs.to_hash.each do |input, value|
          next if value.nil?
          key = [workflow.to_s, input] * "."
          cromwell_inputs[key] = value
        end

        Open.write(file('inputs'), cromwell_inputs.to_json )
        Cromwell.run_cromwell(cromwell, self.files_dir, options)
      end
      Open.read(Dir.glob(File.join(files_dir, "/cromwell-executions/#{workflow}/*/call-*/execution/stdout")).first)
    end
  end
end
#load_documentation ⇒ Object
47 48 49 50 51 52 53 |
# File 'lib/rbbt/workflow/doc.rb', line 47
# Parse workflow.md into @documentation and copy each documented task's
# description onto the task.
# @raise [RuntimeError] when the markdown documents a task that is not defined
def load_documentation
  @documentation = Workflow.parse_workflow_doc documentation_markdown
  @documentation[:tasks].each do |task_name, description|
    key = task_name.to_sym
    raise "Documentation for #{ task_name }, but not a #{ self.to_s } task" unless tasks.include? key
    tasks[key].description = description
  end
end
#load_id(id) ⇒ Object
619 620 621 622 623 624 625 626 627 628 |
# File 'lib/rbbt/workflow.rb', line 619
# Load the step identified by +id+ (a path relative to workdir) through
# Workflow.load_step. Ids of remote tasks are loaded by their remote workflow.
def load_id(id)
  path = Path === workdir ? workdir[id].find : File.join(workdir, id)
  task = task_for path
  return remote_tasks[task].load_id(id) if remote_tasks && remote_tasks.include?(task)
  Workflow.load_step path
end
#load_name(task, name) ⇒ Object
642 643 644 645 646 647 |
# File 'lib/rbbt/workflow.rb', line 642
# Load the step for job +name+ of +task+ (Task, or its name), using the path
# of a job with no tagged inputs.
def load_name(task, name)
  # The remote branch used an undefined local `path` (NameError). Delegate the
  # same lookup to the remote workflow instead (RemoteWorkflow includes
  # Workflow, so it responds to load_name).
  return remote_tasks[task].load_name(task, name) if remote_tasks and remote_tasks.include? task
  task = tasks[task.to_sym] if String === task or Symbol === task
  path = step_path task.name, name, [], [], task.extension
  get_job_step path, task
end
#load_step(path) ⇒ Object
475 476 477 478 479 480 481 482 |
# File 'lib/rbbt/workflow.rb', line 475
# Step stored at +path+, attached to its task when task_for can identify it.
def load_step(path)
  task = task_for path
  task ? get_job_step(path, tasks[task.to_sym]) : get_job_step(path)
end
#local_persist_setup ⇒ Object
{{{ Make workflow resources local
662 663 664 665 666 667 |
# File 'lib/rbbt/workflow.rb', line 662
# Mix LocalPersist into this workflow's singleton class and point its
# persistence directory at var/cache/persistence under :lib.
def local_persist_setup
  singleton_class.include(LocalPersist)
  self.local_persist_dir = Rbbt.var.cache.persistence.find(:lib)
end
#local_workdir_setup ⇒ Object
669 670 671 |
# File 'lib/rbbt/workflow.rb', line 669
# Store jobs under var/jobs resolved in :lib.
def local_workdir_setup
  self.workdir = Rbbt.var.jobs.find(:lib)
end
#log(status, message = nil, &block) ⇒ Object
23 24 25 |
# File 'lib/rbbt/workflow/accessor.rb', line 23
# Log +status+ (and an optional +message+) through Step.log, with no step
# attached.
def log(status, message = nil, &block)
  Step.log(status, message, nil, &block)
end
#make_local ⇒ Object
673 674 675 676 |
# File 'lib/rbbt/workflow.rb', line 673
# Keep both persistence and job files inside the workflow's :lib resources.
def make_local
  local_persist_setup()
  local_workdir_setup()
end
#nextflow(path) ⇒ Object
30 31 32 33 34 35 36 |
# File 'lib/rbbt/workflow/integration/nextflow.rb', line 30
# Define a task from a Nextflow pipeline: a directory (using its main.nf)
# or a single .nf file.
def nextflow(path)
  File.directory?(path) ? nextflow_dir(path) : nextflow_file(path)
end
#nextflow_dir(path) ⇒ Object
25 26 27 28 |
# File 'lib/rbbt/workflow/integration/nextflow.rb', line 25
# Define a task from the main.nf inside directory +path+, named after the
# directory.
def nextflow_dir(path)
  nextflow_file File.join(path, 'main.nf'), File.basename(path)
end
#nextflow_file(file, name = nil) ⇒ Object
2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 |
# File 'lib/rbbt/workflow/integration/nextflow.rb', line 2
# Define a :text task that runs the Nextflow script +file+.
#
# The '.nf' suffix is appended when +file+ does not exist but +file+.nf
# does. Every params.<name> referenced in the script becomes a string input.
# The task runs `nextflow run` inside its output directory, adding
# -profile when the :profile/:nextflow config is set.
#
# @param file [String] script path, with or without '.nf'
# @param name [String, nil] task name; defaults to the script basename
def nextflow_file(file, name = nil)
  # File.exists? was removed in Ruby 3.2; File.exist? is the supported form
  file = file + '.nf' unless File.exist?(file) || ! File.exist?(file + '.nf')
  file = File.expand_path(file)
  name ||= File.basename(file).sub(/\.nf$/,'')
  params = Open.read(file).scan(/params\.\w+/).collect{|p| p.split(".").last}.uniq

  params.each do |param|
    input param, :string
  end

  task name => :text do
    work = file('work')
    output = file('output')
    profile = config :profile, :nextflow
    profile_flag = profile ? "-profile #{profile} " : ""

    Misc.in_dir output do
      cmd("nextflow run -work-dir #{work} -name #{clean_name} -ansi-log false #{profile_flag}#{file}", inputs.to_hash.merge('add_option_dashes' => true))
    end
  end
end
#override_dependencies(inputs) ⇒ Object
297 298 299 300 301 302 303 304 305 306 307 308 309 |
# File 'lib/rbbt/workflow/accessor.rb', line 297
# Collect dependency overrides from inputs with String keys of the form
# "workflow#task" (an empty workflow means this workflow). Result:
# workflow name => { task => value }.
def override_dependencies(inputs)
  overrides = IndiferentHash.setup({})
  return overrides if inputs.nil?

  inputs.each do |key, value|
    next unless String === key
    m = key.match(/(.*)#(.*)/)
    next unless m
    workflow, task = m.values_at(1, 2)
    workflow = self.to_s if workflow.empty?
    (overrides[workflow] ||= IndiferentHash.setup({}))[task] = value
  end

  overrides
end
#prov_string(tree) ⇒ Object
109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 |
# File 'lib/rbbt/workflow/usage.rb', line 109
# One-line summary of a dependency tree. A task that is a dependency of the
# previously listed one is joined with "->", otherwise with ";". Repeated
# [workflow, task] pairs are listed once.
def prov_string(tree)
  description = ""
  previous = nil
  seen = Set.new

  _prov_tasks(tree).each do |workflow, task_name|
    pair = [workflow, task_name]
    next if seen.include?(pair)

    separator = if previous.nil?
                  ""
                elsif previous.include?(pair)
                  "->"
                else
                  ";"
                end
    previous = _prov_tasks(workflow.dep_tree(task_name))

    description << separator << task_name.to_s
    seen << pair
  end

  description
end
#prov_tree(tree, offset = 0, seen = []) ⇒ Object
137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 |
# File 'lib/rbbt/workflow/usage.rb', line 137
# Render a dependency tree as indented "Workflow#task" lines, one space of
# indent per level. Pairs already in +seen+ are skipped; +seen+ is shared
# across the recursion and mutated.
def prov_tree(tree, offset = 0, seen = [])
  return "" if tree.empty?

  indent = " " * offset
  lines = [indent]

  tree.each do |pair, subtree|
    next if seen.include?(pair)
    seen.push(pair)
    workflow, task = pair
    lines << "#{indent}#{workflow}##{task}\n" + workflow.prov_tree(subtree, offset + 1, seen)
  end

  lines * "\n"
end
#real_dependencies(task, orig_jobname, inputs, dependencies) ⇒ Object
327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 |
# File 'lib/rbbt/workflow/accessor.rb', line 327
# Resolve the declared dependencies of +task+ into Steps.
#
# Each dependency may be:
# * Array [workflow, task, options]: built with workflow._job after applying
#   options (assign_dep_inputs);
# * Step: used as-is;
# * Symbol: a task of this workflow;
# * Proc/DependencyBlock: called with (jobname, inputs, deps so far). It may
#   return Steps or Hashes ({:workflow, :task, :jobname, :inputs}), which are
#   turned into jobs.
# Inputs named "workflow#task" override a dependency with a given step
# (setup_override_dependency).
#
# @return [Array<Step>] flattened, without nils
def real_dependencies(task, orig_jobname, inputs, dependencies)
  real_dependencies = []

  override_dependencies = override_dependencies(inputs)

  dependencies.each do |dependency|
    _inputs = IndiferentHash.setup(inputs.dup)
    jobname = orig_jobname
    jobname = _inputs[:jobname] if _inputs.include? :jobname

    real_dep = case dependency
               when Array
                 workflow, dep_task, options = dependency

                 if override_dependencies[workflow.to_s] && value = override_dependencies[workflow.to_s][dep_task]
                   setup_override_dependency(value, workflow, dep_task)
                 else

                   compute = options[:compute] if options

                   all_d = (real_dependencies + real_dependencies.flatten.collect{|d| d.rec_dependencies} ).flatten.compact.uniq

                   _inputs = assign_dep_inputs(_inputs, options, all_d, workflow.task_info(dep_task))
                   jobname = _inputs.delete :jobname if _inputs.include? :jobname

                   job = workflow._job(dep_task, jobname, _inputs)
                   ComputeDependency.setup(job, compute) if compute
                   job
                 end
               when Step
                 dependency
               when Symbol
                 if override_dependencies[self.to_s] && value = override_dependencies[self.to_s][dependency]
                   setup_override_dependency(value, self, dependency)
                 else
                   _job(dependency, jobname, _inputs)
                 end
               when Proc
                 if DependencyBlock === dependency
                   orig_dep = dependency.dependency
                   wf, task_name, options = orig_dep

                   options = {} if options.nil?
                   compute = options[:compute]

                   options = IndiferentHash.setup(options.dup)
                   dep = dependency.call jobname, _inputs.merge(options), real_dependencies

                   dep = [dep] unless Array === dep

                   new_deps = []
                   dep.each{|d|
                     next if d.nil?
                     if Hash === d
                       d[:workflow] ||= wf
                       d[:task] ||= task_name
                       _override_dependencies = override_dependencies.merge(override_dependencies(d[:inputs] || {}))
                       d = if _override_dependencies[d[:workflow].to_s] && value = _override_dependencies[d[:workflow].to_s][d[:task]]
                             setup_override_dependency(value, d[:workflow], d[:task])
                           else
                             task_info = d[:workflow].task_info(d[:task])

                             _inputs = assign_dep_inputs({}, options.merge(d[:inputs] || {}), real_dependencies, task_info)
                             d[:workflow]._job(d[:task], d[:jobname], _inputs)
                           end
                     end
                     ComputeDependency.setup(d, compute) if compute
                     new_deps << d
                   }
                   dep = new_deps
                 else
                   _inputs = IndiferentHash.setup(_inputs.dup)
                   dep = dependency.call jobname, _inputs, real_dependencies
                   if Hash === dep
                     # wf is nil on this branch, so this falls back to self
                     dep[:workflow] ||= wf || self
                     _override_dependencies = override_dependencies.merge(override_dependencies(dep[:inputs] || {}))
                     if _override_dependencies[dep[:workflow].to_s] && value = _override_dependencies[dep[:workflow].to_s][dep[:task]]
                       # keep the override Step; previously it was discarded
                       # and the Hash itself became the dependency
                       dep = setup_override_dependency(value, dep[:workflow], dep[:task])
                     else
                       task_info = (dep[:task] && dep[:workflow]) ? dep[:workflow].task_info(dep[:task]) : nil
                       _inputs = assign_dep_inputs({}, dep[:inputs], real_dependencies, task_info)
                       dep = dep[:workflow]._job(dep[:task], dep[:jobname], _inputs)
                     end
                   end
                 end

                 dep
               else
                 raise "Dependency for #{task.name} not understood: #{Misc.fingerprint dependency}"
               end

    real_dependencies << real_dep
  end
  real_dependencies.flatten.compact
end
#rec_dependencies(taskname) ⇒ Object
75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 |
# File 'lib/rbbt/workflow/accessor.rb', line 75 def rec_dependencies(taskname) @rec_dependencies ||= {} @rec_dependencies[taskname] ||= begin if task_dependencies.include? taskname deps = task_dependencies[taskname] #all_deps = deps.select{|dep| String === dep or Symbol === dep or Array === dep} all_deps = [] deps.each do |dep| if DependencyBlock === dep all_deps << dep.dependency if dep.dependency else all_deps << dep unless Proc === dep end begin case dep when Array wf, t, o = dep wf.rec_dependencies(t.to_sym).each do |d| if Array === d new = d.dup else new = [dep.first, d] end if Hash === o and not o.empty? if Hash === new.last hash = new.last.dup o.each{|k,v| hash[k] ||= v} new[new.length-1] = hash else new.push o.dup end end all_deps << new end if wf && t when String, Symbol rec_deps = rec_dependencies(dep.to_sym) all_deps.concat rec_deps when DependencyBlock dep = dep.dependency raise TryAgain end rescue TryAgain retry end end all_deps.uniq else [] end end end |
#rec_input_defaults(taskname) ⇒ Object
159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 |
# File 'lib/rbbt/workflow/accessor.rb', line 159
# Default values of every input of +taskname+ and its recursive
# dependencies. The task itself wins over its dependencies; only inputs
# listed by rec_inputs are kept. Returns an IndiferentHash.
def rec_input_defaults(taskname)
  rec_inputs = rec_inputs(taskname)
  chain = [taskname].concat(rec_dependencies(taskname))
  chain.inject(IndiferentHash.setup({})) { |acc, entry|
    next acc unless (Array === entry && entry[0] && entry[1]) || Symbol === entry
    source_task = Array === entry ? entry.first.tasks[entry[1].to_sym] : tasks[entry.to_sym]
    acc = source_task.input_defaults.merge(acc)
    acc.delete_if { |input, _| !rec_inputs.include?(input) }
    acc
  }.tap { |h| IndiferentHash.setup(h) }
end
#rec_input_descriptions(taskname) ⇒ Object
220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 |
# File 'lib/rbbt/workflow/accessor.rb', line 220
# Descriptions of every input of +taskname+ and its recursive dependencies.
# The task itself wins over its dependencies; only inputs listed by
# rec_inputs are kept. Returns an IndiferentHash.
def rec_input_descriptions(taskname)
  rec_inputs = rec_inputs(taskname)
  chain = [taskname].concat(rec_dependencies(taskname))
  chain.inject({}) { |acc, entry|
    next acc unless (Array === entry && entry[0] && entry[1]) || Symbol === entry
    source_task = Array === entry ? entry.first.tasks[entry[1].to_sym] : tasks[entry.to_sym]
    acc = source_task.input_descriptions.merge(acc)
    acc.delete_if { |input, _| !rec_inputs.include?(input) }
    acc
  }.tap { |h| IndiferentHash.setup(h) }
end
#rec_input_options(taskname) ⇒ Object
236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 |
# File 'lib/rbbt/workflow/accessor.rb', line 236
# Input options of every input of +taskname+ and its recursive dependencies.
# The task itself wins over its dependencies; only inputs listed by
# rec_inputs are kept. Returns an IndiferentHash.
# (The method name and the input_options calls had been lost in extraction.)
def rec_input_options(taskname)
  rec_inputs = rec_inputs(taskname)
  [taskname].concat(rec_dependencies(taskname)).inject({}){|acc, tn|
    if Array === tn and tn[0] and tn[1]
      new = tn.first.tasks[tn[1].to_sym].input_options
    elsif Symbol === tn
      new = tasks[tn.to_sym].input_options
    else
      next acc
    end
    acc = new.merge(acc)
    acc = acc.delete_if{|input,defaults| not rec_inputs.include? input}
    acc
  }.tap{|h| IndiferentHash.setup(h)}
end
#rec_input_types(taskname) ⇒ Object
175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 |
# File 'lib/rbbt/workflow/accessor.rb', line 175
# Types of every input of +taskname+ and its recursive dependencies. The
# task itself wins over its dependencies; only inputs listed by rec_inputs
# are kept. Returns an IndiferentHash.
def rec_input_types(taskname)
  rec_inputs = rec_inputs(taskname)
  chain = [taskname].concat(rec_dependencies(taskname))
  chain.inject({}) { |acc, entry|
    next acc unless (Array === entry && entry[0] && entry[1]) || Symbol === entry
    source_task = Array === entry ? entry.first.tasks[entry[1].to_sym] : tasks[entry.to_sym]
    acc = source_task.input_types.merge(acc)
    acc.delete_if { |input, _| !rec_inputs.include?(input) }
    acc
  }.tap { |h| IndiferentHash.setup(h) }
end
#rec_input_use(taskname) ⇒ Object
191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 |
# File 'lib/rbbt/workflow/accessor.rb', line 191
# Which tasks use each input: input => { workflow => [task names] }. Covers
# +taskname+ itself and the dependency inputs reported by Task.dep_inputs.
def rec_input_use(taskname)
  task = task_from_dep(taskname)
  deps = rec_dependencies(taskname)
  usage = {}

  record = lambda do |input, workflow, name|
    ((usage[input] ||= {})[workflow] ||= []) << name
  end

  task.inputs.each do |input|
    record.call(input, (task.workflow || self).to_s, task.name)
  end

  Task.dep_inputs(deps, self).each do |dep, dep_input_names|
    dep_name = dep.name
    dep_workflow = dep.workflow
    dep_input_names.each { |input| record.call(input, dep_workflow, dep_name) }
  end

  usage
end
#rec_inputs(taskname) ⇒ Object
# NOTE(review): earlier variant of rec_inputs. The YARD page shows it as the
# docstring of the rec_inputs method, so it is presumably commented-out code
# in the original source. It is superseded by the rec_inputs implementation
# from lib/rbbt/workflow/accessor.rb line 152.
# Union (order-preserving, deduplicated) of the inputs of +taskname+ and of
# every task in rec_dependencies(taskname).
def rec_inputs(taskname)
[taskname].concat(rec_dependencies(taskname)).inject([]){|acc, tn| acc.concat(task_from_dep(tn).inputs) }.uniq
end
152 153 154 155 156 157 |
# File 'lib/rbbt/workflow/accessor.rb', line 152
# Inputs of +taskname+ followed by the inputs its recursive dependencies
# expose (as computed by Task#dep_inputs).
def rec_inputs(taskname)
  task = task_from_dep(taskname)
  inputs_by_dep = task.dep_inputs(rec_dependencies(taskname), self)
  task.inputs + inputs_by_dep.values.flatten
end
#resumable ⇒ Object
37 38 39 |
# File 'lib/rbbt/workflow/definition.rb', line 37
# Set the @resumable flag to true (returns true).
def resumable
  @resumable = true
end
#returns(description) ⇒ Object
41 42 43 |
# File 'lib/rbbt/workflow/definition.rb', line 41
# Store +text+ as the result description (@result_description).
# @return [Object] the stored value
def returns(text)
  @result_description = text
end
#set_step_dependencies(step) ⇒ Object
439 440 441 442 443 444 445 446 447 448 449 450 451 452 |
# File 'lib/rbbt/workflow.rb', line 439
# Rebuild step.dependencies from the [task, job, path] entries recorded in
# its info (retried with Misc.insist). Entries without a job become nil.
# Existing paths load through this workflow, others through
# Workflow.load_step. Returns nil when no dependencies are recorded.
def set_step_dependencies(step)
  return unless step.info[:dependencies]

  Misc.insist do
    step.dependencies = step.info[:dependencies].collect do |task, job, path|
      next if job.nil?
      Open.exists?(path) ? load_step(path) : Workflow.load_step(path)
    end
  end
end
#setup_override_dependency(dep, workflow, task_name) ⇒ Object
311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 |
# File 'lib/rbbt/workflow/accessor.rb', line 311
# Make +dep+ (a Step, or a path loaded with Workflow.load_step) stand in for
# +task_name+ of +workflow+. The original task name is remembered and stored
# in +overriden+. Errors while resolving the task are logged, not raised.
def setup_override_dependency(dep, workflow, task_name)
  dep = Workflow.load_step(dep) unless Step === dep

  dep.workflow = workflow
  dep.info[:name] = dep.name
  dep.original_task_name ||= dep.task_name if dep.workflow

  begin
    workflow = Kernel.const_get workflow if String === workflow
    dep.task = workflow.tasks[task_name] if dep.task.nil? && workflow.tasks.include?(task_name)
  rescue => e
    Log.exception e
  end

  dep.task_name = task_name
  dep.overriden = dep.original_task_name.to_sym

  dep
end
#SOPT_str(task) ⇒ Object
259 260 261 262 263 264 265 266 267 268 269 |
# File 'lib/rbbt/workflow/usage.rb', line 259
# Builds an option-spec string for +task+ from its recursive inputs
# (presumably the format expected by rbbt's SOPT option parser -- confirm).
# Each input becomes "-<first letter>--<name>". A trailing "*" (takes a
# value) is added unless the input type is :boolean. Entries are joined
# with ":".
#
# @param task task whose name is used to look up inputs and input types
# @return [String]
def SOPT_str(task)
  # The type table is the same for every input, so fetch it once.
  input_types = self.rec_input_types(task.name)
  sopt_options = self.rec_inputs(task.name).collect do |name|
    short = name.to_s.chars.first
    boolean = input_types[name].to_sym == :boolean
    "-#{short}--#{name}#{boolean ? "" : "*"}"
  end
  sopt_options * ":"
end
#step_module ⇒ Object
324 325 326 327 328 329 330 331 332 333 334 335 |
# File 'lib/rbbt/workflow.rb', line 324
# Memoized anonymous Module with one instance method per entry in
# +helpers+ (name => block). Built on first call and cached in @_m.
#
# @return [Module]
def step_module
  return @_m if @_m

  @_m = Module.new.tap do |helper_module|
    helpers.each do |helper_name, helper_block|
      helper_module.send(:define_method, helper_name, &helper_block)
    end
  end
end
#step_path(taskname, jobname, inputs, dependencies, extension = nil) ⇒ Object
426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 |
# File 'lib/rbbt/workflow/accessor.rb', line 426
# Computes the workdir path of a job.
#
# When there are inputs or dependencies, the jobname is tagged according
# to TAG:
# * :hash   - jobname + '_' + digest of the purged inputs and the dependencies
#             (steps contribute "Step: <short_path>").
# * :inputs - jobname + '_' + serialized map of input name => value, including
#             the recursive inputs of the dependencies.
# A non-empty +extension+ is appended as ".<extension>".
#
# @raise [RuntimeError] if jobname contains '..'
# @return the result of workdir[taskname][tagged_jobname].find
def step_path(taskname, jobname, inputs, dependencies, extension = nil)
  raise "Jobname makes an invalid path: #{ jobname }" if jobname.include? '..'

  if inputs.length > 0 || dependencies.any?
    tagged_jobname = case TAG
                     when :hash
                       clean_inputs = Annotated.purge(inputs)
                       clean_inputs = clean_inputs.collect { |i| Symbol === i ? i.to_s : i }
                       # Local and remote steps both hash by their short path. The
                       # previous check `RemoteStep === Step` compared against the class
                       # itself and could never match a dependency.
                       deps_str = dependencies.collect do |d|
                         step_like = Step === d || (defined?(RemoteStep) && RemoteStep === d)
                         step_like ? "Step: " << d.short_path : d
                       end
                       key_obj = { :inputs => clean_inputs, :dependencies => deps_str }
                       key_str = Misc.obj2str(key_obj)
                       hash_str = Misc.digest(key_str)
                       Log.debug "Hash for '#{[taskname, jobname] * "/"}' #{hash_str} for #{key_str}" if DEBUG_JOB_HASH
                       jobname + '_' << hash_str
                     when :inputs
                       all_inputs = {}
                       inputs.zip(self.task_info(taskname)[:inputs]) do |i, f|
                         all_inputs[f] = i
                       end
                       dependencies.each do |dep|
                         ri = dep.recursive_inputs
                         ri.zip(ri.fields).each do |i, f|
                           all_inputs[f] = i
                         end
                       end
                       all_inputs.any? ? jobname + '_' << Misc.obj2str(all_inputs) : jobname
                     else
                       jobname
                     end
  else
    tagged_jobname = jobname
  end

  if extension && !extension.empty?
    tagged_jobname = tagged_jobname + ('.' << extension.to_s)
  end

  workdir[taskname][tagged_jobname].find
end
#task(name, &block) ⇒ Object
112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 |
# File 'lib/rbbt/workflow/definition.rb', line 112
# DSL: defines a task and registers it in +tasks+ and +task_dependencies+.
#
# +name+ is either a task name or a single-pair Hash { name => result_type }.
# All pending declarations (inputs, description, input types/defaults/
# descriptions/options, required inputs, extension, resumable,
# dependencies) are consumed via the consume_* readers. Without a block,
# the method called +name+ becomes the task body. When no extension was
# declared, one is derived for the result types tsv, yaml, marshal and json.
#
# @return the consumed dependencies stored in task_dependencies[name]
def task(name, &block)
  type = nil
  if Hash === name
    name, type = name.first
  else
    # NOTE(review): the pending result type is consumed but discarded, so
    # tasks declared without a `name => type` hash get a nil :result_type and
    # no derived extension. Using it here looks like the original intent, but
    # would change the extension (and therefore the job paths) of existing tasks.
    consume_result_type
  end

  name = name.to_sym

  block = self.method(name) unless block_given?

  task_info = {
    :name => name,
    :inputs => consume_inputs,
    :description => consume_description,
    :input_types => consume_input_types,
    :result_type => (String === type ? type.to_sym : type),
    :result_description => consume_result_description,
    :input_defaults => consume_input_defaults,
    :input_descriptions => consume_input_descriptions,
    :required_inputs => consume_required_inputs,
    :extension => consume_extension,
    :resumable => consume_resumable,
    :input_options => consume_input_options
  }

  if task_info[:extension].nil?
    result_type_name = task_info[:result_type].to_s
    task_info[:extension] = result_type_name if %w(tsv yaml marshal json).include?(result_type_name)
  end

  task = Task.setup(task_info, &block)

  # Previously `last_task = task`, which only set a local variable.
  @last_task = task

  tasks[name] = task
  task_dependencies[name] = consume_dependencies
end
#task_exports ⇒ Object
500 501 502 |
# File 'lib/rbbt/workflow/accessor.rb', line 500
# Every exported task name regardless of export mode: the exec, synchronous,
# asynchronous and stream lists combined (nil lists skipped), flattened and
# de-duplicated in first-seen order.
def task_exports
  export_lists = [exec_exports, synchronous_exports, asynchronous_exports, stream_exports]
  export_lists.reject(&:nil?).flatten.uniq
end
#task_for(path) ⇒ Object
483 484 485 486 487 488 489 490 491 492 493 494 495 496 497 498 |
# File 'lib/rbbt/workflow/accessor.rb', line 483
# Infers the task name a job file belongs to: the first component of the
# job's directory, taken relative to this workflow's workdir.
#
# @param path [String] path to a job file
# @return [String, nil] the task name, or nil when it cannot be derived
def task_for(path)
  workdir_find = workdir.respond_to?(:find) ? workdir.find : workdir
  workdir_find = File.expand_path(workdir_find)
  dir = File.dirname(File.expand_path(path))

  begin
    Misc.path_relative_to(workdir_find, dir).sub(%r{([^/]+)/.*}, '\1')
  rescue StandardError
    # Best effort: any failure (e.g. a path that cannot be made relative to
    # workdir) yields nil, as before.
    nil
  end
end
#task_from_dep(dep) ⇒ Object
135 136 137 138 139 140 141 142 143 144 145 146 |
# File 'lib/rbbt/workflow/accessor.rb', line 135
# Resolves a dependency spec to a task object:
# * [workflow, task_name] - looked up in workflow.tasks
# * String / Symbol       - looked up in this workflow's tasks
#
# @raise [RuntimeError] "Unknown dependency: ..." when nothing is found
def task_from_dep(dep)
  found =
    if Array === dep
      dep.first.tasks[dep[1]]
    elsif String === dep || Symbol === dep
      tasks[dep.to_sym]
    end
  return found unless found.nil?

  raise "Unknown dependency: #{Misc.fingerprint dep}"
end
#task_info(name) ⇒ Object
27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 |
# File 'lib/rbbt/workflow/accessor.rb', line 27
# Summary of task +name+ (its description, recursive inputs and their
# metadata, export mode, named dependencies, result type and extension),
# memoized in @task_info under "<workflow>/<task>".
#
# @param name [String, Symbol] task name
# @return [Hash] keys :id, :description, :export, :inputs, :input_types,
#   :input_descriptions, :input_defaults, :input_options, :input_use,
#   :result_type, :result_description, :dependencies, :extension
# @raise [RuntimeError] when the workflow has no task called +name+
def task_info(name)
  name = name.to_sym
  task = tasks[name]
  raise "No '#{name}' task in '#{self.to_s}' Workflow" if task.nil?

  id = File.join(self.to_s, name.to_s)
  @task_info ||= {}
  @task_info[id] ||= begin
    description = task.description
    result_description = task.result_description
    result_type = task.result_type
    inputs = rec_inputs(name).uniq
    input_types = rec_input_types(name)
    input_descriptions = rec_input_descriptions(name)
    input_use = rec_input_use(name)
    input_defaults = rec_input_defaults(name)
    input_options = rec_input_options(name)
    extension = task.extension

    # Export lists may hold the task name as a Symbol or a String; the first
    # list (in this order) that contains it decides the export mode.
    export = :none
    [[synchronous_exports, :synchronous],
     [asynchronous_exports, :asynchronous],
     [exec_exports, :exec],
     [stream_exports, :stream]].each do |exported, kind|
      if exported.include?(name) || exported.include?(name.to_s)
        export = kind
        break
      end
    end

    # Only dependencies given by name (String/Symbol) are listed.
    dependencies = task_dependencies[name].select { |dep| String === dep || Symbol === dep }

    {
      :id => id,
      :description => description,
      :export => export,
      :inputs => inputs,
      :input_types => input_types,
      :input_descriptions => input_descriptions,
      :input_defaults => input_defaults,
      :input_options => input_options,
      :input_use => input_use,
      :result_type => result_type,
      :result_description => result_description,
      :dependencies => dependencies,
      :extension => extension
    }
  end
end
#unexport(*names) ⇒ Object
160 161 162 163 164 165 166 167 |
# File 'lib/rbbt/workflow/definition.rb', line 160
# Stops exporting +names+: each name is removed, as both String and Symbol,
# from every export list in place (the list objects are kept). Export lists
# that are nil are skipped.
#
# @param names [Array<String, Symbol>] task names to unexport
# @return [Array, nil] the updated stream_exports list, or nil when it is nil
def unexport(*names)
  removed = (names.map(&:to_s) + names.map(&:to_sym)).uniq
  outcome = nil
  [:exec_exports, :synchronous_exports, :asynchronous_exports, :stream_exports].each do |reader|
    list = send(reader)
    outcome = list ? list.replace(list - removed) : nil
  end
  outcome
end