Module: Workflow
- Includes:
- AnnotatedModule
- Included in:
- WorkflowRESTClient
- Defined in:
- lib/rbbt/workflow.rb,
lib/rbbt/workflow/doc.rb,
lib/rbbt/workflow/usage.rb,
lib/rbbt/workflow/accessor.rb,
lib/rbbt/workflow/examples.rb,
lib/rbbt/workflow/definition.rb
Defined Under Namespace
Modules: DependencyBlock Classes: TaskNotFoundException
Constant Summary collapse
- STEP_CACHE =
{}
- DEFAULT_NAME =
{{{ JOB MANAGEMENT
"Default"
- TAG =
:hash
Class Attribute Summary collapse
-
.autoinstall ⇒ Object
Returns the value of attribute autoinstall.
-
.workflow_dir ⇒ Object
Returns the value of attribute workflow_dir.
-
.workflows ⇒ Object
Returns the value of attribute workflows.
Instance Attribute Summary collapse
-
#asynchronous_exports ⇒ Object
Returns the value of attribute asynchronous_exports.
-
#description ⇒ Object
Returns the value of attribute description.
-
#documentation ⇒ Object
Returns the value of attribute documentation.
-
#example_dir ⇒ Object
Returns the value of attribute example_dir.
-
#exec_exports ⇒ Object
Returns the value of attribute exec_exports.
-
#helpers ⇒ Object
Returns the value of attribute helpers.
-
#last_task ⇒ Object
Returns the value of attribute last_task.
-
#libdir ⇒ Object
Returns the value of attribute libdir.
-
#step_cache ⇒ Object
Returns the value of attribute step_cache.
-
#synchronous_exports ⇒ Object
Returns the value of attribute synchronous_exports.
-
#task_dependencies ⇒ Object
Returns the value of attribute task_dependencies.
-
#task_description ⇒ Object
Returns the value of attribute task_description.
-
#tasks ⇒ Object
Returns the value of attribute tasks.
-
#workdir ⇒ Object
Returns the value of attribute workdir.
Class Method Summary collapse
- .doc_parse_chunks(str, pattern) ⇒ Object
- .doc_parse_first_line(str) ⇒ Object
- .doc_parse_up_to(str, pattern, keep = false) ⇒ Object
- .extended(base) ⇒ Object
- .installed_workflows ⇒ Object
- .load_inputs(dir, input_names, input_types) ⇒ Object
- .load_workflow_file(filename) ⇒ Object
- .parse_workflow_doc(doc) ⇒ Object
- .require_local_workflow(wf_name) ⇒ Object
- .require_remote_workflow(wf_name, url) ⇒ Object
- .require_workflow(wf_name) ⇒ Object
- .resolve_locals(inputs) ⇒ Object
Instance Method Summary collapse
- #all_exports ⇒ Object
- #dep(*dependency, &block) ⇒ Object
- #desc(description) ⇒ Object
- #doc(task = nil) ⇒ Object
- #documentation_markdown ⇒ Object
- #example(task_name, example) ⇒ Object
- #example_step(task_name, example) ⇒ Object
- #examples ⇒ Object
- #export_asynchronous(*names) ⇒ Object
- #export_exec(*names) ⇒ Object
- #export_synchronous(*names) ⇒ Object
- #extension(extension) ⇒ Object
- #get_job_step(step_path, task = nil, input_values = nil, dependencies = nil) ⇒ Object
- #helper(name, &block) ⇒ Object
- #id_for(path) ⇒ Object
- #job(taskname, jobname = nil, inputs = {}) ⇒ Object
- #jobs(taskname, query = nil) ⇒ Object
- #load_documentation ⇒ Object
- #load_id(id) ⇒ Object
- #load_name(task, name) ⇒ Object
- #load_step(path) ⇒ Object
-
#local_persist_setup ⇒ Object
Make workflow resources local.
- #local_workdir_setup ⇒ Object
- #log(status, message = nil, &block) ⇒ Object
- #make_local ⇒ Object
- #real_dependencies(task, jobname, inputs, dependencies) ⇒ Object
- #rec_dependencies(taskname) ⇒ Object
- #rec_input_defaults(taskname) ⇒ Object
- #rec_input_descriptions(taskname) ⇒ Object
- #rec_input_options(taskname) ⇒ Object
- #rec_input_types(taskname) ⇒ Object
-
#rec_inputs(taskname) ⇒ Object
def rec_inputs(taskname) [taskname].concat(rec_dependencies(taskname)).inject([]){|acc, tn| acc.concat(task_from_dep(tn).inputs) }.uniq end.
- #returns(description) ⇒ Object
- #step_module ⇒ Object
- #step_path(taskname, jobname, inputs, dependencies, extension = nil) ⇒ Object
- #task(name, &block) ⇒ Object
- #task_for(path) ⇒ Object
- #task_from_dep(dep) ⇒ Object
- #task_info(name) ⇒ Object
- #with_workdir(workdir) ⇒ Object
Methods included from AnnotatedModule
add_consummable_annotation, #input
Class Attribute Details
.autoinstall ⇒ Object
Returns the value of attribute autoinstall.
24 25 26 |
# File 'lib/rbbt/workflow.rb', line 24 def autoinstall @autoinstall end |
.workflow_dir ⇒ Object
Returns the value of attribute workflow_dir.
24 25 26 |
# File 'lib/rbbt/workflow.rb', line 24 def workflow_dir @workflow_dir end |
.workflows ⇒ Object
Returns the value of attribute workflows.
24 25 26 |
# File 'lib/rbbt/workflow.rb', line 24 def workflows @workflows end |
Instance Attribute Details
#asynchronous_exports ⇒ Object
Returns the value of attribute asynchronous_exports.
169 170 171 |
# File 'lib/rbbt/workflow.rb', line 169 def asynchronous_exports @asynchronous_exports end |
#description ⇒ Object
Returns the value of attribute description.
165 166 167 |
# File 'lib/rbbt/workflow.rb', line 165 def description @description end |
#documentation ⇒ Object
Returns the value of attribute documentation.
54 55 56 |
# File 'lib/rbbt/workflow/doc.rb', line 54 def documentation @documentation end |
#example_dir ⇒ Object
Returns the value of attribute example_dir.
2 3 4 |
# File 'lib/rbbt/workflow/examples.rb', line 2 def example_dir @example_dir end |
#exec_exports ⇒ Object
Returns the value of attribute exec_exports.
169 170 171 |
# File 'lib/rbbt/workflow.rb', line 169 def exec_exports @exec_exports end |
#helpers ⇒ Object
Returns the value of attribute helpers.
167 168 169 |
# File 'lib/rbbt/workflow.rb', line 167 def helpers @helpers end |
#last_task ⇒ Object
Returns the value of attribute last_task.
168 169 170 |
# File 'lib/rbbt/workflow.rb', line 168 def last_task @last_task end |
#libdir ⇒ Object
Returns the value of attribute libdir.
166 167 168 |
# File 'lib/rbbt/workflow.rb', line 166 def libdir @libdir end |
#step_cache ⇒ Object
Returns the value of attribute step_cache.
170 171 172 |
# File 'lib/rbbt/workflow.rb', line 170 def step_cache @step_cache end |
#synchronous_exports ⇒ Object
Returns the value of attribute synchronous_exports.
169 170 171 |
# File 'lib/rbbt/workflow.rb', line 169 def synchronous_exports @synchronous_exports end |
#task_dependencies ⇒ Object
Returns the value of attribute task_dependencies.
168 169 170 |
# File 'lib/rbbt/workflow.rb', line 168 def task_dependencies @task_dependencies end |
#task_description ⇒ Object
Returns the value of attribute task_description.
168 169 170 |
# File 'lib/rbbt/workflow.rb', line 168 def task_description @task_description end |
#tasks ⇒ Object
Returns the value of attribute tasks.
167 168 169 |
# File 'lib/rbbt/workflow.rb', line 167 def tasks @tasks end |
#workdir ⇒ Object
Returns the value of attribute workdir.
166 167 168 |
# File 'lib/rbbt/workflow.rb', line 166 def workdir @workdir end |
Class Method Details
.doc_parse_chunks(str, pattern) ⇒ Object
21 22 23 24 25 26 27 |
# File 'lib/rbbt/workflow/doc.rb', line 21 def self.doc_parse_chunks(str, pattern) parts = str.split(pattern) return {} if parts.length < 2 tasks = Hash[*parts[1..-1].collect{|v| v.strip}] tasks.delete_if{|t,d| d.empty?} tasks end |
.doc_parse_first_line(str) ⇒ Object
3 4 5 6 7 8 9 10 |
# File 'lib/rbbt/workflow/doc.rb', line 3 def self.doc_parse_first_line(str) if str.match(/^([^\n]*)\n\n(.*)/sm) str.replace $2 $1 else "" end end |
.doc_parse_up_to(str, pattern, keep = false) ⇒ Object
12 13 14 15 16 17 18 19 |
# File 'lib/rbbt/workflow/doc.rb', line 12 def self.doc_parse_up_to(str, pattern, keep = false) pre, _pat, _post = str.partition pattern if _pat [pre, (keep ? _pat << _post : _post)] else _post end end |
.extended(base) ⇒ Object
32 33 34 35 |
# File 'lib/rbbt/workflow.rb', line 32 def self.extended(base) self.workflows << base base.libdir = Path.setup(Path.caller_lib_dir).tap{|p| p.resource = base} end |
.installed_workflows ⇒ Object
62 63 64 65 66 |
# File 'lib/rbbt/workflow.rb', line 62 def self.installed_workflows self.workflow_dir.glob('**/workflow.rb').collect do |file| File.basename(File.dirname(file)) end end |
.load_inputs(dir, input_names, input_types) ⇒ Object
28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 |
# File 'lib/rbbt/workflow/examples.rb', line 28 def self.load_inputs(dir, input_names, input_types) inputs = {} dir = Path.setup(dir.dup) input_names.each do |input| file = dir[input].find Log.debug "Trying #{ input }: #{file}" next unless file.exists? case input_types[input] when :tsv, :array, :text Log.debug "Pointing #{ input } to #{file}" inputs[input.to_sym] = file when :boolean inputs[input.to_sym] = (file.read.strip == 'true') else Log.debug "Loading #{ input } from #{file}" inputs[input.to_sym] = file.read.strip end end IndiferentHash.setup(inputs) end |
.load_workflow_file(filename) ⇒ Object
42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 |
# File 'lib/rbbt/workflow.rb', line 42 def self.load_workflow_file(filename) begin workflow_lib_dir = File.join(File.dirname(File.expand_path(filename)), 'lib') #$LOAD_PATH.unshift(File.join(File.dirname(File.expand_path(filename)), 'lib')) if File.directory? workflow_lib_dir Log.debug "Adding workflow lib directory to LOAD_PATH: #{workflow_lib_dir}" $LOAD_PATH.unshift(workflow_lib_dir) end filename = File.expand_path(filename) require filename Log.debug{"Workflow loaded from: #{ filename }"} return true rescue Exception Log.warn{"Error loading workflow: #{ filename }"} raise $! end end |
.parse_workflow_doc(doc) ⇒ Object
29 30 31 32 33 34 35 |
# File 'lib/rbbt/workflow/doc.rb', line 29 def self.parse_workflow_doc(doc) title = doc_parse_first_line doc description, task_info = doc_parse_up_to doc, /^# Tasks/i task_description, tasks = doc_parse_up_to task_info, /^##/, true tasks = doc_parse_chunks tasks, /## (.*)/ {:title => title.strip, :description => description.strip, :task_description => task_description.strip, :tasks => tasks} end |
.require_local_workflow(wf_name) ⇒ Object
83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 |
# File 'lib/rbbt/workflow.rb', line 83 def self.require_local_workflow(wf_name) filename = nil if Path === wf_name case # Points to workflow file when ((File.exists?(wf_name.find) and not File.directory?(wf_name.find)) or File.exists?(wf_name.find + '.rb')) filename = wf_name.find # Points to workflow dir when (File.exists?(wf_name.find) and File.directory?(wf_name.find) and File.exists?(File.join(wf_name.find, 'workflow.rb'))) filename = wf_name['workflow.rb'].find end else if ((File.exists?(wf_name) and not File.directory?(wf_name)) or File.exists?(wf_name + '.rb')) filename = (wf_name =~ /\.?\//) ? wf_name : "./" << wf_name else filename = workflow_dir[wf_name]['workflow.rb'].find end end if filename and File.exists? filename load_workflow_file filename else return false end end |
.require_remote_workflow(wf_name, url) ⇒ Object
37 38 39 40 |
# File 'lib/rbbt/workflow.rb', line 37 def self.require_remote_workflow(wf_name, url) require 'rbbt/rest/client' eval "Object::#{wf_name} = WorkflowRESTClient.new '#{ url }', '#{wf_name}'" end |
.require_workflow(wf_name) ⇒ Object
112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 |
# File 'lib/rbbt/workflow.rb', line 112 def self.require_workflow(wf_name) # Already loaded begin workflow = Misc.string2const wf_name Log.debug{"Workflow #{ wf_name } already loaded"} return workflow rescue Exception end # Load remotely if Rbbt.etc.remote_workflows.exists? remote_workflows = Rbbt.etc.remote_workflows.yaml if Hash === remote_workflows and remote_workflows.include?(wf_name) url = remote_workflows[wf_name] begin return require_remote_workflow(wf_name, url) ensure Log.debug{"Workflow #{ wf_name } loaded remotely: #{ url }"} end end end if Open.remote? wf_name url = wf_name wf_name = File.basename(url) begin return require_remote_workflow(wf_name, url) ensure Log.debug{"Workflow #{ wf_name } loaded remotely: #{ url }"} end end # Load locally if wf_name =~ /::\w+$/ clean_name = wf_name.sub(/::.*/,'') Log.info{"Looking for '#{wf_name}' in '#{clean_name}'"} require_workflow clean_name return Misc.string2const Misc.camel_case(wf_name) end Log.info{"Loading workflow #{wf_name}"} require_local_workflow(wf_name) or require_local_workflow(Misc.snake_case(wf_name)) or (Workflow.autoinstall and `rbbt workflow install #{Misc.snake_case(wf_name)}` and require_local_workflow(Misc.snake_case(wf_name))) or raise("Workflow not found or could not be loaded: #{ wf_name }") begin Misc.string2const Misc.camel_case(wf_name) rescue Workflow.workflows.last || true end end |
.resolve_locals(inputs) ⇒ Object
232 233 234 235 236 237 238 239 240 241 242 243 |
# File 'lib/rbbt/workflow.rb', line 232 def self.resolve_locals(inputs) inputs.each do |name, value| if value =~ /^local:(.*?):(.*)/ or (Array === value and value.length == 1 and value.first =~ /^local:(.*?):(.*)/) or (TSV === value and value.size == 1 and value.keys.first =~ /^local:(.*?):(.*)/) task_name = $1 jobname = $2 value = load_id(File.join(task_name, jobname)).load inputs[name] = value end end end |
Instance Method Details
#all_exports ⇒ Object
225 226 227 |
# File 'lib/rbbt/workflow.rb', line 225 def all_exports @all_exports ||= asynchronous_exports + synchronous_exports + exec_exports end |
#dep(*dependency, &block) ⇒ Object
37 38 39 40 41 42 43 44 45 46 47 48 49 |
# File 'lib/rbbt/workflow/definition.rb', line 37 def dep(*dependency_list, &block) @dependencies ||= [] if Module === dependency_list.first or Hash === dependency_list.last @dependencies << dependency_list else @dependency_list ||= [] if block_given? dependency_list.unshift self if dependency_list.length == 1 dependency_list << block end dependencies.concat dependency_list end end |
#desc(description) ⇒ Object
25 26 27 |
# File 'lib/rbbt/workflow/definition.rb', line 25 def desc(description) @description = description end |
#doc(task = nil) ⇒ Object
46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 |
# File 'lib/rbbt/workflow/usage.rb', line 46 def doc(task = nil) if task.nil? puts Log.color :magenta, self.to_s puts Log.color :magenta, "=" * self.to_s.length if self.documentation[:description] and not self.documentation[:description].empty? puts puts Misc.format_paragraph self.documentation[:description] end puts puts Log.color :magenta, "## TASKS" if self.documentation[:task_description] and not self.documentation[:task_description].empty? puts puts Misc.format_paragraph self.documentation[:task_description] end puts tasks.each do |name,task| description = task.description || "" description = description.split("\n\n").first puts Misc.format_definition_list_item(name.to_s, description, 80, 30, :yellow) end else if Task === task task_name = task.name else task_name = task task = self.tasks[task_name] end #dependencies = self.rec_dependencies(task_name).collect{|dep_name| Array === dep_name ? dep_name.first.tasks[dep_name[1].to_sym] : self.tasks[dep_name.to_sym]} task.doc(self, self.rec_dependencies(task_name)) if self.examples.include? task_name self.examples[task_name].each do |example| puts Log.color(:magenta, "Example ") << Log.color(:green, example) + " -- " + Log.color(:blue, example_dir[task_name][example]) inputs = self.example(task_name, example) inputs.each do |input, type, file| case type when :tsv, :array, :text lines = file.read.split("\n") head = lines[0..5].compact * "\n\n" head = head[0..500] puts Misc.format_definition_list_item(input, head, 1000, -1, :blue).gsub(/\n\s*\n/,"\n") puts '...' if lines.length > 6 else puts Misc.format_definition_list_item(input, file.read, 80, 20, :blue) end end puts end end end end |
#documentation_markdown ⇒ Object
37 38 39 40 41 42 43 44 |
# File 'lib/rbbt/workflow/doc.rb', line 37 def documentation_markdown file = @libdir['workflow.md'].find if file.exists? file.read else "" end end |
#example(task_name, example) ⇒ Object
21 22 23 24 25 26 |
# File 'lib/rbbt/workflow/examples.rb', line 21 def example(task_name, example) task_info(task_name.to_sym)[:input_types].collect do |input,type| next unless example_dir[task_name][example][input].exists? [input, type, example_dir[task_name][example][input].find] end.compact end |
#example_step(task_name, example) ⇒ Object
51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 |
# File 'lib/rbbt/workflow/examples.rb', line 51 def example_step(task_name, example) inputs = {} example(task_name, example).each do |input,type,file| case type when :tsv, :array, :text Log.debug "Pointing #{ input } to #{file}" inputs[input.to_sym] = file when :boolean inputs[input.to_sym] = (file.read.strip == 'true') else Log.debug "Loading #{ input } from #{file}" inputs[input.to_sym] = file.read.strip end end self.job(task_name, example, inputs) end |
#examples ⇒ Object
8 9 10 11 12 13 14 15 16 17 18 19 |
# File 'lib/rbbt/workflow/examples.rb', line 8 def examples return {} unless self.libdir.examples.exists? examples = {} example_dir.glob("*/*").each do |example_dir| example = File.basename(example_dir) task_name = File.basename(File.dirname(example_dir)) examples[task_name] ||= [] examples[task_name] << example end IndiferentHash.setup examples examples end |
#export_asynchronous(*names) ⇒ Object
104 105 106 107 108 |
# File 'lib/rbbt/workflow/definition.rb', line 104 def export_asynchronous(*names) asynchronous_exports.concat names asynchronous_exports.uniq! asynchronous_exports end |
#export_exec(*names) ⇒ Object
98 99 100 101 102 |
# File 'lib/rbbt/workflow/definition.rb', line 98 def export_exec(*names) exec_exports.concat names exec_exports.uniq! exec_exports end |
#export_synchronous(*names) ⇒ Object
110 111 112 113 114 |
# File 'lib/rbbt/workflow/definition.rb', line 110 def export_synchronous(*names) synchronous_exports.concat names synchronous_exports.uniq! synchronous_exports end |
#extension(extension) ⇒ Object
29 30 31 |
# File 'lib/rbbt/workflow/definition.rb', line 29 def extension(extension) @extension = extension end |
#get_job_step(step_path, task = nil, input_values = nil, dependencies = nil) ⇒ Object
258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 |
# File 'lib/rbbt/workflow.rb', line 258 def get_job_step(step_path, task = nil, input_values = nil, dependencies = nil) step_path = step_path.call if Proc === step_path persist = input_values.nil? ? false : true persist = false key = Path === step_path ? step_path.find : step_path step = Step.new step_path, task, input_values, dependencies step.extend step_module step.task ||= task step.inputs ||= input_values step.dependencies = dependencies if dependencies and (step.dependencies.nil? or step.dependencies.length < dependencies.length) step end |
#helper(name, &block) ⇒ Object
21 22 23 |
# File 'lib/rbbt/workflow/definition.rb', line 21 def helper(name, &block) helpers[name] = block end |
#id_for(path) ⇒ Object
596 597 598 599 600 601 602 603 |
# File 'lib/rbbt/workflow/accessor.rb', line 596 def id_for(path) if workdir.respond_to? :find workdir_find = workdir.find else workdir_find = workdir end Misc.path_relative_to workdir_find, path end |
#job(taskname, jobname = nil, inputs = {}) ⇒ Object
275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 |
# File 'lib/rbbt/workflow.rb', line 275 def job(taskname, jobname = nil, inputs = {}) taskname = taskname.to_sym jobname = DEFAULT_NAME if jobname.nil? or jobname.empty? task = tasks[taskname] raise "Task not found: #{ taskname }" if task.nil? inputs = IndiferentHash.setup(inputs) Workflow.resolve_locals(inputs) task_inputs = task_info(taskname)[:inputs] defaults = IndiferentHash.setup(task.input_defaults) dependencies = real_dependencies(task, jobname, defaults.merge(inputs), task_dependencies[taskname] || []) real_inputs = {} inputs.each do |k,v| default = defaults[k] if (task_inputs.include?(k.to_sym) or task_inputs.include?(k.to_s)) and (defaults[k].to_s != v.to_s and not (FalseClass === v and defaults[k].nil?)) real_inputs[k] = v end end if real_inputs.empty? step_path = step_path taskname, jobname, [], [], task.extension input_values = task.take_input_values(inputs) else input_values = task.take_input_values(inputs) step_path = step_path taskname, jobname, input_values, dependencies, task.extension end job = get_job_step step_path, task, input_values, dependencies job.clean_name = jobname job end |
#jobs(taskname, query = nil) ⇒ Object
338 339 340 341 342 343 344 345 346 |
# File 'lib/rbbt/workflow.rb', line 338 def jobs(taskname, query = nil) task_dir = File.join(File.expand_path(workdir.find), taskname.to_s) pattern = File.join(File.expand_path(task_dir), '**/*') job_info_files = Dir.glob(Step.info_file(pattern)).collect{|f| Misc.path_relative_to task_dir, f } job_info_files = job_info_files.select{|f| f.index(query) == 0 } if query job_info_files.collect{|f| job_name = Step.job_name_for_info_file(f, tasks[taskname].extension) } end |
#load_documentation ⇒ Object
46 47 48 49 50 51 52 |
# File 'lib/rbbt/workflow/doc.rb', line 46 def load_documentation @documentation = Workflow.parse_workflow_doc documentation_markdown @documentation[:tasks].each do |task, description| raise "Documentation for #{ task }, but not a #{ self.to_s } task" unless tasks.include? task.to_sym tasks[task.to_sym].description = description end end |
#load_id(id) ⇒ Object
319 320 321 322 323 324 325 326 327 328 329 330 |
# File 'lib/rbbt/workflow.rb', line 319 def load_id(id) path = File.join(workdir, id) task = task_for path step = Step.new path, tasks[task.to_sym] step.info if step.info.include? :dependencies step.dependencies = step.info[:dependencies].collect do |task, job| load_id(File.join(task.to_s, job)) end end step end |
#load_name(task, name) ⇒ Object
332 333 334 335 336 |
# File 'lib/rbbt/workflow.rb', line 332 def load_name(task, name) task = tasks[task.to_sym] if String === task or Symbol === task path = step_path task.name, name, [], [], task.extension get_job_step path, task end |
#load_step(path) ⇒ Object
314 315 316 317 |
# File 'lib/rbbt/workflow.rb', line 314 def load_step(path) task = task_for path get_job_step path, tasks[task.to_sym] end |
#local_persist_setup ⇒ Object
Make workflow resources local.
349 350 351 352 353 354 |
# File 'lib/rbbt/workflow.rb', line 349 def local_persist_setup class << self include LocalPersist end self.local_persist_dir = Rbbt.var.cache.persistence.find :lib end |
#local_workdir_setup ⇒ Object
356 357 358 |
# File 'lib/rbbt/workflow.rb', line 356 def local_workdir_setup self.workdir = Rbbt.var.jobs.find :lib end |
#log(status, message = nil, &block) ⇒ Object
404 405 406 |
# File 'lib/rbbt/workflow/accessor.rb', line 404 def log(status, message = nil, &block) Step.log(status, message, nil, &block) end |
#make_local ⇒ Object
360 361 362 363 |
# File 'lib/rbbt/workflow.rb', line 360 def make_local local_persist_setup local_workdir_setup end |
#real_dependencies(task, jobname, inputs, dependencies) ⇒ Object
527 528 529 530 531 532 533 534 535 536 537 538 539 540 541 542 543 544 545 546 547 548 549 550 551 552 553 554 555 556 557 558 559 560 561 562 563 564 565 566 567 568 |
# File 'lib/rbbt/workflow/accessor.rb', line 527 def real_dependencies(task, jobname, inputs, dependencies) real_dependencies = [] dependencies.each do |dependency| real_dependencies << case dependency when Array workflow, task, options = dependency _inputs = IndiferentHash.setup(inputs.dup) options.each{|i,v| case v when Symbol rec_dependency = (real_dependencies + real_dependencies.collect{|d| d.rec_dependencies}).flatten.compact.uniq.select{|d| d.task.name.to_sym == v }.first if rec_dependency.nil? _inputs[i] = v else input_options = workflow.task_info(task)[:input_options][i] || {} if input_options[:stream] #rec_dependency.run(true).grace unless rec_dependency.done? or rec_dependency.running? _inputs[i] = rec_dependency else _inputs[i] = rec_dependency.run end end else _inputs[i] = v end } if options res = workflow.job(task, jobname, _inputs) res when Step dependency when Symbol _inputs = IndiferentHash.setup(inputs.dup) job(dependency, jobname, _inputs) when Proc _inputs = IndiferentHash.setup(inputs.dup) dependency.call jobname, _inputs, real_dependencies end end real_dependencies.flatten.compact end |
#rec_dependencies(taskname) ⇒ Object
447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 |
# File 'lib/rbbt/workflow/accessor.rb', line 447 def rec_dependencies(taskname) @rec_dependencies ||= {} @rec_dependencies[taskname] ||= begin if task_dependencies.include? taskname deps = task_dependencies[taskname] all_deps = deps.select{|dep| String === dep or Symbol === dep or Array === dep} deps.each do |dep| case dep when Array dep.first.rec_dependencies(dep.last).each do |d| if Array === d all_deps << d else all_deps << [dep.first, d] end end when String, Symbol all_deps.concat rec_dependencies(dep.to_sym) when DependencyBlock all_deps << dep.dependency end end all_deps.uniq else [] end end end |
#rec_input_defaults(taskname) ⇒ Object
499 500 501 502 503 504 |
# File 'lib/rbbt/workflow/accessor.rb', line 499 def rec_input_defaults(taskname) [taskname].concat(rec_dependencies(taskname)).inject(IndiferentHash.setup({})){|acc, tn| new = (Array === tn ? tn.first.tasks[tn[1].to_sym] : tasks[tn.to_sym]).input_defaults acc = new.merge(acc) }.tap{|h| IndiferentHash.setup(h)} end |
#rec_input_descriptions(taskname) ⇒ Object
513 514 515 516 517 518 |
# File 'lib/rbbt/workflow/accessor.rb', line 513 def rec_input_descriptions(taskname) [taskname].concat(rec_dependencies(taskname)).inject({}){|acc, tn| new = (Array === tn ? tn.first.tasks[tn[1].to_sym] : tasks[tn.to_sym]).input_descriptions acc = new.merge(acc) }.tap{|h| IndiferentHash.setup(h)} end |
#rec_input_options(taskname) ⇒ Object
520 521 522 523 524 525 |
# File 'lib/rbbt/workflow/accessor.rb', line 520 def rec_input_options(taskname) [taskname].concat(rec_dependencies(taskname)).inject({}){|acc, tn| new = (Array === tn ? tn.first.tasks[tn[1].to_sym] : tasks[tn.to_sym]).input_options acc = new.merge(acc) }.tap{|h| IndiferentHash.setup(h)} end |
#rec_input_types(taskname) ⇒ Object
506 507 508 509 510 511 |
# File 'lib/rbbt/workflow/accessor.rb', line 506 def rec_input_types(taskname) [taskname].concat(rec_dependencies(taskname)).inject({}){|acc, tn| new = (Array === tn ? tn.first.tasks[tn[1].to_sym] : tasks[tn.to_sym]).input_types acc = new.merge(acc) }.tap{|h| IndiferentHash.setup(h)} end |
#rec_inputs(taskname) ⇒ Object
def rec_inputs(taskname)
[taskname].concat(rec_dependencies(taskname)).inject([]){|acc, tn| acc.concat(task_from_dep(tn).inputs) }.uniq
end
493 494 495 496 497 |
# File 'lib/rbbt/workflow/accessor.rb', line 493 def rec_inputs(taskname) task = task_from_dep(taskname) dep_inputs = task.dep_inputs rec_dependencies(taskname), self task.inputs + dep_inputs.values.flatten end |
#returns(description) ⇒ Object
33 34 35 |
# File 'lib/rbbt/workflow/definition.rb', line 33 def returns(description) @result_description = description end |
#step_module ⇒ Object
245 246 247 248 249 250 251 252 253 254 255 256 |
# File 'lib/rbbt/workflow.rb', line 245 def step_module @_m ||= begin m = Module.new helpers.each do |name,block| m.send(:define_method, name, &block) end m end @_m end |
#step_path(taskname, jobname, inputs, dependencies, extension = nil) ⇒ Object
571 572 573 574 575 576 577 578 579 580 581 582 583 584 585 586 587 588 589 590 591 592 593 594 |
# File 'lib/rbbt/workflow/accessor.rb', line 571 def step_path(taskname, jobname, inputs, dependencies, extension = nil) #Proc.new{ raise "Jobname makes an invalid path: #{ jobname }" if jobname =~ /\.\./ if inputs.any? or dependencies.any? tagged_jobname = case TAG when :hash input_str = "" input_str << inputs.collect{|i| Misc.fingerprint(i) } * "," input_str << ";" << dependencies.collect{|dep| dep.name } * "\n" jobname + '_' << Misc.digest(input_str) else jobname end else tagged_jobname = jobname end if extension and not extension.empty? tagged_jobname = tagged_jobname + ('.' << extension.to_s) end workdir[taskname][tagged_jobname].find #} end |
#task(name, &block) ⇒ Object
66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 |
# File 'lib/rbbt/workflow/definition.rb', line 66 def task(name, &block) if Hash === name type = name.first.last name = name.first.first else result_type = consume_result_type || :marshal end name = name.to_sym block = self.method(name) unless block_given? task_info = { :name => name, :inputs => consume_inputs, :description => consume_description, :input_types => consume_input_types, :result_type => (Array === type ? type.to_sym : type), :input_defaults => consume_input_defaults, :input_descriptions => consume_input_descriptions, :extension => consume_extension, :input_options => consume_input_options } task = Task.setup(task_info, &block) last_task = task tasks[name] = task task_dependencies[name] = consume_dependencies end |
#task_for(path) ⇒ Object
605 606 607 608 609 610 611 612 613 614 615 616 |
# File 'lib/rbbt/workflow/accessor.rb', line 605 def task_for(path) if workdir.respond_to? :find workdir_find = workdir.find else workdir_find = workdir end workdir_find = File.expand_path(workdir_find) path = File.expand_path(path) dir = File.dirname(path) Misc.path_relative_to(workdir_find, dir).sub(/([^\/]+)\/.*/,'\1') end |
#task_from_dep(dep) ⇒ Object
476 477 478 479 480 481 482 483 484 485 486 487 |
# File 'lib/rbbt/workflow/accessor.rb', line 476 def task_from_dep(dep) task = case dep when Array dep.first.tasks[dep[1]] when String tasks[dep.to_sym] when Symbol tasks[dep.to_sym] end raise "Unknown dependency: #{Misc.fingerprint dep}" if task.nil? task end |
#task_info(name) ⇒ Object
408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 |
# File 'lib/rbbt/workflow/accessor.rb', line 408 def task_info(name) name = name.to_sym task = tasks[name] raise "No '#{name}' task in '#{self.to_s}' Workflow" if task.nil? description = task.description result_description = task.result_description result_type = task.result_type inputs = rec_inputs(name).uniq input_types = rec_input_types(name) input_descriptions = rec_input_descriptions(name) input_defaults = rec_input_defaults(name) input_options = rec_input_options(name) export = case when (synchronous_exports.include?(name.to_sym) or synchronous_exports.include?(name.to_s)) :synchronous when (asynchronous_exports.include?(name.to_sym) or asynchronous_exports.include?(name.to_s)) :asynchronous when (exec_exports.include?(name.to_sym) or exec_exports.include?(name.to_s)) :exec else :none end dependencies = task_dependencies[name].select{|dep| String === dep or Symbol === dep} { :id => File.join(self.to_s, name.to_s), :description => description, :export => export, :inputs => inputs, :input_types => input_types, :input_descriptions => input_descriptions, :input_defaults => input_defaults, :input_options => input_options, :result_type => result_type, :result_description => result_description, :dependencies => dependencies } end |