Module: Workflow
- Includes:
- AnnotatedModule
- Defined in:
- lib/rbbt/workflow.rb,
lib/rbbt/workflow/usage.rb,
lib/rbbt/workflow/accessor.rb,
lib/rbbt/workflow/definition.rb
Constant Summary collapse
- TAG =
:hash
Class Attribute Summary collapse
-
.autoinstall ⇒ Object
Returns the value of attribute autoinstall.
-
.workflows ⇒ Object
Returns the value of attribute workflows.
Instance Attribute Summary collapse
-
#asynchronous_exports ⇒ Object
Returns the value of attribute asynchronous_exports.
-
#description ⇒ Object
Returns the value of attribute description.
-
#exec_exports ⇒ Object
Returns the value of attribute exec_exports.
-
#helpers ⇒ Object
Returns the value of attribute helpers.
-
#last_task ⇒ Object
Returns the value of attribute last_task.
-
#libdir ⇒ Object
Returns the value of attribute libdir.
-
#synchronous_exports ⇒ Object
Returns the value of attribute synchronous_exports.
-
#task_dependencies ⇒ Object
Returns the value of attribute task_dependencies.
-
#task_description ⇒ Object
Returns the value of attribute task_description.
-
#tasks ⇒ Object
Returns the value of attribute tasks.
-
#workdir ⇒ Object
Returns the value of attribute workdir (listed under the "ATTR DEFAULTS" section of the source).
Class Method Summary collapse
- .extended(base) ⇒ Object
- .load_workflow_file(filename) ⇒ Object
- .require_local_workflow(wf_name) ⇒ Object
- .require_remote_workflow(wf_name, url) ⇒ Object
- .require_workflow(wf_name) ⇒ Object
- .resolve_locals(inputs) ⇒ Object
Instance Method Summary collapse
- #dep(*dependency_list, &block) ⇒ Object
- #desc(description) ⇒ Object
- #doc(task = nil) ⇒ Object
- #export_asynchronous(*names) ⇒ Object
- #export_exec(*names) ⇒ Object
- #export_synchronous(*names) ⇒ Object
- #helper(name, &block) ⇒ Object
- #id_for(path) ⇒ Object
-
#job(taskname, jobname = nil, inputs = {}) ⇒ Object
Job management: builds and returns the Step for the given task, job name, and inputs.
- #jobs(task, query = nil) ⇒ Object
- #load_id(id) ⇒ Object
- #load_step(path) ⇒ Object
- #local_persist_setup ⇒ Object
- #local_workdir_setup ⇒ Object
- #log(status, message = nil) ⇒ Object
- #make_local ⇒ Object
- #real_dependencies(task, jobname, inputs, dependencies) ⇒ Object
- #rec_dependencies(taskname) ⇒ Object
- #rec_input_defaults(taskname) ⇒ Object
- #rec_input_descriptions(taskname) ⇒ Object
- #rec_input_options(taskname) ⇒ Object
- #rec_input_types(taskname) ⇒ Object
- #rec_inputs(taskname) ⇒ Object
- #returns(description) ⇒ Object
- #step_path(taskname, jobname, inputs, dependencies) ⇒ Object
- #task(name, &block) ⇒ Object
- #task_for(path) ⇒ Object
- #task_info(name) ⇒ Object
- #workflow_description ⇒ Object
Methods included from AnnotatedModule
add_consummable_annotation, #input
Class Attribute Details
.autoinstall ⇒ Object
Returns the value of attribute autoinstall.
9 10 11 |
# File 'lib/rbbt/workflow.rb', line 9 def autoinstall @autoinstall end |
.workflows ⇒ Object
Returns the value of attribute workflows.
31 32 33 |
# File 'lib/rbbt/workflow.rb', line 31 def workflows @workflows end |
Instance Attribute Details
#asynchronous_exports ⇒ Object
Returns the value of attribute asynchronous_exports.
130 131 132 |
# File 'lib/rbbt/workflow.rb', line 130 def asynchronous_exports @asynchronous_exports end |
#description ⇒ Object
Returns the value of attribute description.
126 127 128 |
# File 'lib/rbbt/workflow.rb', line 126 def description @description end |
#exec_exports ⇒ Object
Returns the value of attribute exec_exports.
130 131 132 |
# File 'lib/rbbt/workflow.rb', line 130 def exec_exports @exec_exports end |
#helpers ⇒ Object
Returns the value of attribute helpers.
128 129 130 |
# File 'lib/rbbt/workflow.rb', line 128 def helpers @helpers end |
#last_task ⇒ Object
Returns the value of attribute last_task.
129 130 131 |
# File 'lib/rbbt/workflow.rb', line 129 def last_task @last_task end |
#libdir ⇒ Object
Returns the value of attribute libdir.
127 128 129 |
# File 'lib/rbbt/workflow.rb', line 127 def libdir @libdir end |
#synchronous_exports ⇒ Object
Returns the value of attribute synchronous_exports.
130 131 132 |
# File 'lib/rbbt/workflow.rb', line 130 def synchronous_exports @synchronous_exports end |
#task_dependencies ⇒ Object
Returns the value of attribute task_dependencies.
129 130 131 |
# File 'lib/rbbt/workflow.rb', line 129 def task_dependencies @task_dependencies end |
#task_description ⇒ Object
Returns the value of attribute task_description.
129 130 131 |
# File 'lib/rbbt/workflow.rb', line 129 def task_description @task_description end |
#tasks ⇒ Object
Returns the value of attribute tasks.
128 129 130 |
# File 'lib/rbbt/workflow.rb', line 128 def tasks @tasks end |
#workdir ⇒ Object
Returns the value of attribute workdir (listed under the "ATTR DEFAULTS" section of the source).
134 135 136 |
# File 'lib/rbbt/workflow.rb', line 134 def workdir @workdir end |
Class Method Details
.extended(base) ⇒ Object
35 36 37 38 |
# File 'lib/rbbt/workflow.rb', line 35 def self.extended(base) self.workflows << base base.libdir = Path.caller_lib_dir.tap{|p| p.resource = base} end |
.load_workflow_file(filename) ⇒ Object
45 46 47 48 49 50 51 52 53 54 55 |
# File 'lib/rbbt/workflow.rb', line 45 def self.load_workflow_file(filename) begin $LOAD_PATH.unshift(File.join(File.dirname(File.expand_path(filename)), 'lib')) require filename Log.debug{"Workflow loaded from: #{ filename }"} return true rescue Exception Log.warn{"Error loading workflow: #{ filename }"} raise $! end end |
.require_local_workflow(wf_name) ⇒ Object
57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 |
# File 'lib/rbbt/workflow.rb', line 57 def self.require_local_workflow(wf_name) filename = nil if Path === wf_name case # Points to workflow file when ((File.exists?(wf_name.find) and not File.directory?(wf_name.find)) or File.exists?(wf_name.find + '.rb')) filename = wf_name.find # Points to workflow dir when (File.exists?(wf_name.find) and File.directory?(wf_name.find) and File.exists?(File.join(wf_name.find, 'workflow.rb'))) filename = wf_name['workflow.rb'].find end else case # Points to workflow file when ((File.exists?(wf_name) and not File.directory?(wf_name)) or File.exists?(wf_name + '.rb')) filename = (wf_name =~ /\.?\//) ? wf_name : "./" << wf_name when (defined?(Rbbt) and Rbbt.etc.workflow_dir.exists?) dir = Rbbt.etc.workflow_dir.read.strip dir = File.join(dir, wf_name) filename = File.join(dir, 'workflow.rb') when defined?(Rbbt) path = Rbbt.workflows[wf_name].find filename = File.join(path, 'workflow.rb') else path = File.join(ENV['HOME'], '.workflows', wf_name) filename = File.join(dir, 'workflow.rb') end end if filename and File.exists? filename load_workflow_file filename else return false end end |
.require_remote_workflow(wf_name, url) ⇒ Object
40 41 42 43 |
# File 'lib/rbbt/workflow.rb', line 40 def self.require_remote_workflow(wf_name, url) require 'rbbt/rest/client' eval "Object::#{wf_name} = WorkflowRESTClient.new '#{ url }', '#{wf_name}'" end |
.require_workflow(wf_name) ⇒ Object
96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 |
# File 'lib/rbbt/workflow.rb', line 96 def self.require_workflow(wf_name) # Already loaded begin Misc.string2const wf_name Log.debug{"Workflow #{ wf_name } already loaded"} return true rescue Exception end # Load remotely if Rbbt.etc.remote_workflows.exists? remote_workflows = Rbbt.etc.remote_workflows.yaml if Hash === remote_workflows and remote_workflows.include?(wf_name) url = remote_workflows[wf_name] require_remote_workflow(wf_name, url) Log.debug{"Workflow #{ wf_name } loaded remotely: #{ url }"} return end end # Load locally Log.info{"Loading workflow #{wf_name}"} require_local_workflow(wf_name) or require_local_workflow(Misc.snake_case(wf_name)) or (Workflow.autoinstall and `rbbt workflow install #{Misc.snake_case(wf_name)}` and require_local_workflow(Misc.snake_case(wf_name))) or raise("Workflow not found or could not be loaded: #{ wf_name }") end |
.resolve_locals(inputs) ⇒ Object
16 17 18 19 20 21 22 23 24 25 26 27 |
# File 'lib/rbbt/workflow.rb', line 16 def self.resolve_locals(inputs) inputs.each do |name, value| if value =~ /^local:(.*?):(.*)/ or (Array === value and value.length == 1 and value.first =~ /^local:(.*?):(.*)/) or (TSV === value and value.size == 1 and value.keys.first =~ /^local:(.*?):(.*)/) task_name = $1 jobname = $2 value = load_id(File.join(task_name, jobname)).load end inputs[name] = value end end |
Instance Method Details
#dep(*dependency_list, &block) ⇒ Object
24 25 26 27 |
# File 'lib/rbbt/workflow/definition.rb', line 24 def dep(*dependency_list, &block) dependency_list << block if block_given? dependencies.concat dependency_list end |
#desc(description) ⇒ Object
16 17 18 |
# File 'lib/rbbt/workflow/definition.rb', line 16 def desc(description) @description = description end |
#doc(task = nil) ⇒ Object
25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 |
# File 'lib/rbbt/workflow/usage.rb', line 25 def doc(task = nil) if task.nil? puts self.to_s puts "=" * self.to_s.length puts puts "\n" << workflow_description if workflow_description and not workflow_description.empty? puts puts "## TASKS" puts tasks.each do |name,task| puts " * #{ name }:" puts " " << task.description if task.description and not task.description.empty? puts end else if Task === task task_name = task.name else task_name = task task = self.tasks[task_name] end dependencies = self.rec_dependencies(task_name).collect{|dep_name| self.tasks[dep_name.to_sym]} task.doc(dependencies) end end |
#export_asynchronous(*names) ⇒ Object
64 65 66 |
# File 'lib/rbbt/workflow/definition.rb', line 64 def export_asynchronous(*names) asynchronous_exports.concat names end |
#export_exec(*names) ⇒ Object
60 61 62 |
# File 'lib/rbbt/workflow/definition.rb', line 60 def export_exec(*names) exec_exports.concat names end |
#export_synchronous(*names) ⇒ Object
68 69 70 |
# File 'lib/rbbt/workflow/definition.rb', line 68 def export_synchronous(*names) synchronous_exports.concat names end |
#helper(name, &block) ⇒ Object
12 13 14 |
# File 'lib/rbbt/workflow/definition.rb', line 12 def helper(name, &block) helpers[name] = block end |
#id_for(path) ⇒ Object
291 292 293 294 295 296 297 298 |
# File 'lib/rbbt/workflow/accessor.rb', line 291 def id_for(path) if workdir.respond_to? :find workdir_find = workdir.find else workdir_find = workdir end Misc.path_relative_to workdir_find, path end |
#job(taskname, jobname = nil, inputs = {}) ⇒ Object
Job management: builds and returns the Step for the given task, job name, and inputs.
189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 |
# File 'lib/rbbt/workflow.rb', line 189 def job(taskname, jobname = nil, inputs = {}) taskname = taskname.to_sym jobname = "Default" if jobname.nil? or jobname.empty? task = tasks[taskname] raise "Task not found: #{ taskname }" if task.nil? IndiferentHash.setup(inputs) Workflow.resolve_locals(inputs) dependencies = real_dependencies(task, jobname, inputs, task_dependencies[taskname] || []) input_values = task.take_input_values(inputs) step_path = step_path taskname, jobname, input_values, dependencies step = Step.new step_path, task, input_values, dependencies helpers.each do |name, block| (class << step; self; end).instance_eval do define_method name, &block end end step end |
#jobs(task, query = nil) ⇒ Object
234 235 236 237 238 239 240 241 242 243 244 245 |
# File 'lib/rbbt/workflow.rb', line 234 def jobs(task, query = nil) task_dir = File.join(workdir.find, task.to_s) if query.nil? path = File.join(task_dir, "**/*.info") else path = File.join(task_dir, query + "*.info") end Dir.glob(path).collect{|f| Misc.path_relative_to(task_dir, f).sub(".info",'') } end |
#load_id(id) ⇒ Object
221 222 223 224 225 226 227 228 229 230 231 232 |
# File 'lib/rbbt/workflow.rb', line 221 def load_id(id) path = File.join(workdir, id) task = task_for path step = Step.new path, tasks[task.to_sym] step.info if step.info.include? :dependencies step.dependencies = step.info[:dependencies].collect do |task, job| load_id(File.join(task.to_s, job)) end end step end |
#load_step(path) ⇒ Object
216 217 218 219 |
# File 'lib/rbbt/workflow.rb', line 216 def load_step(path) task = task_for path Step.new path, tasks[task.to_sym] end |
#local_persist_setup ⇒ Object
247 248 249 250 251 252 |
# File 'lib/rbbt/workflow.rb', line 247 def local_persist_setup class << self include LocalPersist end self.local_persist_dir = Rbbt.var.cache.persistence.find :lib end |
#local_workdir_setup ⇒ Object
254 255 256 |
# File 'lib/rbbt/workflow.rb', line 254 def local_workdir_setup self.workdir = Rbbt.var.jobs.find :lib end |
#log(status, message = nil) ⇒ Object
181 182 183 184 185 186 187 |
# File 'lib/rbbt/workflow/accessor.rb', line 181 def log(status, message = nil) if message Log.low "#{ status }: #{ message }" else Log.low "#{ status }" end end |
#make_local ⇒ Object
258 259 260 261 |
# File 'lib/rbbt/workflow.rb', line 258 def make_local local_persist_setup local_workdir_setup end |
#real_dependencies(task, jobname, inputs, dependencies) ⇒ Object
258 259 260 261 262 263 264 265 266 267 268 269 270 271 |
# File 'lib/rbbt/workflow/accessor.rb', line 258 def real_dependencies(task, jobname, inputs, dependencies) real_dependencies = [] dependencies.each do |dependency| real_dependencies << case when Step === dependency dependency when Symbol === dependency job(dependency, jobname, inputs) when Proc === dependency dependency.call jobname, inputs end end real_dependencies.flatten.compact end |
#rec_dependencies(taskname) ⇒ Object
227 228 229 230 231 232 233 234 235 |
# File 'lib/rbbt/workflow/accessor.rb', line 227 def rec_dependencies(taskname) if task_dependencies.include? taskname deps = task_dependencies[taskname].select{|dep| String === dep or Symbol === dep} deps.concat deps.collect{|dep| rec_dependencies(dep)}.flatten deps.uniq else [] end end |
#rec_input_defaults(taskname) ⇒ Object
241 242 243 |
# File 'lib/rbbt/workflow/accessor.rb', line 241 def rec_input_defaults(taskname) [taskname].concat(rec_dependencies(taskname)).inject({}){|acc, tn| acc.merge tasks[tn.to_sym].input_defaults} end |
#rec_input_descriptions(taskname) ⇒ Object
249 250 251 |
# File 'lib/rbbt/workflow/accessor.rb', line 249 def rec_input_descriptions(taskname) [taskname].concat(rec_dependencies(taskname)).inject({}){|acc, tn| acc.merge tasks[tn.to_sym].input_descriptions} end |
#rec_input_options(taskname) ⇒ Object
253 254 255 |
# File 'lib/rbbt/workflow/accessor.rb', line 253 def rec_input_options(taskname) [taskname].concat(rec_dependencies(taskname)).inject({}){|acc, tn| acc.merge tasks[tn.to_sym].input_options} end |
#rec_input_types(taskname) ⇒ Object
245 246 247 |
# File 'lib/rbbt/workflow/accessor.rb', line 245 def rec_input_types(taskname) [taskname].concat(rec_dependencies(taskname)).inject({}){|acc, tn| acc.merge tasks[tn.to_sym].input_types} end |
#rec_inputs(taskname) ⇒ Object
237 238 239 |
# File 'lib/rbbt/workflow/accessor.rb', line 237 def rec_inputs(taskname) [taskname].concat(rec_dependencies(taskname)).inject([]){|acc, tn| acc.concat tasks[tn.to_sym].inputs} end |
#returns(description) ⇒ Object
20 21 22 |
# File 'lib/rbbt/workflow/definition.rb', line 20 def returns(description) @result_description = description end |
#step_path(taskname, jobname, inputs, dependencies) ⇒ Object
274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 |
# File 'lib/rbbt/workflow/accessor.rb', line 274 def step_path(taskname, jobname, inputs, dependencies) raise "Jobname makes an invalid path: #{ jobname }" if jobname =~ /\.\./ if inputs.any? or dependencies.any? tagged_jobname = case TAG when :hash jobname + '_' + Misc.digest((inputs * "\n" + ";" + dependencies.collect{|dep| dep.name} * "\n")) else jobname end else tagged_jobname = jobname end workdir[taskname][tagged_jobname].find end |
#task(name, &block) ⇒ Object
29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 |
# File 'lib/rbbt/workflow/definition.rb', line 29 def task(name, &block) if Hash === name type = name.first.last name = name.first.first else result_type = consume_result_type || :marshal end name = name.to_sym block = self.method(name) unless block_given? task_info = { :name => name, :inputs => consume_inputs, :description => consume_description, :input_types => consume_input_types, :result_type => (Array === type ? type.to_sym : type), :input_defaults => consume_input_defaults, :input_descriptions => consume_input_descriptions, :input_options => consume_input_options } task = Task.setup(task_info, &block) last_task = task tasks[name] = task task_dependencies[name] = consume_dependencies end |
#task_for(path) ⇒ Object
300 301 302 303 304 305 306 307 308 |
# File 'lib/rbbt/workflow/accessor.rb', line 300 def task_for(path) if workdir.respond_to? :find workdir_find = workdir.find else workdir_find = workdir end Misc.path_relative_to(workdir_find, File.dirname(path)).sub(/([^\/]+)\/.*/,'\1') end |
#task_info(name) ⇒ Object
189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 |
# File 'lib/rbbt/workflow/accessor.rb', line 189 def task_info(name) name = name.to_sym task = tasks[name] description = task.description result_description = task.result_description result_type = task.result_type inputs = rec_inputs(name).uniq input_types = rec_input_types(name) input_descriptions = rec_input_descriptions(name) input_defaults = rec_input_defaults(name) input_options = rec_input_options(name) export = case when (synchronous_exports.include?(name.to_sym) or synchronous_exports.include?(name.to_s)) :synchronous when (asynchronous_exports.include?(name.to_sym) or asynchronous_exports.include?(name.to_s)) :asynchronous when (exec_exports.include?(name.to_sym) or exec_exports.include?(name.to_s)) :exec else :none end dependencies = task_dependencies[name].select{|dep| String === dep or Symbol === dep} { :id => File.join(self.to_s, name.to_s), :description => description, :export => export, :inputs => inputs, :input_types => input_types, :input_descriptions => input_descriptions, :input_defaults => input_defaults, :input_options => input_options, :result_type => result_type, :result_description => result_description, :dependencies => dependencies } end |
#workflow_description ⇒ Object
148 149 150 151 152 153 154 155 156 157 |
# File 'lib/rbbt/workflow.rb', line 148 def workflow_description @workflow_description ||= begin file = @libdir['workflow.md'] if file.exists? file.read else "" end end end |