Module: Workflow
- Includes:
- InputModule
- Included in:
- WorkflowRESTClient
- Defined in:
- lib/rbbt/workflow.rb,
lib/rbbt/workflow/doc.rb,
lib/rbbt/workflow/usage.rb,
lib/rbbt/workflow/accessor.rb,
lib/rbbt/workflow/examples.rb,
lib/rbbt/workflow/definition.rb
Defined Under Namespace
Modules: DependencyBlock Classes: TaskNotFoundException
Constant Summary collapse
- STEP_CACHE =
{}
- DEFAULT_NAME =
{{{ JOB MANAGEMENT
"Default"
- TAG =
ENV["RBBT_INPUT_JOBNAME"] == "true" ? :inputs : :hash
Class Attribute Summary collapse
-
.autoinstall ⇒ Object
Returns the value of attribute autoinstall.
-
.workflow_dir ⇒ Object
Returns the value of attribute workflow_dir.
-
.workflows ⇒ Object
Returns the value of attribute workflows.
Instance Attribute Summary collapse
-
#asynchronous_exports ⇒ Object
Returns the value of attribute asynchronous_exports.
-
#description ⇒ Object
Returns the value of attribute description.
-
#documentation ⇒ Object
Returns the value of attribute documentation.
-
#example_dir ⇒ Object
Returns the value of attribute example_dir.
-
#exec_exports ⇒ Object
Returns the value of attribute exec_exports.
-
#helpers ⇒ Object
Returns the value of attribute helpers.
-
#last_task ⇒ Object
Returns the value of attribute last_task.
-
#libdir ⇒ Object
Returns the value of attribute libdir.
-
#remote_tasks ⇒ Object
Returns the value of attribute remote_tasks.
-
#step_cache ⇒ Object
Returns the value of attribute step_cache.
-
#stream_exports ⇒ Object
Returns the value of attribute stream_exports.
-
#synchronous_exports ⇒ Object
Returns the value of attribute synchronous_exports.
-
#task_dependencies ⇒ Object
Returns the value of attribute task_dependencies.
-
#task_description ⇒ Object
Returns the value of attribute task_description.
-
#tasks ⇒ Object
Returns the value of attribute tasks.
-
#workdir ⇒ Object
Returns the value of attribute workdir.
Class Method Summary collapse
- .doc_parse_chunks(str, pattern) ⇒ Object
- .doc_parse_first_line(str) ⇒ Object
- .doc_parse_up_to(str, pattern, keep = false) ⇒ Object
- .extended(base) ⇒ Object
- .init_remote_tasks ⇒ Object
- .installed_workflows ⇒ Object
- .load_inputs(dir, input_names, input_types) ⇒ Object
- .load_remote_tasks(filename) ⇒ Object
- .load_workflow_file(filename) ⇒ Object
- .load_workflow_libdir(filename) ⇒ Object
- .local_workflow_filename(wf_name) ⇒ Object
- .parse_workflow_doc(doc) ⇒ Object
- .process_remote_tasks(remote_tasks) ⇒ Object
- .require_local_workflow(wf_name) ⇒ Object
- .require_remote_workflow(wf_name, url) ⇒ Object
- .require_workflow(wf_name, force_local = false) ⇒ Object
- .resolve_locals(inputs) ⇒ Object
- .workdir ⇒ Object
-
.workdir=(path) ⇒ Object
{{{ ATTR DEFAULTS.
Instance Method Summary collapse
- #add_remote_tasks(remote_tasks) ⇒ Object
- #all_exports ⇒ Object
- #assign_dep_inputs(_inputs, options, all_d, task_info) ⇒ Object
- #dep(*dependency, &block) ⇒ Object
- #desc(description) ⇒ Object
- #doc(task = nil) ⇒ Object
- #documentation_markdown ⇒ Object
- #example(task_name, example) ⇒ Object
- #example_inputs(task_name, example) ⇒ Object
- #example_step(task_name, example, new_inputs = {}) ⇒ Object
- #examples ⇒ Object
- #export_asynchronous(*names) ⇒ Object (also: #export)
- #export_exec(*names) ⇒ Object
- #export_stream(*names) ⇒ Object
- #export_synchronous(*names) ⇒ Object
- #extension(extension) ⇒ Object
-
#get_job_step(step_path, task = nil, input_values = nil, dependencies = nil) ⇒ Object
{{{ LOAD FROM FILE.
- #helper(name, &block) ⇒ Object
- #id_for(path) ⇒ Object
- #job(taskname, jobname = nil, inputs = {}) ⇒ Object
-
#jobs(taskname, query = nil) ⇒ Object
}}} LOAD FROM FILE.
- #load_documentation ⇒ Object
- #load_id(id) ⇒ Object
- #load_name(task, name) ⇒ Object
- #load_step(path) ⇒ Object
-
#local_persist_setup ⇒ Object
{{{ Make workflow resources local.
- #local_workdir_setup ⇒ Object
- #log(status, message = nil, &block) ⇒ Object
- #make_local ⇒ Object
- #real_dependencies(task, orig_jobname, inputs, dependencies) ⇒ Object
- #rec_dependencies(taskname) ⇒ Object
- #rec_input_defaults(taskname) ⇒ Object
- #rec_input_descriptions(taskname) ⇒ Object
- #rec_input_options(taskname) ⇒ Object
- #rec_input_types(taskname) ⇒ Object
- #rec_input_use(taskname) ⇒ Object
-
#rec_inputs(taskname) ⇒ Object
def rec_inputs(taskname) [taskname].concat(rec_dependencies(taskname)).inject([]){|acc, tn| acc.concat(task_from_dep(tn).inputs) }.uniq end.
- #returns(description) ⇒ Object
- #set_step_dependencies(step) ⇒ Object
- #step_module ⇒ Object
- #step_path(taskname, jobname, inputs, dependencies, extension = nil) ⇒ Object
- #task(name, &block) ⇒ Object
- #task_exports ⇒ Object
- #task_for(path) ⇒ Object
- #task_from_dep(dep) ⇒ Object
- #task_info(name) ⇒ Object
- #unexport(*names) ⇒ Object
- #with_workdir(workdir) ⇒ Object
Methods included from InputModule
Class Attribute Details
.autoinstall ⇒ Object
Returns the value of attribute autoinstall.
25 26 27 |
# File 'lib/rbbt/workflow.rb', line 25 def autoinstall @autoinstall end |
.workflow_dir ⇒ Object
Returns the value of attribute workflow_dir.
25 26 27 |
# File 'lib/rbbt/workflow.rb', line 25 def workflow_dir @workflow_dir end |
.workflows ⇒ Object
Returns the value of attribute workflows.
25 26 27 |
# File 'lib/rbbt/workflow.rb', line 25 def workflows @workflows end |
Instance Attribute Details
#asynchronous_exports ⇒ Object
Returns the value of attribute asynchronous_exports.
193 194 195 |
# File 'lib/rbbt/workflow.rb', line 193 def asynchronous_exports @asynchronous_exports end |
#description ⇒ Object
Returns the value of attribute description.
189 190 191 |
# File 'lib/rbbt/workflow.rb', line 189 def description @description end |
#documentation ⇒ Object
Returns the value of attribute documentation.
54 55 56 |
# File 'lib/rbbt/workflow/doc.rb', line 54 def documentation @documentation end |
#example_dir ⇒ Object
Returns the value of attribute example_dir.
2 3 4 |
# File 'lib/rbbt/workflow/examples.rb', line 2 def example_dir @example_dir end |
#exec_exports ⇒ Object
Returns the value of attribute exec_exports.
193 194 195 |
# File 'lib/rbbt/workflow.rb', line 193 def exec_exports @exec_exports end |
#helpers ⇒ Object
Returns the value of attribute helpers.
191 192 193 |
# File 'lib/rbbt/workflow.rb', line 191 def helpers @helpers end |
#last_task ⇒ Object
Returns the value of attribute last_task.
192 193 194 |
# File 'lib/rbbt/workflow.rb', line 192 def last_task @last_task end |
#libdir ⇒ Object
Returns the value of attribute libdir.
190 191 192 |
# File 'lib/rbbt/workflow.rb', line 190 def libdir @libdir end |
#remote_tasks ⇒ Object
Returns the value of attribute remote_tasks.
195 196 197 |
# File 'lib/rbbt/workflow.rb', line 195 def remote_tasks @remote_tasks end |
#step_cache ⇒ Object
Returns the value of attribute step_cache.
194 195 196 |
# File 'lib/rbbt/workflow.rb', line 194 def step_cache @step_cache end |
#stream_exports ⇒ Object
Returns the value of attribute stream_exports.
193 194 195 |
# File 'lib/rbbt/workflow.rb', line 193 def stream_exports @stream_exports end |
#synchronous_exports ⇒ Object
Returns the value of attribute synchronous_exports.
193 194 195 |
# File 'lib/rbbt/workflow.rb', line 193 def synchronous_exports @synchronous_exports end |
#task_dependencies ⇒ Object
Returns the value of attribute task_dependencies.
192 193 194 |
# File 'lib/rbbt/workflow.rb', line 192 def task_dependencies @task_dependencies end |
#task_description ⇒ Object
Returns the value of attribute task_description.
192 193 194 |
# File 'lib/rbbt/workflow.rb', line 192 def task_description @task_description end |
#tasks ⇒ Object
Returns the value of attribute tasks.
191 192 193 |
# File 'lib/rbbt/workflow.rb', line 191 def tasks @tasks end |
#workdir ⇒ Object
Returns the value of attribute workdir.
190 191 192 |
# File 'lib/rbbt/workflow.rb', line 190 def workdir @workdir end |
Class Method Details
.doc_parse_chunks(str, pattern) ⇒ Object
21 22 23 24 25 26 27 |
# File 'lib/rbbt/workflow/doc.rb', line 21
# Splits +str+ on +pattern+ and pairs up the pieces as name => body.
#
# +pattern+ is expected to contain one capture group (the chunk name), so
# that String#split yields [preamble, name1, body1, name2, body2, ...].
# Names and bodies are stripped; chunks whose body is empty are dropped.
#
# String#split discards trailing empty strings, so a final heading without a
# body produces an odd number of pieces; such a heading is treated as having
# an empty body (and is therefore dropped) instead of raising.
#
# Returns a Hash (empty when the pattern does not occur in +str+).
def self.doc_parse_chunks(str, pattern)
  parts = str.split(pattern)
  return {} if parts.length < 2

  chunks = {}
  parts.drop(1).each_slice(2) do |name, body|
    chunks[name.strip] = body.to_s.strip
  end
  chunks.delete_if { |_name, body| body.empty? }
end
.doc_parse_first_line(str) ⇒ Object
3 4 5 6 7 8 9 10 |
# File 'lib/rbbt/workflow/doc.rb', line 3
# Extracts a title line that is followed by a blank line.
#
# On a match, +str+ is modified in place so that it only holds the text after
# the blank line, and the title line is returned. Without a match +str+ is
# left untouched and "" is returned.
#
# The regexp no longer carries the +s+ flag: in Ruby that flag fixes the
# regexp encoding to Windows-31J, which makes matching a UTF-8 string with
# non-ASCII characters raise Encoding::CompatibilityError.
#
# NOTE(review): +^+ matches at the start of any line, so when the very first
# line is not followed by a blank line a later line may be taken as the
# title. Kept as in the original; confirm whether +\A+ was intended.
def self.doc_parse_first_line(str)
  match = str.match(/^([^\n]*)\n\n(.*)/m)
  return "" unless match

  # Copy the captures before mutating the matched string.
  title = match[1]
  rest = match[2]
  str.replace rest
  title
end
.doc_parse_up_to(str, pattern, keep = false) ⇒ Object
12 13 14 15 16 17 18 19 |
# File 'lib/rbbt/workflow/doc.rb', line 12
# Splits +str+ at the first occurrence of +pattern+.
#
# Returns a two element Array: the text before the match, and the text after
# it. When +keep+ is true the matched text is kept at the front of the second
# element. When +pattern+ does not occur the result is [copy of str, ""].
#
# String#partition always returns three strings, so the former
# `if _pat ... else` test could never take its else branch; it has been
# removed. The matched text is joined with + rather than <<, because for a
# String +pattern+ partition hands back the pattern object itself and <<
# would modify the caller's argument.
def self.doc_parse_up_to(str, pattern, keep = false)
  before, matched, after = str.partition(pattern)
  [before, keep ? matched + after : after]
end
.extended(base) ⇒ Object
34 35 36 37 |
# File 'lib/rbbt/workflow.rb', line 34
# Hook run when an object extends Workflow: records +base+ in the list of
# workflows and points its +libdir+ at the caller's lib directory, with
# +base+ set as that path's resource.
def self.extended(base)
  workflows << base

  lib_path = Path.setup(Path.caller_lib_dir)
  lib_path.resource = base
  base.libdir = lib_path
end
.init_remote_tasks ⇒ Object
39 40 41 42 43 |
# File 'lib/rbbt/workflow.rb', line 39
# Loads the remote task definitions from Rbbt.root.etc.remote_tasks, at most
# once: later calls return immediately. The flag is set before loading.
def self.init_remote_tasks
  already_done = defined?(@@init_remote_tasks) && @@init_remote_tasks
  return if already_done

  @@init_remote_tasks = true
  if Rbbt.root.etc.remote_tasks.exists?
    load_remote_tasks(Rbbt.root.etc.remote_tasks.find)
  end
end
.installed_workflows ⇒ Object
74 75 76 77 78 |
# File 'lib/rbbt/workflow.rb', line 74
# Lists the workflows found under +workflow_dir+: the name of each directory
# that directly contains a 'workflow.rb' file.
def self.installed_workflows
  workflow_files = workflow_dir.glob('**/workflow.rb')
  workflow_files.map { |workflow_file| File.basename(File.dirname(workflow_file)) }
end
.load_inputs(dir, input_names, input_types) ⇒ Object
28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 |
# File 'lib/rbbt/workflow/examples.rb', line 28
# Builds an inputs hash from the files found in +dir+.
#
# For every name in +input_names+ the file dir[name] is looked up; when it is
# missing, the first file matching "name.*" is tried. Inputs without a file
# are skipped. Depending on input_types[name]:
# * :tsv, :array, :text, :file -- the value is the file itself
# * :boolean                   -- true when the stripped content is 'true'
# * anything else              -- the stripped file content
#
# Returns the hash after IndiferentHash.setup.
def self.load_inputs(dir, input_names, input_types)
  input_dir = Path.setup(dir.dup)
  loaded = {}

  input_names.each do |input|
    file = input_dir[input].find
    if file.nil? || !file.exists?
      file = input_dir.glob(input.to_s + ".*").first
    end
    Log.debug "Trying #{ input }: #{file}"
    next unless file && file.exists?

    loaded[input.to_sym] =
      case input_types[input]
      when :tsv, :array, :text, :file
        Log.debug "Pointing #{ input } to #{file}"
        file
      when :boolean
        file.read.strip == 'true'
      else
        Log.debug "Loading #{ input } from #{file}"
        file.read.strip
      end
  end

  IndiferentHash.setup(loaded)
end
.load_remote_tasks(filename) ⇒ Object
477 478 479 480 481 |
# File 'lib/rbbt/workflow.rb', line 477
# Reads +filename+, parses it as YAML and hands the result to
# Workflow.process_remote_tasks.
def self.load_remote_tasks(filename)
  Workflow.process_remote_tasks(YAML.load(Open.read(filename)))
end
.load_workflow_file(filename) ⇒ Object
58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 |
# File 'lib/rbbt/workflow.rb', line 58
# Requires the workflow definition in +filename+.
#
# The workflow's 'lib' directory is added to the load path first (see
# load_workflow_libdir), then the expanded file name is required.
#
# Returns true on success. Any failure is logged and re-raised unchanged.
# Exception (not just StandardError) is rescued on purpose so that LoadError
# and SyntaxError raised by +require+ are logged too; nothing is swallowed.
def self.load_workflow_file(filename)
  load_workflow_libdir(filename)
  filename = File.expand_path(filename)
  require filename
  Log.debug{"Workflow loaded from: #{ filename }"}
  true
rescue Exception
  Log.warn{"Error loading workflow: #{ filename }"}
  raise
end
.load_workflow_libdir(filename) ⇒ Object
50 51 52 53 54 55 56 |
# File 'lib/rbbt/workflow.rb', line 50
# Puts the 'lib' directory that sits next to +filename+ at the front of
# $LOAD_PATH, when that directory exists. Does nothing (and returns nil)
# otherwise.
def self.load_workflow_libdir(filename)
  workflow_lib_dir = File.join(File.dirname(File.expand_path(filename)), 'lib')
  if File.directory? workflow_lib_dir
    Log.debug "Adding workflow lib directory to LOAD_PATH: #{workflow_lib_dir}"
    $LOAD_PATH.unshift(workflow_lib_dir)
  end
end
.local_workflow_filename(wf_name) ⇒ Object
96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 |
# File 'lib/rbbt/workflow.rb', line 96 def self.local_workflow_filename(wf_name) filename = nil if Path === wf_name case # Points to workflow file when ((File.exist?(wf_name.find) and not File.directory?(wf_name.find)) or File.exist?(wf_name.find + '.rb')) filename = wf_name.find # Points to workflow dir when (File.exist?(wf_name.find) and File.directory?(wf_name.find) and File.exist?(File.join(wf_name.find, 'workflow.rb'))) filename = wf_name['workflow.rb'].find end else if ((File.exist?(wf_name) and not File.directory?(wf_name)) or File.exist?(wf_name + '.rb')) filename = (wf_name =~ /\.?\//) ? wf_name : "./" << wf_name else filename = workflow_dir[wf_name]['workflow.rb'].find end end if filename.nil? or not File.exist?(filename) wf_name_snake = Misc.snake_case(wf_name) return local_workflow_filename(wf_name_snake) if wf_name_snake != wf_name end filename end |
.parse_workflow_doc(doc) ⇒ Object
29 30 31 32 33 34 35 |
# File 'lib/rbbt/workflow/doc.rb', line 29
# Breaks a workflow documentation text into its parts.
#
# +doc+ is consumed in place by doc_parse_first_line. Returns a Hash with the
# keys :title, :description, :task_description (all stripped Strings) and
# :tasks (task name => description, as built by doc_parse_chunks).
def self.parse_workflow_doc(doc)
  title = doc_parse_first_line(doc)
  description, task_section = doc_parse_up_to(doc, /^# Tasks/i)
  task_description, task_chunks = doc_parse_up_to(task_section, /^##/, true)

  {
    :title => title.strip,
    :description => description.strip,
    :task_description => task_description.strip,
    :tasks => doc_parse_chunks(task_chunks, /## (.*)/)
  }
end
.process_remote_tasks(remote_tasks) ⇒ Object
462 463 464 465 466 467 468 469 470 471 472 473 474 475 |
# File 'lib/rbbt/workflow.rb', line 462
# Registers remote tasks on their workflows.
#
# +remote_tasks+ maps a workflow name to a hash of remote => task names.
# Each workflow is required, and every listed task is pointed at a
# WorkflowRESTClient for that remote in the workflow's +remote_tasks+.
def self.process_remote_tasks(remote_tasks)
  require 'rbbt/rest/client'

  remote_tasks.each do |workflow, info|
    wf = Workflow.require_workflow(workflow)
    wf.remote_tasks ||= {}
    IndiferentHash.setup(wf.remote_tasks)

    info.each do |remote, tasks|
      client = WorkflowRESTClient.new(remote, workflow)
      tasks.each { |task| wf.remote_tasks[task.to_sym] = client }
    end
  end
end
.require_local_workflow(wf_name) ⇒ Object
126 127 128 129 130 131 132 133 134 |
# File 'lib/rbbt/workflow.rb', line 126
# Loads the workflow +wf_name+ from a local file.
#
# Returns false when no existing file is found for it, otherwise the result
# of load_workflow_file.
def self.require_local_workflow(wf_name)
  filename = local_workflow_filename(wf_name)
  return false unless filename && File.exist?(filename)

  load_workflow_file(filename)
end
.require_remote_workflow(wf_name, url) ⇒ Object
45 46 47 48 |
# File 'lib/rbbt/workflow.rb', line 45
# Defines the constant +wf_name+ as a WorkflowRESTClient for +url+ and
# returns that client.
#
# The constant is set with const_set instead of evaluating a generated source
# string: the former eval broke on names or URLs containing a quote and would
# execute whatever text they carried. Names with '::' are still supported;
# the enclosing constants must already exist.
def self.require_remote_workflow(wf_name, url)
  require 'rbbt/rest/client'

  name = wf_name.to_s
  *namespace, const_name = name.split('::')
  parent = namespace.inject(Object) { |mod, part| mod.const_get(part) }
  parent.const_set(const_name, WorkflowRESTClient.new(url.to_s, name))
end
.require_workflow(wf_name, force_local = false) ⇒ Object
136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 |
# File 'lib/rbbt/workflow.rb', line 136 def self.require_workflow(wf_name, force_local=false) Workflow.init_remote_tasks # Already loaded begin workflow = Misc.string2const wf_name Log.debug{"Workflow #{ wf_name } already loaded"} return workflow rescue Exception end # Load remotely if not force_local and Rbbt.etc.remote_workflows.exists? remote_workflows = Rbbt.etc.remote_workflows.yaml if Hash === remote_workflows and remote_workflows.include?(wf_name) url = remote_workflows[wf_name] begin return require_remote_workflow(wf_name, url) ensure Log.debug{"Workflow #{ wf_name } loaded remotely: #{ url }"} end end end if Open.remote? wf_name url = wf_name wf_name = File.basename(url) begin return require_remote_workflow(wf_name, url) ensure Log.debug{"Workflow #{ wf_name } loaded remotely: #{ url }"} end end # Load locally if wf_name =~ /::\w+$/ clean_name = wf_name.sub(/::.*/,'') Log.info{"Looking for '#{wf_name}' in '#{clean_name}'"} require_workflow clean_name return Misc.string2const Misc.camel_case(wf_name) end Log.info{"Loading workflow #{wf_name}"} require_local_workflow(wf_name) or (Workflow.autoinstall and `rbbt workflow install #{Misc.snake_case(wf_name)}` and require_local_workflow(Misc.snake_case(wf_name))) or raise("Workflow not found or could not be loaded: #{ wf_name }") begin Misc.string2const Misc.camel_case(wf_name) rescue Workflow.workflows.last || true end end |
.resolve_locals(inputs) ⇒ Object
272 273 274 275 276 277 278 279 280 281 282 283 |
# File 'lib/rbbt/workflow.rb', line 272
# Replaces "local:<task>:<job>" references in +inputs+ by the loaded result
# of that job (load_id("<task>/<job>").load).
#
# A reference is recognised when the value itself is such a string, when it
# is a one-element Array holding one, or when it is a one-entry TSV whose
# only key is one. +inputs+ is modified in place and returned.
#
# The candidate text is picked by type before matching: calling =~ directly
# on an Array (or any other non-String value) depended on Object#=~, which no
# longer exists in Ruby 3.2 and later.
def self.resolve_locals(inputs)
  inputs.each do |name, value|
    reference =
      if String === value or Symbol === value
        value
      elsif Array === value and value.length == 1
        value.first
      elsif TSV === value and value.size == 1
        value.keys.first
      end
    next unless String === reference or Symbol === reference

    match = reference.to_s.match(/^local:(.*?):(.*)/)
    next unless match

    task_name = match[1]
    jobname = match[2]
    inputs[name] = load_id(File.join(task_name, jobname)).load
  end
end
Instance Method Details
#add_remote_tasks(remote_tasks) ⇒ Object
454 455 456 457 458 459 460 |
# File 'lib/rbbt/workflow.rb', line 454
# Registers tasks as served by remotes.
#
# +remote_tasks+ maps a remote to the list of task names it serves; each
# task is recorded in this workflow's +remote_tasks+ under its Symbol name.
#
# Keys are built with to_sym (the original used to_f, which turns every
# String into 0.0 and fails for Symbols), matching process_remote_tasks and
# the Symbol lookups done by #job. The registry is created when still unset.
def add_remote_tasks(remote_tasks)
  self.remote_tasks ||= {}
  remote_tasks.each do |remote, tasks|
    tasks.each do |task|
      self.remote_tasks[task.to_sym] = remote
    end
  end
end
#all_exports ⇒ Object
265 266 267 |
# File 'lib/rbbt/workflow.rb', line 265
# All exported task names: asynchronous, synchronous, exec and stream
# exports, in that order. The list is computed on first use and cached in
# @all_exports.
def all_exports
  return @all_exports if @all_exports

  @all_exports = asynchronous_exports + synchronous_exports + exec_exports + stream_exports
end
#assign_dep_inputs(_inputs, options, all_d, task_info) ⇒ Object
763 764 765 766 767 768 769 770 771 772 773 774 775 776 777 778 779 780 781 782 783 784 785 786 787 788 789 790 791 792 793 794 795 796 797 798 799 800 801 |
# File 'lib/rbbt/workflow/accessor.rb', line 763
# Applies the input +options+ declared for a dependency onto +_inputs+.
#
# For every option (the :compute / "compute" key is ignored):
# * the value :compute assigns nothing;
# * a Symbol naming the task of one of the jobs in +all_d+ assigns that job;
# * any other Symbol is taken as the name of another input: when +_inputs+
#   has it, its value is moved to the new name, otherwise the Symbol itself
#   is assigned unless the input is already present;
# * every other value is assigned as is.
#
# +options+ may be nil, in which case +_inputs+ is returned unchanged.
# +_inputs+ is modified in place and returned.
#
# +task_info+ is accepted for interface compatibility. The original only
# read task_info[:input_options] to feed a condition that ended in `or true`,
# so the alternative branch (running and loading the dependency) could never
# execute; that dead branch and the lookup have been removed.
def assign_dep_inputs(_inputs, options, all_d, task_info)
  options.each{|i,v|
    next if i == :compute or i == "compute"
    case v
    when :compute
      # compute directive: not an input value
    when Symbol
      rec_dependency = all_d.flatten.select{|d| d.task_name.to_sym == v }.first

      if rec_dependency.nil?
        if _inputs.include? v
          _inputs[i] = _inputs.delete(v)
        else
          _inputs[i] = v unless _inputs.include? i
        end
      else
        # The dependency job itself is passed along as the input
        _inputs[i] = rec_dependency
      end
    else
      _inputs[i] = v
    end
  } if options

  _inputs
end
#dep(*dependency, &block) ⇒ Object
40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 |
# File 'lib/rbbt/workflow/definition.rb', line 40
# Declares dependencies for the next task by adding them to @dependencies.
#
# With a block: the block is stored as the dependency. When arguments are
# given too they are read as (workflow, task_name, options), where the
# workflow and/or task name may be omitted; the workflow defaults to self.
# The block is then tagged through DependencyBlock.setup.
#
# Without a block: when the arguments start with a workflow (a Module or a
# WorkflowRESTClient) or end with a Hash of options, they form one
# dependency, stored as an Array that starts with the workflow (self when
# none was given). Otherwise every argument is a separate dependency.
def dep(*dependency, &block)
  @dependencies ||= []
  if block_given?
    if dependency.any?
      wf, task_name, options = dependency
      # Shift the arguments when leading ones were left out
      options, task_name = task_name, nil if Hash === task_name
      options, wf = wf, nil if Hash === wf
      task_name, wf = wf, self if task_name.nil?

      DependencyBlock.setup block, [wf, task_name, options]
    end
    @dependencies << block
  else
    starts_with_workflow = (Module === dependency.first or
      (defined? WorkflowRESTClient and WorkflowRESTClient === dependency.first))

    if starts_with_workflow or Hash === dependency.last
      dependency = ([self] + dependency) unless starts_with_workflow
      @dependencies << dependency
    else
      @dependencies.concat dependency
    end
  end
end
#desc(description) ⇒ Object
28 29 30 |
# File 'lib/rbbt/workflow/definition.rb', line 28 def desc(description) @description = description end |
#doc(task = nil) ⇒ Object
62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 |
# File 'lib/rbbt/workflow/usage.rb', line 62 def doc(task = nil) if task.nil? puts Log.color :magenta, self.to_s puts Log.color :magenta, "=" * self.to_s.length if self.documentation[:title] and not self.documentation[:title].empty? puts puts Misc.format_paragraph self.documentation[:title] end if self.documentation[:description] and not self.documentation[:description].empty? puts puts Misc.format_paragraph self.documentation[:description] end puts puts Log.color :magenta, "## TASKS" if self.documentation[:task_description] and not self.documentation[:task_description].empty? puts puts Misc.format_paragraph self.documentation[:task_description] end puts tasks.each do |name,task| description = task.description || "" description = description.split("\n\n").first puts Misc.format_definition_list_item(name.to_s, description, 80, 30, :yellow) end else if Task === task task_name = task.name else task_name = task task = self.tasks[task_name] end #dependencies = self.rec_dependencies(task_name).collect{|dep_name| Array === dep_name ? dep_name.first.tasks[dep_name[1].to_sym] : self.tasks[dep_name.to_sym]} task.doc(self, self.rec_dependencies(task_name)) if self.examples.include? task_name self.examples[task_name].each do |example| puts Log.color(:magenta, "Example ") << Log.color(:green, example) + " -- " + Log.color(:blue, example_dir[task_name][example]) inputs = self.example(task_name, example) inputs.each do |input, type, file| case type when :tsv, :array, :text lines = file.read.split("\n") head = lines[0..5].compact * "\n\n" head = head[0..500] puts Misc.format_definition_list_item(input, head, 1000, -1, :blue).gsub(/\n\s*\n/,"\n") puts '...' if lines.length > 6 else puts Misc.format_definition_list_item(input, file.read, 80, 20, :blue) end end puts end end end end |
#documentation_markdown ⇒ Object
37 38 39 40 41 42 43 44 |
# File 'lib/rbbt/workflow/doc.rb', line 37
# Content of the 'workflow.md' file in the workflow's lib directory, or ""
# when that file does not exist.
def documentation_markdown
  markdown_file = @libdir['workflow.md'].find
  markdown_file.exists? ? markdown_file.read : ""
end
#example(task_name, example) ⇒ Object
21 22 23 24 25 26 |
# File 'lib/rbbt/workflow/examples.rb', line 21
# Lists the inputs that the example +example+ of task +task_name+ provides.
#
# Returns an Array of [input, type, file] triplets, one for each input of
# the task that has a file in the example's directory.
def example(task_name, example)
  provided = []
  task_info(task_name.to_sym)[:input_types].each do |input, type|
    input_file = example_dir[task_name][example][input]
    provided << [input, type, input_file.find] if input_file.exists?
  end
  provided
end
#example_inputs(task_name, example) ⇒ Object
52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 |
# File 'lib/rbbt/workflow/examples.rb', line 52
# Builds the inputs hash for the example +example+ of task +task_name+.
#
# Depending on the input type the value is:
# * :tsv, :array, :text -- the example file itself
# * :boolean            -- true when the stripped file content is 'true'
# * anything else       -- the stripped file content
def example_inputs(task_name, example)
  loaded = {}
  IndiferentHash.setup(loaded)

  example(task_name, example).each do |input, type, file|
    loaded[input.to_sym] =
      case type
      when :tsv, :array, :text
        Log.debug "Pointing #{ input } to #{file}"
        file
      when :boolean
        file.read.strip == 'true'
      else
        Log.debug "Loading #{ input } from #{file}"
        file.read.strip
      end
  end

  loaded
end
#example_step(task_name, example, new_inputs = {}) ⇒ Object
72 73 74 75 76 77 78 79 80 81 |
# File 'lib/rbbt/workflow/examples.rb', line 72
# Creates the job for an example: the example's inputs, overridden by any
# +new_inputs+, run as task +task_name+ with the example name as job name.
def example_step(task_name, example, new_inputs = {})
  job_inputs = example_inputs(task_name, example)

  overrides_given = new_inputs && new_inputs.any?
  if overrides_given
    IndiferentHash.setup(new_inputs)
    job_inputs = job_inputs.merge(new_inputs)
  end

  self.job(task_name, example, job_inputs)
end
#examples ⇒ Object
8 9 10 11 12 13 14 15 16 17 18 19 |
# File 'lib/rbbt/workflow/examples.rb', line 8
# Maps each task name to the names of its examples, taken from the
# "<task>/<example>" entries found under +example_dir+. Returns {} when the
# workflow's lib directory has no examples.
def examples
  return {} unless self.libdir.examples.exists?

  by_task = {}
  example_dir.glob("*/*").each do |example_path|
    task_name = File.basename(File.dirname(example_path))
    (by_task[task_name] ||= []) << File.basename(example_path)
  end
  IndiferentHash.setup by_task
  by_task
end
#export_asynchronous(*names) ⇒ Object Also known as: export
123 124 125 126 127 128 |
# File 'lib/rbbt/workflow/definition.rb', line 123
# Exports the tasks +names+ as asynchronous: they are first unexported, then
# added to +asynchronous_exports+ without duplicates. Returns that list.
def export_asynchronous(*names)
  unexport(*names)
  asynchronous_exports.tap do |exports|
    exports.concat(names)
    exports.uniq!
  end
end
#export_exec(*names) ⇒ Object
109 110 111 112 113 114 |
# File 'lib/rbbt/workflow/definition.rb', line 109
# Exports the tasks +names+ as exec: they are first unexported, then added
# to +exec_exports+ without duplicates. Returns that list.
def export_exec(*names)
  unexport(*names)
  exec_exports.tap do |exports|
    exports.concat(names)
    exports.uniq!
  end
end
#export_stream(*names) ⇒ Object
130 131 132 133 134 135 |
# File 'lib/rbbt/workflow/definition.rb', line 130
# Exports the tasks +names+ as stream: they are first unexported, then added
# to +stream_exports+ without duplicates. Returns that list.
def export_stream(*names)
  unexport(*names)
  stream_exports.tap do |exports|
    exports.concat(names)
    exports.uniq!
  end
end
#export_synchronous(*names) ⇒ Object
116 117 118 119 120 121 |
# File 'lib/rbbt/workflow/definition.rb', line 116
# Exports the tasks +names+ as synchronous: they are first unexported, then
# added to +synchronous_exports+ without duplicates. Returns that list.
def export_synchronous(*names)
  unexport(*names)
  synchronous_exports.tap do |exports|
    exports.concat(names)
    exports.uniq!
  end
end
#extension(extension) ⇒ Object
32 33 34 |
# File 'lib/rbbt/workflow/definition.rb', line 32 def extension(extension) @extension = extension end |
#get_job_step(step_path, task = nil, input_values = nil, dependencies = nil) ⇒ Object
{{{ LOAD FROM FILE
369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 |
# File 'lib/rbbt/workflow.rb', line 369
# Builds the Step for the job stored at +step_path+.
#
# +step_path+ may be a Proc, in which case it is called to obtain the path.
# When no +dependencies+ are given they are filled in through
# set_step_dependencies. The step is extended with +step_module+, and its
# task and inputs are set from the arguments when the step has none. Given
# +dependencies+ replace the step's own when it has none or fewer.
#
# The unused +persist+ and +key+ locals of the original were removed; +key+
# triggered a +find+ on the path whose result was never read.
#
# Returns the Step.
def get_job_step(step_path, task = nil, input_values = nil, dependencies = nil)
  step_path = step_path.call if Proc === step_path

  step = Step.new step_path, task, input_values, dependencies

  set_step_dependencies(step) unless dependencies

  step.extend step_module

  step.task ||= task
  step.inputs ||= input_values
  step.dependencies = dependencies if dependencies and (step.dependencies.nil? or step.dependencies.length < dependencies.length)

  step
end
#helper(name, &block) ⇒ Object
24 25 26 |
# File 'lib/rbbt/workflow/definition.rb', line 24 def helper(name, &block) helpers[name] = block end |
#id_for(path) ⇒ Object
913 914 915 916 917 918 919 920 |
# File 'lib/rbbt/workflow/accessor.rb', line 913
# Identifier of the job at +path+: the path relative to the workflow's
# workdir (resolved with +find+ when the workdir supports it).
def id_for(path)
  base_dir = workdir.respond_to?(:find) ? workdir.find : workdir
  Misc.path_relative_to base_dir, path
end
#job(taskname, jobname = nil, inputs = {}) ⇒ Object
298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 |
# File 'lib/rbbt/workflow.rb', line 298 def job(taskname, jobname = nil, inputs = {}) taskname = taskname.to_sym return remote_tasks[taskname].job(taskname, jobname, inputs) if remote_tasks and remote_tasks.include? taskname jobname = DEFAULT_NAME if jobname.nil? or jobname.empty? task = tasks[taskname] raise "Task not found: #{ taskname }" if task.nil? inputs = IndiferentHash.setup(inputs) Workflow.resolve_locals(inputs) task_info = task_info(taskname) task_inputs = task_info[:inputs] defaults = IndiferentHash.setup(task_info[:input_defaults]).merge(task.input_defaults) missing_inputs = [] task.required_inputs.each do |input| missing_inputs << input if inputs[input].nil? end if missing_inputs.length == 1 raise ParameterException, "Input #{missing_inputs.first} is required but was not provided or is nil" end if missing_inputs.length > 1 raise ParameterException, "Inputs #{Misc.humanize_list(missing_inputs)} are required but were not provided or are nil" end dependencies = real_dependencies(task, jobname, defaults.merge(inputs), task_dependencies[taskname] || []) real_inputs = {} recursive_inputs = rec_inputs(taskname) inputs.each do |k,v| default = defaults[k] next unless (task_inputs.include?(k.to_sym) or task_inputs.include?(k.to_s)) next if default == v next if (String === default and Symbol === v and v.to_s == default) next if (Symbol === default and String === v and v == default.to_s) real_inputs[k] = v end if real_inputs.empty? and not Workflow::TAG == :inputs step_path = step_path taskname, jobname, [], [], task.extension input_values = task.take_input_values(inputs) else input_values = task.take_input_values(inputs) step_path = step_path taskname, jobname, input_values, dependencies, task.extension end job = get_job_step step_path, task, input_values, dependencies job.workflow = self job.clean_name = jobname job end |
#jobs(taskname, query = nil) ⇒ Object
}}} LOAD FROM FILE
417 418 419 420 421 422 423 424 425 |
# File 'lib/rbbt/workflow.rb', line 417
# Lists the names of the jobs of task +taskname+ found in the workdir.
#
# Job names are derived from the job info files (located with
# Step.info_file) under "<workdir>/<taskname>", at any depth. When +query+ is
# given only the entries whose relative path starts with it are returned.
def jobs(taskname, query = nil)
  task_dir = File.join(File.expand_path(workdir.find), taskname.to_s)
  pattern = File.join(File.expand_path(task_dir), '**/*')
  job_info_files = Dir.glob(Step.info_file(pattern)).collect{|f| Misc.path_relative_to task_dir, f }
  job_info_files = job_info_files.select{|f| f.index(query) == 0 } if query
  job_info_files.collect{|f|
    Step.job_name_for_info_file(f, tasks[taskname].extension)
  }
end
#load_documentation ⇒ Object
46 47 48 49 50 51 52 |
# File 'lib/rbbt/workflow/doc.rb', line 46
# Parses the workflow's markdown documentation into @documentation and
# copies each documented task description onto the matching task.
#
# Raises RuntimeError when a description names a task the workflow does not
# have.
def load_documentation
  @documentation = Workflow.parse_workflow_doc(documentation_markdown)

  @documentation[:tasks].each do |task, description|
    task_name = task.to_sym
    unless tasks.include?(task_name)
      raise "Documentation for #{ task }, but not a #{ self.to_s } task"
    end
    tasks[task_name].description = description
  end
end
#load_id(id) ⇒ Object
389 390 391 392 393 394 395 396 397 |
# File 'lib/rbbt/workflow.rb', line 389
# Loads the Step of the job identified by +id+ (a path relative to the
# workdir). Jobs of remote tasks are loaded through their remote workflow;
# local ones get their inputs from the job info and their dependencies set.
def load_id(id)
  path = File.join(workdir, id)
  task = task_for(path)

  if remote_tasks && remote_tasks.include?(task)
    return remote_tasks[task].load_id(id)
  end

  Step.new(path, tasks[task.to_sym]).tap do |step|
    step.load_inputs_from_info
    set_step_dependencies(step)
  end
end
#load_name(task, name) ⇒ Object
408 409 410 411 412 413 |
# File 'lib/rbbt/workflow.rb', line 408 def load_name(task, name) return remote_tasks[task].load_step(path) if remote_tasks and remote_tasks.include? task task = tasks[task.to_sym] if String === task or Symbol === task path = step_path task.name, name, [], [], task.extension get_job_step path, task end |
#load_step(path) ⇒ Object
399 400 401 402 403 404 405 406 |
# File 'lib/rbbt/workflow.rb', line 399
# Loads the Step stored at +path+, attaching the task that task_for reports
# for that path when there is one.
def load_step(path)
  task_name = task_for(path)
  return get_job_step(path) unless task_name

  get_job_step(path, tasks[task_name.to_sym])
end
#local_persist_setup ⇒ Object
{{{ Make workflow resources local
428 429 430 431 432 433 |
# File 'lib/rbbt/workflow.rb', line 428
# Includes LocalPersist into this workflow's singleton class and sets its
# persistence directory to the :lib location of Rbbt.var.cache.persistence.
def local_persist_setup
  singleton_class.send(:include, LocalPersist)
  persistence_dir = Rbbt.var.cache.persistence.find(:lib)
  self.local_persist_dir = persistence_dir
end
#local_workdir_setup ⇒ Object
435 436 437 |
# File 'lib/rbbt/workflow.rb', line 435 def local_workdir_setup self.workdir = Rbbt.var.jobs.find :lib end |
#log(status, message = nil, &block) ⇒ Object
539 540 541 |
# File 'lib/rbbt/workflow/accessor.rb', line 539
# Forwards a log entry to Step.log, passing nil as its third argument.
#
# status  - the status to log.
# message - optional message.
# block   - optional block, handed on to Step.log.
def log(status, message = nil, &block)
  Step.log(status, message, nil, &block)
end
#make_local ⇒ Object
439 440 441 442 |
# File 'lib/rbbt/workflow.rb', line 439 def make_local local_persist_setup local_workdir_setup end |
#real_dependencies(task, orig_jobname, inputs, dependencies) ⇒ Object
803 804 805 806 807 808 809 810 811 812 813 814 815 816 817 818 819 820 821 822 823 824 825 826 827 828 829 830 831 832 833 834 835 836 837 838 839 840 841 842 843 844 845 846 847 848 849 850 851 852 853 854 855 856 857 858 859 860 861 862 863 864 865 866 867 868 869 870 871 872 873 874 875 876 |
# File 'lib/rbbt/workflow/accessor.rb', line 803
# Turns the dependency declarations of +task+ into actual jobs.
#
# task         - the task whose dependencies are resolved (used in the error
#                message).
# orig_jobname - job name for the dependencies; an input called :jobname
#                overrides it.
# inputs       - inputs of the job; every dependency starts from a copy.
# dependencies - the declarations, each one of:
#                * Array [workflow, task, options]
#                * Step, used as is
#                * Symbol naming a task of this workflow
#                * Proc (optionally a DependencyBlock) returning a job, a
#                  Hash describing one (:workflow, :task, :jobname, :inputs),
#                  or -- for a DependencyBlock -- an Array of those
#
# Returns the flat list of dependency jobs, without nils.
# Raises RuntimeError for a declaration of any other kind.
#
# Changes from the original: inputs built for a job described by a Hash are
# kept in their own local instead of overwriting the +inputs+ argument (which
# made later dependencies start from the wrong inputs); the dependencies
# resolved so far are flattened and compacted before rec_dependencies is
# called on them, since a Proc may contribute an Array or nil; the unused
# +path_deps+ local is gone.
def real_dependencies(task, orig_jobname, inputs, dependencies)
  real_dependencies = []

  dependencies.each do |dependency|
    _inputs = IndiferentHash.setup(inputs.dup)
    jobname = orig_jobname
    jobname = _inputs[:jobname] if _inputs.include? :jobname

    real_dep = case dependency
               when Array
                 workflow, dep_task, options = dependency

                 compute = options[:compute] if options

                 resolved = real_dependencies.flatten.compact
                 all_d = (resolved + resolved.collect{|d| d.rec_dependencies} ).flatten.compact.uniq

                 _inputs = assign_dep_inputs(_inputs, options, all_d, workflow.task_info(dep_task))
                 jobname = _inputs[:jobname] if _inputs.include? :jobname

                 job = workflow.job(dep_task, jobname, _inputs)
                 ComputeDependency.setup(job, compute) if compute
                 job
               when Step
                 dependency
               when Symbol
                 job(dependency, jobname, _inputs)
               when Proc
                 if DependencyBlock === dependency
                   orig_dep = dependency.dependency
                   wf, task_name, options = orig_dep

                   options = {} if options.nil?
                   compute = options[:compute]

                   options = IndiferentHash.setup(options.dup)
                   dep = dependency.call jobname, options.merge(_inputs), real_dependencies

                   dep = [dep] unless Array === dep

                   new_ = []
                   dep.each{|d|
                     next if d.nil?
                     if Hash === d
                       # Fill in what the block left out from the declaration
                       d[:workflow] ||= wf
                       d[:task] ||= task_name
                       task_info = d[:workflow].task_info(d[:task])

                       dep_inputs = assign_dep_inputs({}, options.merge(d[:inputs] || {}), real_dependencies, task_info)
                       d = d[:workflow].job(d[:task], d[:jobname], dep_inputs)
                     end
                     ComputeDependency.setup(d, compute) if compute
                     new_ << d
                   }
                   dep = new_
                 else
                   _inputs = IndiferentHash.setup(_inputs.dup)
                   dep = dependency.call jobname, _inputs, real_dependencies
                   if Hash === dep
                     # wf is never assigned on this path, so this is self
                     dep[:workflow] ||= wf || self
                     task_info = (dep[:task] && dep[:workflow]) ? dep[:workflow].task_info(dep[:task]) : nil
                     dep_inputs = assign_dep_inputs({}, dep[:inputs], real_dependencies, task_info)
                     dep = dep[:workflow].job(dep[:task], dep[:jobname], dep_inputs)
                   end
                 end

                 dep
               else
                 raise "Dependency for #{task.name} not understood: #{Misc.fingerprint dependency}"
               end

    real_dependencies << real_dep
  end

  real_dependencies.flatten.compact
end
#rec_dependencies(taskname) ⇒ Object
586 587 588 589 590 591 592 593 594 595 596 597 598 599 600 601 602 603 604 605 606 607 608 609 610 611 612 613 614 615 616 617 618 619 620 621 622 623 624 625 626 627 628 629 630 631 632 633 634 635 636 637 638 639 640 641 642 643 644 |
# File 'lib/rbbt/workflow/accessor.rb', line 586 def rec_dependencies(taskname) @rec_dependencies ||= {} @rec_dependencies[taskname] ||= begin if task_dependencies.include? taskname deps = task_dependencies[taskname] #all_deps = deps.select{|dep| String === dep or Symbol === dep or Array === dep} all_deps = [] deps.each do |dep| if DependencyBlock === dep all_deps << dep.dependency if dep.dependency else all_deps << dep unless Proc === dep end begin case dep when Array wf, t, o = dep wf.rec_dependencies(t).each do |d| if Array === d new = d.dup else new = [dep.first, d] end if Hash === o and not o.empty? if Hash === new.last hash = new.last.dup o.each{|k,v| hash[k] ||= v} new[new.length-1] = hash else new.push o.dup end end all_deps << new end when String, Symbol rec_deps = rec_dependencies(dep.to_sym) all_deps.concat rec_deps when DependencyBlock dep = dep.dependency raise TryAgain end rescue TryAgain retry end end all_deps.uniq else [] end end end |
#rec_input_defaults(taskname) ⇒ Object
670 671 672 673 674 675 676 677 678 679 680 681 682 683 684 |
# File 'lib/rbbt/workflow/accessor.rb', line 670

# Gathers the input defaults of +taskname+ and of all its recursive
# dependencies into a single hash.
#
# Tasks are visited starting with +taskname+ itself; a value already
# collected is never replaced by one found later. Entries whose input name is
# not listed by +rec_inputs+ are discarded. Task names given as Strings are
# accepted as well as Symbols, matching +task_from_dep+.
#
# @param taskname [Symbol, String] task to inspect
# @return [Hash] input name => default value, set up as an IndiferentHash
def rec_input_defaults(taskname)
  known_inputs = rec_inputs(taskname)
  defaults = IndiferentHash.setup({})

  [taskname].concat(rec_dependencies(taskname)).each do |dep|
    task_defaults =
      if Array === dep && dep[0] && dep[1]
        dep.first.tasks[dep[1].to_sym].input_defaults
      elsif String === dep || Symbol === dep
        tasks[dep.to_sym].input_defaults
      else
        next
      end

    # Values collected so far win over the ones just read.
    defaults = task_defaults.merge(defaults)
    defaults.delete_if { |input, _default| !known_inputs.include?(input) }
  end

  defaults.tap { |h| IndiferentHash.setup(h) }
end
#rec_input_descriptions(taskname) ⇒ Object
731 732 733 734 735 736 737 738 739 740 741 742 743 744 745 |
# File 'lib/rbbt/workflow/accessor.rb', line 731

# Gathers the input descriptions of +taskname+ and of all its recursive
# dependencies into a single hash.
#
# Tasks are visited starting with +taskname+ itself; a description already
# collected is never replaced by one found later. Entries whose input name is
# not listed by +rec_inputs+ are discarded. Task names given as Strings are
# accepted as well as Symbols, matching +task_from_dep+.
#
# @param taskname [Symbol, String] task to inspect
# @return [Hash] input name => description, set up as an IndiferentHash
def rec_input_descriptions(taskname)
  known_inputs = rec_inputs(taskname)
  descriptions = {}

  [taskname].concat(rec_dependencies(taskname)).each do |dep|
    task_descriptions =
      if Array === dep && dep[0] && dep[1]
        dep.first.tasks[dep[1].to_sym].input_descriptions
      elsif String === dep || Symbol === dep
        tasks[dep.to_sym].input_descriptions
      else
        next
      end

    # Descriptions collected so far win over the ones just read.
    descriptions = task_descriptions.merge(descriptions)
    descriptions.delete_if { |input, _description| !known_inputs.include?(input) }
  end

  descriptions.tap { |h| IndiferentHash.setup(h) }
end
#rec_input_options(taskname) ⇒ Object
747 748 749 750 751 752 753 754 755 756 757 758 759 760 761 |
# File 'lib/rbbt/workflow/accessor.rb', line 747

# Gathers the input options of +taskname+ and of all its recursive
# dependencies into a single hash.
#
# Tasks are visited starting with +taskname+ itself; options already
# collected are never replaced by ones found later. Entries whose input name
# is not listed by +rec_inputs+ are discarded. Task names given as Strings
# are accepted as well as Symbols, matching +task_from_dep+.
#
# The method name and the +input_options+ accessor were missing from the
# extracted listing; they are restored from the method's documentation
# heading and from the +:input_options+ key used by +task+ and +task_info+.
#
# @param taskname [Symbol, String] task to inspect
# @return [Hash] input name => options, set up as an IndiferentHash
def rec_input_options(taskname)
  known_inputs = rec_inputs(taskname)
  options = {}

  [taskname].concat(rec_dependencies(taskname)).each do |dep|
    task_options =
      if Array === dep && dep[0] && dep[1]
        dep.first.tasks[dep[1].to_sym].input_options
      elsif String === dep || Symbol === dep
        tasks[dep.to_sym].input_options
      else
        next
      end

    # Options collected so far win over the ones just read.
    options = task_options.merge(options)
    options.delete_if { |input, _opts| !known_inputs.include?(input) }
  end

  options.tap { |h| IndiferentHash.setup(h) }
end
#rec_input_types(taskname) ⇒ Object
686 687 688 689 690 691 692 693 694 695 696 697 698 699 700 |
# File 'lib/rbbt/workflow/accessor.rb', line 686

# Gathers the input types of +taskname+ and of all its recursive
# dependencies into a single hash.
#
# Tasks are visited starting with +taskname+ itself; a type already collected
# is never replaced by one found later. Entries whose input name is not
# listed by +rec_inputs+ are discarded. Task names given as Strings are
# accepted as well as Symbols, matching +task_from_dep+.
#
# @param taskname [Symbol, String] task to inspect
# @return [Hash] input name => type, set up as an IndiferentHash
def rec_input_types(taskname)
  known_inputs = rec_inputs(taskname)
  types = {}

  [taskname].concat(rec_dependencies(taskname)).each do |dep|
    task_types =
      if Array === dep && dep[0] && dep[1]
        dep.first.tasks[dep[1].to_sym].input_types
      elsif String === dep || Symbol === dep
        tasks[dep.to_sym].input_types
      else
        next
      end

    # Types collected so far win over the ones just read.
    types = task_types.merge(types)
    types.delete_if { |input, _type| !known_inputs.include?(input) }
  end

  types.tap { |h| IndiferentHash.setup(h) }
end
#rec_input_use(taskname) ⇒ Object
702 703 704 705 706 707 708 709 710 711 712 713 714 715 716 717 718 719 720 721 722 723 724 725 726 727 728 729 |
# File 'lib/rbbt/workflow/accessor.rb', line 702

# Reports which tasks make use of each input reachable from +taskname+.
#
# The result maps every input name to a hash of workflow => list of task
# names. The task's own inputs are filed under the String form of its
# workflow (this workflow when the task has none); inputs of dependencies, as
# reported by +Task.dep_inputs+, are filed under +dep.workflow+ as returned.
#
# @param taskname [Symbol, String, Array] task reference understood by
#   +task_from_dep+
# @return [Hash] input => { workflow => [task names] }
def rec_input_use(taskname)
  task = task_from_dep(taskname)
  deps = rec_dependencies(taskname)

  usage = {}
  # Appends +task_name+ to the list kept for +input+ under +owner+.
  register = lambda do |input, owner, task_name|
    by_workflow = (usage[input] ||= {})
    (by_workflow[owner] ||= []) << task_name
  end

  task.inputs.each do |input|
    task_name = task.name
    owner = (task.workflow || self).to_s
    register.call(input, owner, task_name)
  end

  Task.dep_inputs(deps, self).each do |dep, dep_input_names|
    task_name = dep.name
    owner = dep.workflow
    dep_input_names.each { |input| register.call(input, owner, task_name) }
  end

  usage
end
#rec_inputs(taskname) ⇒ Object
# Lists the inputs of +taskname+ followed by the inputs of each of its
# recursive dependencies, without duplicates.
#
# @param taskname [Symbol, String, Array] task reference understood by
#   +task_from_dep+
# @return [Array] unique input names in order of first appearance
def rec_inputs(taskname)
  task_refs = [taskname].concat(rec_dependencies(taskname))

  collected = task_refs.each_with_object([]) do |task_ref, names|
    names.concat(task_from_dep(task_ref).inputs)
  end

  collected.uniq
end
663 664 665 666 667 668 |
# File 'lib/rbbt/workflow/accessor.rb', line 663

# Lists the inputs of +taskname+ followed by the inputs its recursive
# dependencies contribute, as reported by the task's +dep_inputs+.
# Duplicates are not removed here.
#
# @param taskname [Symbol, String, Array] task reference understood by
#   +task_from_dep+
# @return [Array] input names, the task's own first
def rec_inputs(taskname)
  task = task_from_dep(taskname)
  inherited = task.dep_inputs(rec_dependencies(taskname), self)
  task.inputs + inherited.values.flatten
end
#returns(description) ⇒ Object
36 37 38 |
# File 'lib/rbbt/workflow/definition.rb', line 36

# Stores +description+ in +@result_description+.
# NOTE(review): presumably picked up by the next +task+ declaration through
# +consume_result_description+ - confirm against the definition helpers.
#
# @param description [String] text describing a task's result
# @return [String] the description just stored
def returns(description)
  @result_description = description
end
#set_step_dependencies(step) ⇒ Object
356 357 358 359 360 361 362 363 364 365 |
# File 'lib/rbbt/workflow.rb', line 356

# Rebuilds +step.dependencies+ from the +:dependencies+ entry of the step's
# info, loading each dependency from its recorded path. Nothing happens when
# the info has no such entry. The work is wrapped in +Misc.insist+.
#
# Entries whose job is nil yield a nil in the resulting list.
#
# @param step [Step] step whose dependencies are to be restored
# @return [Object, nil] whatever +Misc.insist+ returns, or nil when the info
#   has no +:dependencies+ entry
def set_step_dependencies(step)
  return unless step.info.include? :dependencies

  Misc.insist do
    step.dependencies = step.info[:dependencies].collect do |_task, job, path|
      job.nil? ? nil : load_step(path)
    end
  end
end
#step_module ⇒ Object
285 286 287 288 289 290 291 292 293 294 295 296 |
# File 'lib/rbbt/workflow.rb', line 285

# Returns a Module exposing every entry of +helpers+ as an instance method.
# The module is built on first use and cached in +@_m+, so helpers registered
# afterwards are not added to it.
#
# @return [Module] module with one method per helper
def step_module
  @_m ||= Module.new.tap do |mod|
    helpers.each do |name, block|
      mod.send(:define_method, name, &block)
    end
  end
end
#step_path(taskname, jobname, inputs, dependencies, extension = nil) ⇒ Object
879 880 881 882 883 884 885 886 887 888 889 890 891 892 893 894 895 896 897 898 899 900 901 902 903 904 905 906 907 908 909 910 911 |
# File 'lib/rbbt/workflow/accessor.rb', line 879

# Builds the path under +workdir+ where the job +jobname+ of task +taskname+
# is stored.
#
# When the job has inputs or dependencies the job name is tagged according to
# the +TAG+ constant: with a digest of inputs and dependencies (+:hash+) or
# with a textual rendering of all input values (+:inputs+). Any other +TAG+
# value leaves the name untouched. A non-empty +extension+ is appended after
# a dot.
#
# Strings are combined with +, never by appending to a literal, so the method
# also works when string literals are frozen.
#
# @param taskname [Symbol, String] task the job belongs to
# @param jobname [String] base name of the job
# @param inputs [Array] input values of the job
# @param dependencies [Array] dependency steps of the job
# @param extension [String, Symbol, nil] optional file extension
# @return [Object] result of +workdir[taskname][tagged_jobname].find+
# @raise [RuntimeError] when +jobname+ contains ".."
def step_path(taskname, jobname, inputs, dependencies, extension = nil)
  raise "Jobname makes an invalid path: #{ jobname }" if jobname =~ /\.\./

  tagged_jobname =
    if inputs.length > 0 || dependencies.any?
      case TAG
      when :hash
        digest = Misc.obj2digest({:inputs => inputs, :dependencies => dependencies})
        jobname + '_' + digest
      when :inputs
        all_inputs = {}
        inputs.zip(self.task_info(taskname)[:inputs]) do |value, field|
          all_inputs[field] = value
        end
        dependencies.each do |dep|
          ri = dep.recursive_inputs
          ri.zip(ri.fields).each do |value, field|
            all_inputs[field] = value
          end
        end

        all_inputs.any? ? jobname + '_' + Misc.obj2str(all_inputs) : jobname
      else
        jobname
      end
    else
      jobname
    end

  if extension and not extension.empty?
    tagged_jobname = tagged_jobname + '.' + extension.to_s
  end

  workdir[taskname][tagged_jobname].find
end
#task(name, &block) ⇒ Object
66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 |
# File 'lib/rbbt/workflow/definition.rb', line 66

# Declares a task, using the settings accumulated through the +consume_*+
# helpers (inputs, descriptions, defaults, dependencies, ...).
#
# +name+ is either the task name or a one-entry Hash of name => result type.
# Without a Hash the result type is taken from +consume_result_type+ and
# falls back to +:marshal+. When no block is given the method of this object
# with the same name is used as the task body.
#
# The new task is stored in +tasks+ and remembered in +@last_task+, and its
# dependencies are recorded in +task_dependencies+.
#
# @param name [Symbol, String, Hash] task name, or { name => result_type }
# @param block [Proc] task body
# @return [Object] the dependencies registered for the task
def task(name, &block)
  if Hash === name
    type = name.first.last
    name = name.first.first
  else
    # The default used to be computed into a variable that was never read,
    # leaving such tasks with a nil result type.
    type = consume_result_type || :marshal
  end

  name = name.to_sym

  block = self.method(name) unless block_given?

  task_info = {
    :name => name,
    :inputs => consume_inputs,
    :description => consume_description,
    :input_types => consume_input_types,
    :result_type => (String === type ? type.to_sym : type),
    :result_description => consume_result_description,
    :input_defaults => consume_input_defaults,
    :input_descriptions => consume_input_descriptions,
    :required_inputs => consume_required_inputs,
    :extension => consume_extension,
    :input_options => consume_input_options
  }

  task = Task.setup(task_info, &block)

  # A bare `last_task = task` would only create a local variable; write the
  # instance variable so the last_task attribute reflects this task.
  @last_task = task

  tasks[name] = task
  task_dependencies[name] = consume_dependencies
end
#task_exports ⇒ Object
939 940 941 |
# File 'lib/rbbt/workflow/accessor.rb', line 939

# Lists every exported task name, whatever the export mode (exec,
# synchronous, asynchronous or stream). Export lists that are nil are
# skipped and duplicates are removed.
#
# @return [Array] unique exported task names
def task_exports
  export_lists = [exec_exports, synchronous_exports, asynchronous_exports, stream_exports]
  export_lists.compact.flatten.uniq
end
#task_for(path) ⇒ Object
922 923 924 925 926 927 928 929 930 931 932 933 934 935 936 937 |
# File 'lib/rbbt/workflow/accessor.rb', line 922

# Works out which task a job file belongs to from its location: the first
# path component, relative to the workflow's work directory, of the directory
# containing +path+.
#
# The +File.expand_path+ calls were missing from the extracted listing and
# are restored here.
#
# @param path [String] path of a job file
# @return [String, nil] name of the task directory, or nil when it cannot be
#   determined (any error while computing the relative path is swallowed)
def task_for(path)
  workdir_find = workdir.respond_to?(:find) ? workdir.find : workdir

  workdir_find = File.expand_path(workdir_find)
  path = File.expand_path(path)
  dir = File.dirname(path)

  begin
    # Keep only what precedes the first slash of the relative directory.
    Misc.path_relative_to(workdir_find, dir).sub(/([^\/]+)\/.*/, '\1')
  rescue
    nil
  end
end
#task_from_dep(dep) ⇒ Object
646 647 648 649 650 651 652 653 654 655 656 657 |
# File 'lib/rbbt/workflow/accessor.rb', line 646

# Finds the task a dependency declaration refers to.
#
# * Array          - +[workflow, task_name, ...]+: looked up in that
#                    workflow's +tasks+. The name is tried as given and then
#                    as a Symbol, so String names work as they do in the
#                    +rec_input_*+ methods.
# * String, Symbol - looked up in this workflow's +tasks+.
#
# @param dep [Array, String, Symbol] dependency declaration
# @return [Object] the task found
# @raise [RuntimeError] when no task matches, including declarations of any
#   other class
def task_from_dep(dep)
  task =
    case dep
    when Array
      workflow, task_name = dep
      found = workflow.tasks[task_name]
      found = workflow.tasks[task_name.to_sym] if found.nil? && task_name.respond_to?(:to_sym)
      found
    when String, Symbol
      tasks[dep.to_sym]
    end

  raise "Unknown dependency: #{Misc.fingerprint dep}" if task.nil?
  task
end
#task_info(name) ⇒ Object
543 544 545 546 547 548 549 550 551 552 553 554 555 556 557 558 559 560 561 562 563 564 565 566 567 568 569 570 571 572 573 574 575 576 577 578 579 580 581 582 583 584 |
# File 'lib/rbbt/workflow/accessor.rb', line 543

# Summarises a task of this workflow as a Hash: identifier, description,
# export mode, recursive input information (names, types, descriptions,
# defaults, options, use), result type and description, and the dependencies
# declared as Strings or Symbols.
#
# The export mode is the first match among synchronous, asynchronous, exec
# and stream exports, or +:none+. An export list that is nil counts as empty,
# as it does in +task_exports+ and +unexport+.
#
# The +input_options+ local and the +rec_input_options+ call were missing
# from the extracted listing and are restored here.
#
# @param name [Symbol, String] task name
# @return [Hash] task summary
# @raise [RuntimeError] when the workflow has no task with that name
def task_info(name)
  name = name.to_sym
  task = tasks[name]
  raise "No '#{name}' task in '#{self.to_s}' Workflow" if task.nil?

  description = task.description
  result_description = task.result_description
  result_type = task.result_type

  inputs = rec_inputs(name).uniq
  input_types = rec_input_types(name)
  input_descriptions = rec_input_descriptions(name)
  input_use = rec_input_use(name)
  input_defaults = rec_input_defaults(name)
  input_options = rec_input_options(name)

  # Export lists may hold the task name as a Symbol or as a String.
  listed = lambda do |exports|
    !exports.nil? && (exports.include?(name) || exports.include?(name.to_s))
  end

  export =
    if listed.call(synchronous_exports)
      :synchronous
    elsif listed.call(asynchronous_exports)
      :asynchronous
    elsif listed.call(exec_exports)
      :exec
    elsif listed.call(stream_exports)
      :stream
    else
      :none
    end

  dependencies = task_dependencies[name].select { |dep| String === dep or Symbol === dep }

  {
    :id => File.join(self.to_s, name.to_s),
    :description => description,
    :export => export,
    :inputs => inputs,
    :input_types => input_types,
    :input_descriptions => input_descriptions,
    :input_defaults => input_defaults,
    :input_options => input_options,
    :input_use => input_use,
    :result_type => result_type,
    :result_description => result_description,
    :dependencies => dependencies
  }
end
#unexport(*names) ⇒ Object
100 101 102 103 104 105 106 107 |
# File 'lib/rbbt/workflow/definition.rb', line 100

# Removes the given task names from every export list (exec, synchronous,
# asynchronous and stream). Names are matched both as Strings and as Symbols;
# lists that are nil are left alone. Lists are updated in place.
#
# @param names [Array<Symbol, String>] task names to stop exporting
# @return [Array, nil] the updated stream export list, or nil when there is
#   none
def unexport(*names)
  names = (names.map { |n| n.to_s } + names.map { |n| n.to_sym }).uniq

  outcomes = [:exec_exports, :synchronous_exports, :asynchronous_exports, :stream_exports].map do |accessor|
    exports = send(accessor)
    exports.replace(exports - names) if exports
  end

  outcomes.last
end