Module: Workflow

Extended by:
Annotation
Defined in:
lib/scout/workflow.rb,
lib/scout/workflow/util.rb,
lib/scout/workflow/usage.rb,
lib/scout/workflow/definition.rb,
lib/scout/workflow/documentation.rb,
lib/scout/workflow/deployment/trace.rb,
lib/scout/workflow/deployment/orchestrator.rb

Defined Under Namespace

Classes: Orchestrator

Constant Summary

# True when any of the "forget task alias" environment flags is set to the
# literal string 'true'; evaluated once, when this file is loaded.
FORGET_TASK_ALIAS =
  %w(SCOUT_FORGET_TASK_ALIAS SCOUT_FORGET_DEP_TASKS RBBT_FORGET_DEP_TASKS).any? { |flag| ENV[flag] == 'true' }
# Default removal mode used when a task alias is "forgotten". It is read once,
# at load time, from the first of these environment variables that is set
# and not 'false'. The constant holds that variable's value (task_alias acts
# on 'true' and 'recursive'), or false when none is set.
REMOVE_TASK_ALIAS =
  begin
    remove_var = %w(SCOUT_REMOVE_TASK_ALIAS SCOUT_REMOVE_DEP_TASKS RBBT_REMOVE_DEP_TASKS).find do |var|
      ENV.include?(var) && ENV[var] != 'false'
    end
    # Return the variable's *value*. Returning its name, as before, could
    # never match the 'true'/'recursive' modes.
    remove_var.nil? ? false : ENV[remove_var]
  end

Class Attribute Summary

Instance Attribute Summary

Class Method Summary

Instance Method Summary

Methods included from Annotation

list_tsv_values, load_info, load_tsv, load_tsv_values, obj_tsv_values, resolve_tsv_array, tsv

Class Attribute Details

.autoinstall ⇒ Object

Returns the value of attribute autoinstall.



16
17
18
# File 'lib/scout/workflow.rb', line 16

# Reader for the autoinstall setting (nil until assigned).
def autoinstall; @autoinstall; end

.directory ⇒ Object

Returns the value of attribute directory.



8
9
10
# File 'lib/scout/workflow/definition.rb', line 8

# Reader for the class-level directory attribute (nil until assigned).
def directory; @directory; end

.main ⇒ Object

Returns the value of attribute main.



16
17
18
# File 'lib/scout/workflow.rb', line 16

# Reader for the main attribute (nil until assigned).
def main; @main; end

.workflow_dir ⇒ Object

Returns the value of attribute workflow_dir.



16
17
18
# File 'lib/scout/workflow.rb', line 16

# Reader for the workflow_dir attribute (nil until assigned).
def workflow_dir; @workflow_dir; end

.workflow_repo ⇒ Object

Returns the value of attribute workflow_repo.



16
17
18
# File 'lib/scout/workflow.rb', line 16

# Reader for the workflow_repo attribute (nil until assigned).
def workflow_repo; @workflow_repo; end

.workflows ⇒ Object

Returns the value of attribute workflows.



16
17
18
# File 'lib/scout/workflow.rb', line 16

# Reader for the workflows attribute (nil until assigned).
def workflows; @workflows; end

Instance Attribute Details

#description ⇒ Object

Returns the value of attribute description.



2
3
4
# File 'lib/scout/workflow/documentation.rb', line 2

# Reader for the description attribute (nil until assigned).
def description; @description; end

#directory ⇒ Object

Returns the value of attribute directory.



48
49
50
# File 'lib/scout/workflow/definition.rb', line 48

# Reader for the instance-level directory attribute (nil until assigned).
def directory; @directory; end

#documentation ⇒ Object

Returns the value of attribute documentation.



49
50
51
# File 'lib/scout/workflow/documentation.rb', line 49

# Reader for the documentation attribute (nil until assigned).
def documentation; @documentation; end

#libdir ⇒ Object

Returns the value of attribute libdir.



53
54
55
# File 'lib/scout/workflow.rb', line 53

# Reader for the libdir attribute (nil until assigned).
def libdir; @libdir; end

#title ⇒ Object

Returns the value of attribute title.



2
3
4
# File 'lib/scout/workflow/documentation.rb', line 2

# Reader for the title attribute (nil until assigned).
def title; @title; end

Class Method Details

.annonymous_workflow(name = nil, &block) ⇒ Object



2
3
4
5
6
7
8
9
# File 'lib/scout/workflow/util.rb', line 2

# Builds an ad-hoc workflow module.
#
# A fresh Module is extended with Workflow and given +name+. When +name+ is
# set, it also gets the matching Workflow.directory entry. +block+ is then
# evaluated with the module as self. Returns the new module.
def self.annonymous_workflow(name = nil, &block)
  Module.new.tap do |wf|
    wf.extend Workflow
    wf.name = name
    wf.directory = Workflow.directory[name] if name
    wf.instance_eval(&block)
  end
end

.doc_parse_chunks(str, pattern) ⇒ Object



22
23
24
25
26
27
28
# File 'lib/scout/workflow/documentation.rb', line 22

# Splits +str+ into titled chunks.
#
# +pattern+ must contain one capture group for the chunk title. Returns a
# Hash mapping each captured title to the stripped text that follows it, up
# to the next match. Text before the first match is discarded, as are
# chunks whose text is empty. Returns {} when +pattern+ never matches.
def self.doc_parse_chunks(str, pattern)
  # limit -1 keeps trailing empty strings. Without it, a title at the very
  # end of +str+ left an odd number of elements and Hash[] raised.
  parts = str.split(pattern, -1)
  return {} if parts.length < 2
  tasks = parts.drop(1).map(&:strip).each_slice(2).map { |title, text| [title, text.to_s] }.to_h
  tasks.delete_if { |_title, text| text.empty? }
  tasks
end

.doc_parse_first_line(str) ⇒ Object



4
5
6
7
8
9
10
11
# File 'lib/scout/workflow/documentation.rb', line 4

# Extracts a title line from a documentation string.
#
# When the regexp below finds a line followed by a blank line, that line is
# returned and +str+ is replaced in place with the text after the blank
# line. Otherwise +str+ is left untouched and "" is returned.
def self.doc_parse_first_line(str)
  # No /s flag here. In Ruby /s does not mean "dot matches newline"; it
  # forces a Windows-31J regexp, which raises on non-ASCII UTF-8 input.
  # /m already lets (.*) span newlines.
  # NOTE(review): ^ matches at any line start. If the opening paragraph
  # spans several lines, only its last line becomes the title and the lines
  # before it are dropped. Confirm whether \A was intended.
  match = str.match(/^([^\n]*)\n\n(.*)/m)
  return "" if match.nil?
  title = match[1]
  str.replace match[2]
  title
end

.doc_parse_up_to(str, pattern, keep = false) ⇒ Object



13
14
15
16
17
18
19
20
# File 'lib/scout/workflow/documentation.rb', line 13

# Splits +str+ at the first match of +pattern+ and returns [before, after].
# With +keep+ true, the matched text stays at the start of +after+. When
# +pattern+ does not match, returns [str, ""].
def self.doc_parse_up_to(str, pattern, keep = false)
  # String#partition always returns three strings, with "" as the separator
  # when nothing matches. The former "no separator" branch could never run.
  before, separator, after = str.partition(pattern)
  [before, keep ? separator + after : after]
end

.extended(base) ⇒ Object



55
56
57
58
59
60
# File 'lib/scout/workflow.rb', line 55

# Hook run when a module extends Workflow.
#
# Registers +base+ in +workflows+. When Path.caller_lib_dir yields a
# directory, it is stored as +base.libdir+ (set up as a Path whose
# +resource+ is +base+).
def self.extended(base)
  workflows << base
  caller_dir = Path.caller_lib_dir
  base.libdir = Path.setup(caller_dir).tap { |path| path.resource = base } unless caller_dir.nil?
end

.get_SOPT(workflow, task) ⇒ Object



332
333
334
335
336
# File 'lib/scout/workflow/usage.rb', line 332

# Returns the SOPT option parsing result for +task+ of +workflow+.
#
# +workflow+ may be a workflow name (loaded through
# Workflow.require_workflow), and +task+ may be a task name (String/Symbol,
# looked up in the workflow's tasks).
def self.get_SOPT(workflow, task)
  wf = String === workflow ? Workflow.require_workflow(workflow) : workflow
  task_def = (String === task || Symbol === task) ? wf.tasks[task.to_sym] : task
  wf.get_SOPT(task_def)
end

.install_workflow(workflow, base_repo_url = nil) ⇒ Object



71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
# File 'lib/scout/workflow.rb', line 71

# Installs or updates a workflow.
#
# If +workflow+ names an existing path, it is updated in place with
# update_workflow_dir. Otherwise, inside Workflow.workflow_dir, a repository
# URL is derived from +base_repo_url+ (falling back to workflow_repo):
# * the base URL is used as-is when it already contains the workflow name
#   (or its snake_case form);
# * otherwise "<base>/<workflow>.git" and then "<base>/<snake_case>.git" are
#   probed with wget.
# The repository is cloned (retried via Misc.insist) into a snake_case
# directory, and its git submodules are initialized and updated.
#
# Raises RuntimeError when neither candidate URL can be fetched.
def self.install_workflow(workflow, base_repo_url = nil)
  case
  when File.exist?(workflow)
    update_workflow_dir(workflow)
  else
    Misc.in_dir(self.workflow_dir) do
      Log.info "Installing: " + workflow

      # Honour the +base_repo_url+ argument. It used to be ignored because
      # the code read an unrelated, always-nil local.
      repo_base_url = base_repo_url || self.workflow_repo

      # NOTE(review): +workflow+ and the URL are interpolated into shell
      # commands below; only pass trusted workflow names.
      if repo_base_url.include?(workflow) || repo_base_url.include?(Misc.snake_case(workflow))
        repo = repo_base_url
      else
        begin
          repo = File.join(repo_base_url, workflow + '.git')
          CMD.cmd("wget '#{repo}' -O /dev/null").read
        rescue
          Log.debug "Workflow repo does not exist, trying snake_case: #{ repo }"
          begin
            repo = File.join(repo_base_url, Misc.snake_case(workflow) + '.git')
            CMD.cmd("wget '#{repo}' -O /dev/null").read
          rescue
            raise "Workflow repo does not exist: #{ repo }"
          end
        end
      end

      Log.warn "Cloning #{ repo }"
      Misc.insist do
        `git clone "#{repo}" #{ Misc.snake_case(workflow) }`
        raise unless $?.success?
      end
      Log.warn "Initializing and updating submodules for #{repo}. You might be prompted for passwords."
      Misc.in_dir(Misc.snake_case(workflow)) do
        `git submodule init`
        `git submodule update`
      end
    end
  end
end

.installed_workflows ⇒ Object



11
12
13
# File 'lib/scout/workflow/util.rb', line 11

# Lists the distinct entry names found under every "workflows" directory
# that Path can resolve.
def self.installed_workflows
  candidates = Path.setup("workflows").glob_all("*")
  candidates.map { |entry| File.basename(entry) }.uniq
end

.parse_workflow_doc(doc) ⇒ Object



30
31
32
33
34
35
36
# File 'lib/scout/workflow/documentation.rb', line 30

# Parses a workflow documentation string into its parts.
#
# Returns a Hash with:
# * :title - the first line, when it is followed by a blank line;
# * :description - the text up to a "# Tasks" heading;
# * :task_description - the text between that heading and the first "##";
# * :tasks - a Hash of "## <task>" sections mapped to their text.
# Note that +doc+ is modified in place by doc_parse_first_line.
def self.parse_workflow_doc(doc)
  title = doc_parse_first_line(doc)
  description, task_info = doc_parse_up_to(doc, /^# Tasks/i)
  task_description, task_chunks = doc_parse_up_to(task_info, /^##/, true)
  {
    :title => title.strip,
    :description => description.strip,
    :task_description => task_description.strip,
    :tasks => doc_parse_chunks(task_chunks, /^## (.*)/)
  }
end

.produce_dependencies(jobs, tasks, produce_cpus = Etc.nprocessors, produce_timer = 5) ⇒ Object



256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
# File 'lib/scout/workflow/deployment/orchestrator.rb', line 256

# Pre-computes selected dependencies of +jobs+.
#
# For every job (a single job is accepted too) that is neither done nor
# running, its recursive dependencies whose task is listed in +tasks+ are
# collected. A task matches by task_name as given, by task_name as a String,
# or by full_task_name. The collected dependencies are processed by an
# Orchestrator built with +produce_timer+ and +produce_cpus+.
#
# Returns the list of dependencies that were scheduled (each one once).
def self.produce_dependencies(jobs, tasks, produce_cpus = Etc.nprocessors, produce_timer = 5)
  jobs = [jobs] unless Array === jobs
  produce_list = []
  jobs.each do |job|
    next if job.done? || job.running?
    # Named +dependency+ so it no longer shadows the outer +job+.
    job.rec_dependencies.each do |dependency|
      wanted = tasks.include?(dependency.task_name) ||
        tasks.include?(dependency.task_name.to_s) ||
        tasks.include?(dependency.full_task_name)
      produce_list << dependency if wanted
    end
  end
  # Jobs often share dependencies; schedule each one only once.
  produce_list.uniq!

  orchestrator = Orchestrator.new produce_timer, cpus: produce_cpus.to_i
  orchestrator.process({}, produce_list)
  produce_list
end

.require_workflow(workflow_name_orig) ⇒ Object



119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
# File 'lib/scout/workflow.rb', line 119

# Loads one or more workflows given as "A+B+C".
#
# Each name may contain "::"-separated parts; only the first part is used to
# locate the definition file. The file is looked up as a path first, then as
# workflows/<name>/workflow.rb, trying the name as given, in snake_case and
# in CamelCase. If it is not found and autoinstall is enabled, the workflow
# is installed once and looked up again. Otherwise a RuntimeError is raised.
# Loading goes through Persist.memory, keyed by the complete name.
#
# Returns the first workflow. Each one is resolved by constant name, falling
# back to Workflow.main or the last registered workflow.
def self.require_workflow(workflow_name_orig)
  first = nil
  workflow_name_orig.split("+").each do |complete_workflow_name|
    self.main = nil

    Persist.memory(complete_workflow_name, prefix: "Workflow") do
      # Bound the install-and-retry loop: if installing did not produce a
      # workflow file, report "not found" instead of reinstalling forever.
      install_attempted = false
      begin
        workflow_name, *_subworkflows = complete_workflow_name.split("::")
        workflow_file = workflow_name
        workflow_file = Path.setup('workflows')[workflow_name]["workflow.rb"] unless Open.exists?(workflow_file)
        workflow_file = Path.setup('workflows')[Misc.snake_case(workflow_name)]["workflow.rb"] unless Open.exists?(workflow_file)
        workflow_file = Path.setup('workflows')[Misc.camel_case(workflow_name)]["workflow.rb"] unless Open.exists?(workflow_file)
        if Open.exists?(workflow_file)
          self.main = nil
          require_workflow_file(workflow_file)
        elsif autoinstall && !install_attempted
          install_attempted = true
          install_workflow(workflow_name)
          raise TryAgain
        else
          raise "Workflow #{workflow_name} not found"
        end
      rescue TryAgain
        retry
      end
    end

    current = begin
                Kernel.const_get(complete_workflow_name)
              rescue
                self.main || workflows.last
              end

    first ||= current
  end
  first
end

.require_workflow_file(file) ⇒ Object



113
114
115
116
117
# File 'lib/scout/workflow.rb', line 113

# Loads the workflow definition in +file+.
#
# A Path is first resolved to a concrete file. The sibling 'lib' directory
# is put at the front of $LOAD_PATH so the workflow can require its own
# helpers. Uses +load+, so the file is re-evaluated on every call.
def self.require_workflow_file(file)
  file = file.find if Path === file
  lib_dir = File.join(File.dirname(file), 'lib')
  # Move the directory to the front rather than re-adding it. Repeated
  # loads keep the same priority without piling up duplicate entries.
  $LOAD_PATH.delete(lib_dir)
  $LOAD_PATH.unshift(lib_dir)
  load file
end

.trace(seed_jobs, options = {}) ⇒ Object



164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
# File 'lib/scout/workflow/deployment/trace.rb', line 164

# Collects timing information for +seed_jobs+ and their recursive
# dependencies.
#
# Archived dependencies listed under a job's :archived_info are included as
# Steps that serve their archived info. Jobs are de-duplicated and sorted by
# their :started info, falling back to the file mtime or the current time.
#
# Options:
# * :report_keys - Array, or comma-separated String, of info keys to report.
# * :fix_gap     - passed to trace_job_times.
# * :plot_data   - when true, return the per-job data TSV
#                  (trace_job_times); otherwise return the per-task summary
#                  (trace_job_summary).
#
# Raises RuntimeError "No jobs to process" when no job has finished.
def self.trace(seed_jobs, options = {})
  jobs = []
  seed_jobs.each do |step|
    jobs += step.rec_dependencies + [step]
    step.info[:archived_info].each do |path,ainfo|
      next unless Hash === ainfo
      archived_step = Step.new path

      # Serve the archived info instead of reading it from the step itself.
      archived_step.define_singleton_method :info do
        ainfo
      end

      jobs << archived_step
    end if step.info[:archived_info]
  end

  jobs = jobs.uniq.sort_by do |job|
    t = job.info[:started] || Open.mtime(job.path) || Time.now
    Time === t ? t : Time.parse(t)
  end

  report_keys = options[:report_keys] || ""
  report_keys = report_keys.split(/,\s*/) if String === report_keys

  data = trace_job_times(jobs, options[:fix_gap], report_keys)
  raise "No jobs to process" if data.size == 0

  # Only build the representation that is actually returned.
  if options[:plot_data]
    data
  else
    trace_job_summary(jobs, report_keys)
  end
end

.trace_job_summary(jobs, report_keys = []) ⇒ Object



113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
# File 'lib/scout/workflow/deployment/trace.rb', line 113

# Summarizes finished +jobs+ per task, with "workflow#task" as the key.
#
# Returns a TSV with Calls, Avg. Time and Total Time (integer seconds). For
# each of +report_keys+, a column (humanized key name) is added holding that
# key's value from the job info. A job's :config_keys entries whose key is
# listed in +report_keys+ also feed that column. When several jobs share a
# task, the last job seen wins.
def self.trace_job_summary(jobs, report_keys = [])
  per_task = {}
  report_keys = report_keys.map { |k| k.to_s }

  jobs.each do |job|
    next unless job.info[:end]
    task_label = [job.workflow.name, job.task_name].compact.map { |part| part.to_s } * "#"
    entry = (per_task[task_label] ||= IndiferentHash.setup({}))
    job_info = IndiferentHash.setup(job.info)

    finished_at = job_info[:end]
    started_at = job_info[:start]

    started_at = Time.parse(started_at) if String === started_at
    finished_at = Time.parse(finished_at) if String === finished_at

    elapsed = finished_at - started_at
    entry[:time] ||= []
    entry[:time] << elapsed

    report_keys.each { |key| entry[key] = job_info[key] }

    if job.info[:config_keys]
      job.info[:config_keys].each do |config_key, config_value, _tokens|
        entry[config_key.to_s] = config_value if report_keys.include?(config_key.to_s)
      end
    end
  end

  summary = TSV.setup({}, "Task~Calls,Avg. Time,Total Time#:type=:list")

  per_task.each do |task_label, entry|
    times = entry[:time]
    summary[task_label] = [times.length, Misc.mean(times).to_i, Misc.sum(times).to_i]
  end

  if Array === report_keys && report_keys.any?
    report_keys.each do |key|
      summary.add_field(Misc.humanize(key)) { |task_label| per_task[task_label][key] }
    end
  end

  summary
end

.trace_job_times(jobs, fix_gap = false, report_keys = nil) ⇒ Object



4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
# File 'lib/scout/workflow/deployment/trace.rb', line 4

# Builds a per-job timing table for the finished jobs in +jobs+.
#
# Returns a TSV keyed by job path with one row per finished job. Columns:
# * a "name - workflow · task" code, workflow name, task name;
# * start and end times;
# * "Start.second"/"End.second": offsets from the earliest start.
#
# With +fix_gap+, the gaps between the collapsed job time ranges are
# subtracted from the offsets, and the total gap is logged.
#
# Each +report_keys+ entry adds one column (humanized key name) holding that
# key's value from the job info. Logs the total elapsed time.
def self.trace_job_times(jobs, fix_gap = false, report_keys = nil)
  data = TSV.setup({}, "Job~Code,Workflow,Task,Start,End#:type=:list")
  min_start = nil
  jobs.each do |job|
    next unless job.info[:end]
    started = job.info[:start]
    ddone = job.info[:end]

    started = Time.parse started if String === started
    ddone = Time.parse ddone if String === ddone

    code = [job.workflow.name, job.task_name].compact.collect{|s| s.to_s} * " · "
    code = job.name + " - " + code

    data[job.path] = [code, job.workflow.name, job.task_name, started, ddone]
    if min_start.nil?
      min_start = started
    else
      min_start = started if started < min_start
    end
  end

  data.add_field "Start.second" do |k,value|
    value["Start"] - min_start
  end

  data.add_field "End.second" do |k,value|
    value["End"] - min_start
  end

  if fix_gap
    ranges = []
    data.through do |k,values|
      start, eend = values.values_at "Start.second", "End.second"

      ranges << (start..eend)
    end

    # Map the end of each collapsed busy range to the idle time that
    # follows it.
    gaps = {}
    last = nil
    Misc.collapse_ranges(ranges).each do |range|
      start = range.begin
      eend = range.end
      if last
        gaps[last] = start - last
      end
      last = eend
    end

    # End.second is adjusted first, while Start.second still holds the
    # unadjusted value both shifts are computed from.
    data.process "End.second" do |value,k,values|
      gap = Misc.sum(gaps.select{|pos,size| pos < values["Start.second"]}.collect{|pos,size| size})
      value - gap
    end

    data.process "Start.second" do |value,k,values|
      gap = Misc.sum(gaps.select{|pos,size| pos < values["Start.second"]}.collect{|pos,size| size})
      value - gap
    end

    total_gaps = Misc.sum(gaps.collect{|k,v| v})
    Log.info "Total gaps: #{total_gaps} seconds"
  end

  # Add report columns once. A duplicated copy of this step used to add
  # every field a second time, reading the info without key
  # normalization.
  if report_keys && report_keys.any?
    job_keys = {}
    jobs.each do |job|
      job_info = IndiferentHash.setup(job.info)
      report_keys.each do |key|
        job_keys[job.path] ||= {}
        job_keys[job.path][key] = job_info[key]
      end
    end
    report_keys.each do |key|
      data.add_field Misc.humanize(key) do |p,values|
        job_keys[p][key]
      end
    end
  end

  start = data.column("Start.second").values.flatten.collect{|v| v.to_f}.min
  eend = data.column("End.second").values.flatten.collect{|v| v.to_f}.max
  total = eend - start unless eend.nil? || start.nil?
  Log.info "Total time elapsed: #{total} seconds" if total

  data
end

.update_workflow_dir(workflow_dir) ⇒ Object



62
63
64
65
66
67
68
69
# File 'lib/scout/workflow.rb', line 62

# Updates an already cloned workflow. Inside +workflow_dir+ it runs
# `git pull`, then initializes and updates the git submodules.
def self.update_workflow_dir(workflow_dir)
  Misc.in_dir(workflow_dir) do
    Log.info "Updating: " + workflow_dir
    %x(git pull)
    %x(git submodule init)
    %x(git submodule update)
  end
end

Instance Method Details

#_prov_tasks(tree) ⇒ Object



163
164
165
166
167
168
169
170
171
172
173
174
# File 'lib/scout/workflow/usage.rb', line 163

# Flattens a nested dependency tree (Hash of node => subtree) into a list of
# all its keys. Traversal uses a LIFO stack, so siblings come before the
# children of the last-visited subtree.
def _prov_tasks(tree)
  stack = [tree]
  [].tap do |found|
    while stack.any?
      stack.pop.each do |node, subtree|
        found << node
        stack << subtree
      end
    end
  end
end

#annotate_next_task(type, obj) ⇒ Object



58
59
60
61
62
# File 'lib/scout/workflow/definition.rb', line 58

# Appends +obj+ to the list of +type+ annotations collected for the next
# task to be declared. Returns that list.
def annotate_next_task(type, obj)
  pending = (@annotate_next_task ||= {})
  (pending[type] ||= []) << obj
end

#annotate_next_task_single(type, obj) ⇒ Object



64
65
66
67
# File 'lib/scout/workflow/definition.rb', line 64

# Sets the single-valued +type+ annotation for the next task to be declared,
# replacing any previous value. Returns +obj+.
def annotate_next_task_single(type, obj)
  (@annotate_next_task ||= {}).store(type, obj)
end

#dep(*args, &block) ⇒ Object



69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
# File 'lib/scout/workflow/definition.rb', line 69

# Declares a dependency for the next task.
#
# Accepted forms:
#   dep(task)
#   dep(options)
#   dep(task, options)
#   dep(workflow, task)
#   dep(workflow, task, options)
# The workflow defaults to self, options to {}, and the task is symbolized.
# Records [workflow, task, options, block, args] as a :deps annotation.
def dep(*args, &block)
  workflow = task = options = nil
  if args.length == 3
    workflow, task, options = args
  elsif args.length == 2
    if Hash === args.last
      task, options = args
    else
      workflow, task = args
    end
  elsif args.length == 1
    if Hash === args.first
      options = args.first
    else
      task = args.first
    end
  end
  workflow = self if workflow.nil?
  options = {} if options.nil?
  task &&= task.to_sym
  annotate_next_task :deps, [workflow, task, options, block, args]
end

#dep_tree(task_name, seen = nil, seen_options = nil) ⇒ Object

Raises: TaskNotFound if the task is not defined in the workflow.



130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
# File 'lib/scout/workflow/usage.rb', line 130

# Builds the dependency tree of +task_name+.
#
# The result is a nested Hash mapping [workflow, task] to that dependency's
# own tree. A dependency with the same workflow, task and merged options as
# one already recorded in +seen+ is skipped. Option-only dependencies (no
# task) are recorded in +seen+ but not added to the tree.
#
# +seen+ is a collection (Set or Array) shared across the recursion; callers
# such as recursive_deps pass their own to collect every visited
# dependency. Only top-level calls (no +seen+ given) are cached, under
# [self, task_name].
#
# Raises TaskNotFound if the task is unknown.
def dep_tree(task_name, seen = nil, seen_options = nil)
  @dep_tree ||= {}
  cache_key = [self, task_name]

  return @dep_tree[cache_key] if @dep_tree.include?(cache_key)
  save = seen.nil?
  seen = Set.new if seen.nil?
  seen_options = {} if seen_options.nil?

  tree = {}
  task = self.tasks[task_name]
  raise TaskNotFound, "Task #{task_name} in #{self.to_s}" if task.nil?
  task.deps.each do |workflow, dep_task, options|
    dep_options = options.merge(seen_options)
    # Compare against the entry that is actually stored in +seen+. The old
    # code called the +dep+ DSL method here, which never matched and added
    # a spurious :deps annotation to the next task.
    seen_entry = [workflow, dep_task, dep_options]
    next if seen.include?(seen_entry)
    seen << seen_entry
    next if dep_task.nil?

    # Block-local names keep the outer cache key intact (it used to be
    # overwritten, so results were cached under the wrong task).
    tree[[workflow, dep_task]] = workflow.dep_tree(dep_task, seen, dep_options)
  end if task.deps

  @dep_tree[cache_key] = tree if save

  tree
end

#desc(description) ⇒ Object



95
96
97
# File 'lib/scout/workflow/definition.rb', line 95

# Sets the description of the next task to be declared (single-valued
# :description annotation).
def desc(description)
  annotate_next_task_single :description, description
end

#documentation_markdown ⇒ Object



38
39
40
41
42
43
44
45
46
47
# File 'lib/scout/workflow/documentation.rb', line 38

# Returns the workflow's markdown documentation: the contents of
# workflow.md in libdir, falling back to README.md. Returns "" when there
# is no libdir or neither file exists.
def documentation_markdown
  return "" if @libdir.nil?
  doc_file = @libdir['workflow.md'].find
  doc_file = @libdir['README.md'].find unless doc_file.exists?
  doc_file.exists? ? doc_file.read : ""
end

#export(*args) ⇒ Object Also known as: export_synchronous, export_asynchronous, export_exec, export_stream



219
220
# File 'lib/scout/workflow/definition.rb', line 219

# Export declarations (also reachable as export_synchronous,
# export_asynchronous, export_exec and export_stream) are accepted and
# ignored: the arguments are discarded and nil is returned.
def export(*args)
  nil
end

#extension(extension) ⇒ Object



103
104
105
# File 'lib/scout/workflow/definition.rb', line 103

# Sets the result file extension of the next task to be declared
# (single-valued :extension annotation).
def extension(extension)
  annotate_next_task_single :extension, extension
end

#find_in_dependencies(name, dependencies) ⇒ Object



15
16
17
18
# File 'lib/scout/workflow/util.rb', line 15

# Returns the entries of +dependencies+ whose task_name matches +name+
# (both compared as Symbols).
def find_in_dependencies(name, dependencies)
  target = name.to_sym
  dependencies.select do |candidate|
    candidate.task_name.to_sym == target
  end
end

#get_SOPT(task) ⇒ Object



327
328
329
330
# File 'lib/scout/workflow/usage.rb', line 327

# Parses command-line options for +task+ with SOPT, using the option string
# produced by SOPT_str.
def get_SOPT(task)
  SOPT.get(SOPT_str(task))
end

#helper(name, *args, **kwargs, &block) ⇒ Object



24
25
26
27
28
29
30
31
32
33
# File 'lib/scout/workflow/definition.rb', line 24

# Defines or calls a workflow helper.
#
# With a block, registers it as helper +name+ and returns the block.
# Without one, calls helper +name+ with +args+/+kwargs+ on a fresh object
# extended with step_module, and returns its result. Raises RbbtException
# if the helper is not defined.
def helper(name, *args, **kwargs, &block)
  return helpers[name] = block if block_given?

  raise RbbtException, "helper #{name} unkown in #{self} workflow" unless helpers[name]
  context = Object.new.extend(step_module)
  context.send(name, *args, **kwargs)
end

#helpers ⇒ Object



20
21
22
# File 'lib/scout/workflow/definition.rb', line 20

# Registry of helper blocks by name; created empty on first access.
def helpers
  return @helpers if @helpers
  @helpers = {}
end

#input(name, type = nil, *rest) ⇒ Object



89
90
91
92
93
# File 'lib/scout/workflow/definition.rb', line 89

# Declares an input for the next task. +name+ and +type+ (when given) are
# symbolized. Extra arguments in +rest+ are passed through, and
# [name, type, *rest] is recorded as an :inputs annotation.
def input(name, type = nil, *rest)
  input_name = name.to_sym
  input_type = type && type.to_sym
  annotate_next_task(:inputs, [input_name, input_type, *rest])
end

#job(name, *args) ⇒ Object

Raises: TaskNotFound if no task with the given name is defined.



156
157
158
159
160
161
162
# File 'lib/scout/workflow.rb', line 156

# Instantiates a job (step) of task +name+ with +args+. The step is
# extended with this workflow's helper module before it is returned.
#
# Raises TaskNotFound if no task +name+ exists.
def job(name, *args)
  task = tasks[name]
  # Interpolate +name+: the old message referenced an undefined
  # +task_name+, so a missing task raised NameError instead.
  raise TaskNotFound, "Task #{name} in #{self.to_s}" if task.nil?
  step = task.job(*args)
  step.extend step_module
  step
end

#name ⇒ Object



16
17
18
# File 'lib/scout/workflow/definition.rb', line 16

# Workflow name. Unless set explicitly, it defaults to (and is memoized as)
# the string form of self.
def name
  @name = self.to_s unless @name
  @name
end

#prov_string(tree) ⇒ Object



176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
# File 'lib/scout/workflow/usage.rb', line 176

# Renders a compact provenance string for a dependency +tree+: the task
# names in the order produced by _prov_tasks, joined with ";".
#
# Listing stops at the first task that appears in the (flattened) dependency
# tree of the task listed just before it. Repeated tasks are skipped.
def prov_string(tree)
  description = ""

  last = nil
  seen = Set.new

  _prov_tasks(tree).each do |workflow, task_name|
    next if seen.include?([workflow, task_name])

    child = last && last.include?([workflow, task_name])
    first = last.nil?
    last = _prov_tasks(workflow.dep_tree(task_name))

    # A child of the previous task ends the listing. The former "->"
    # branch for children could never run, because of this break.
    break if child

    description << (first ? "" : ";") << task_name.to_s

    seen << [workflow, task_name]
  end
  description
end

#prov_tree(tree, offset = 0, seen = []) ⇒ Object



206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
# File 'lib/scout/workflow/usage.rb', line 206

# Renders a dependency +tree+ as indented "Workflow#task" lines, one space
# of indentation per nesting level (+offset+).
#
# Nodes already present in +seen+ are skipped. +seen+ is mutated (each
# rendered node is appended) and shared with the recursive calls. Returns
# "" for an empty tree.
def prov_tree(tree, offset = 0, seen = [])
  return "" if tree.empty?

  indent = " " * offset
  rendered = [indent]

  tree.each do |node, subtree|
    next if seen.include?(node)
    seen.push(node)
    workflow, task = node
    label = [workflow.to_s, task.to_s] * "#"
    rendered << indent + label + "\n" + workflow.prov_tree(subtree, offset + 1, seen)
  end

  rendered * "\n"
end

#recursive_deps(task_name) ⇒ Object



157
158
159
160
161
# File 'lib/scout/workflow/usage.rb', line 157

# Returns every dependency entry that dep_tree records in its +seen+
# collection while walking the tree of +task_name+.
def recursive_deps(task_name)
  [].tap { |collected| dep_tree(task_name, collected) }
end

#returns(type) ⇒ Object



99
100
101
# File 'lib/scout/workflow/definition.rb', line 99

# Sets the declared return type of the next task (single-valued :returns
# annotation).
def returns(type)
  annotate_next_task_single :returns, type
end

#SOPT_str(task) ⇒ Object



315
316
317
318
319
320
321
322
323
324
325
# File 'lib/scout/workflow/usage.rb', line 315

# Builds the SOPT option specification string for +task+, covering all of
# its recursive inputs.
#
# Each input becomes "-<shortcut>--<name>", with "*" appended unless the
# input is :boolean. The shortcut is options[:shortcut] or the first
# character of the name. Entries are joined with ":".
def SOPT_str(task)
  sopt_options = self.tasks[task].recursive_inputs.map do |name, type, _desc, _default, options|
    # Inputs declared without options yield nil here.
    options ||= {}
    # Use +shortcut+ (the old code interpolated an undefined +short+ and
    # raised NameError).
    shortcut = options[:shortcut] || name.to_s.slice(0, 1)
    boolean = type == :boolean

    "-#{shortcut}--#{name}#{boolean ? "" : "*"}"
  end

  sopt_options * ":"
end

#step_module ⇒ Object



35
36
37
38
39
40
41
42
43
44
45
46
# File 'lib/scout/workflow/definition.rb', line 35

# Module that exposes every registered helper as an instance method, so it
# can be mixed into steps. It is built once and memoized; helpers registered
# afterwards are not added to it.
def step_module
  @_m ||= Module.new.tap do |helper_module|
    helpers.each do |helper_name, body|
      helper_module.send(:define_method, helper_name, &body)
    end
  end
end

#task(name_and_type, &block) ⇒ Object



107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
# File 'lib/scout/workflow/definition.rb', line 107

# Declares a task and registers it in @tasks.
#
# +name_and_type+ is either `name => type` or a bare name (Symbol/String),
# which defaults the type to :binary. Names and types given as Strings are
# symbolized. Without a block, the workflow method of the same name is used
# as the task body.
#
# The annotations collected for the next task (inputs, deps, description,
# ...) are merged into the Task together with name, type, directory and
# workflow. Unless an extension was set explicitly, it is derived from the
# type (tsv, yaml, marshal, json). The pending annotations are always reset,
# even if setup fails.
def task(name_and_type, &block)
  case name_and_type
  when Hash
    name, type = name_and_type.first
  when Symbol, String
    name, type = name_and_type, :binary
  end
  type = type.to_sym if String === type
  name = name.to_sym if String === name
  @tasks ||= IndiferentHash.setup({})
  # Method#to_proc already returns a lambda. Wrapping it in Kernel#lambda
  # without a literal block is deprecated (a warning on Ruby 3.0-3.2).
  block = self.method(name).to_proc if block.nil?
  begin
    @annotate_next_task ||= {}
    @annotate_next_task[:extension] ||=
      { tsv: "tsv", yaml: "yaml", marshal: "marshal", json: "json" }[type]

    task = Task.setup(block, @annotate_next_task.merge(name: name, type: type, directory: directory[name], workflow: self))
    @tasks[name] = task
  ensure
    @annotate_next_task = {}
  end
end

#task_alias(name, workflow, oname, *rest, &block) ⇒ Object Also known as: dep_task



154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
# File 'lib/scout/workflow/definition.rb', line 154

# Defines task +name+ as an alias of task +oname+ in +workflow+.
#
# The target is declared as a dependency (with any extra +rest+ arguments
# and +block+). When the target task is known, its +returns+ and +type+ are
# copied to the alias.
#
# At run time the alias waits for streaming dependencies, joins the last
# dependency, re-raises its error or abort, and copies its :type info. Then:
# * if forget_task_alias / forget_dep_tasks is configured (default:
#   FORGET_TASK_ALIAS), it archives dependency info, drops +dep+ from
#   +dependencies+, and copies dep's files_dir. It then links (overridden
#   dep) or hard-links dep's result into tmp_path. Depending on the remove
#   mode (default: REMOVE_TASK_ALIAS) it cleans +dep+ ('true'), or +dep+
#   plus its dependencies ('recursive'), honouring the per-task/workflow
#   remove_dep config.
# * otherwise it links dep's files_dir and result into this step. A
#   RemoteStep result is copied by reading it into tmp_path.
# The task block itself returns nil; presumably the step's result is the
# file linked/copied above — confirm against Step's handling of nil
# results.
#
# NOTE(review): +@extension+ and +@returns+ are never assigned by the
# +extension+/+returns+ DSL methods (they store into @annotate_next_task),
# so both guards below always pass. Confirm whether
# @annotate_next_task[:extension]/[:returns] was meant.
def task_alias(name, workflow, oname, *rest, &block)
  dep(workflow, oname, *rest, &block) 
  extension :dep_task unless @extension
  task_proc = workflow.tasks[oname] if workflow.tasks
  if task_proc
    returns task_proc.returns if @returns.nil?
    type = task_proc.type 
  end
  task name => type do
    raise RbbtException, "dep_task does not have any dependencies" if dependencies.empty?
    Step.wait_for_jobs dependencies.select{|d| d.streaming? }
    # The aliased task is the last declared dependency.
    dep = dependencies.last
    dep.join
    raise dep.get_exception if dep.error?
    raise Aborted, "Aborted dependency #{dep.path}" if dep.aborted?
    set_info :type, dep.info[:type]

    # Step-level config wins; otherwise fall back to the legacy key with the
    # env-derived default.
    forget = config :forget_task_alias, "forget_task_alias"
    forget = config :forget_dep_tasks, "forget_dep_tasks", :default => FORGET_TASK_ALIAS if forget.nil?

    if forget
      remove = config :remove_task_alias, "remove_task_alias"
      remove = config :remove_dep_tasks, "remove_dep_tasks", :default => REMOVE_TASK_ALIAS if remove.nil?

      Log.medium "Forget task_alias (remove: #{remove}): #{short_path}"

      self.archive_deps
      self.copy_linked_files_dir
      self.dependencies = self.dependencies - [dep]
      Open.rm_rf self.files_dir if Open.exist? self.files_dir
      FileUtils.cp_r dep.files_dir, self.files_dir if Open.exist?(dep.files_dir)

      if dep.overriden? 
        Open.link dep.path, self.tmp_path
      else
        Open.ln_h dep.path, self.tmp_path

        # Only the literal modes 'true' and 'recursive' trigger cleaning.
        case remove.to_s
        when 'true'
          dep.clean
        when 'recursive'
          # NOTE(review): +d.overriden+ (no "?") differs from
          # +dep.overriden?+ above; confirm both methods exist.
          (dep.dependencies + dep.rec_dependencies).uniq.each do |d|
            next if d.overriden
            d.clean unless Scout::Config.get(:remove_dep, "task:#{d.task_signature}", "task:#{d.task_name}", "workflow:#{d.workflow.name}", :default => true).to_s == 'false'
          end
          dep.clean unless Scout::Config.get(:remove_dep, "task:#{dep.task_signature}", "task:#{dep.task_name}", "workflow:#{dep.workflow.name}", :default => true).to_s == 'false'
        end 
      end
    else
      if Open.exists?(dep.files_dir)
        Open.rm_rf self.files_dir 
        Open.link dep.files_dir, self.files_dir
      end
      # NOTE(review): this branch links to self.path while the forget
      # branch writes self.tmp_path; confirm the asymmetry is intended.
      if defined?(RemoteStep) && RemoteStep === dep
        Open.write(self.tmp_path, Open.read(dep.path))
      else
        Open.link dep.path, self.path
      end
    end
    nil
  end
end

#usage(task = nil, abridge = false) ⇒ Object



226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
# File 'lib/scout/workflow/usage.rb', line 226

# Renders usage/help text for this workflow and returns it as a String.
# Output is written to a StringIO and coloured through Log.color.
#
# Without +task+: writes the workflow title, description and task
# description from +documentation+. Then, for each task, it writes the
# first paragraph of its description and, when non-empty, its provenance
# string (prov_string of its dep_tree). With +abridge+ true, only "final"
# tasks are listed: those whose name does not appear as a first-level
# dependency in any task's dep_tree.
#
# With +task+ (a Task or a task name): writes task.usage for that task.
# When the dependency tree renders to more than two lines, it also writes
# an abridged dependency graph built with prov_tree.
#
# NOTE(review): an unknown task name leaves +task+ nil, so task.usage will
# raise NoMethodError rather than TaskNotFound.
def usage(task = nil, abridge = false)

  str = StringIO.new

  if self.documentation[:title] and not self.documentation[:title].empty?
    title = self.name + " - " + self.documentation[:title]
    str.puts Log.color :magenta, title
    str.puts Log.color :magenta, "=" * title.length
  else
    str.puts Log.color :magenta, self.name 
    str.puts Log.color :magenta, "=" * self.name.length
  end

  str.puts

  if tasks.nil?
    str.puts Log.color(:title, "No tasks")
  elsif task.nil?

    if self.documentation[:description] and not self.documentation[:description].empty?
      str.puts Misc.format_paragraph self.documentation[:description] 
      str.puts
    end

    str.puts Log.color :magenta, "## TASKS"
    if self.documentation[:task_description] and not self.documentation[:task_description].empty?
      str.puts
      str.puts Misc.format_paragraph self.documentation[:task_description] 
    end
    str.puts

    final = Set.new
    not_final = Set.new
    tasks.each do |name,task|
      tree = dep_tree(name)
      not_final += tree.keys
      final << name unless not_final.include?(name)
    end

    # Tree keys are [workflow, task] pairs; drop every task name that is
    # some task's direct dependency.
    not_final.each do |p|
      final -= [p.last]
    end

    tasks.each do |name,task|
      description = task.description || ""
      # Only the first paragraph is shown in the task list.
      description = description.split("\n\n").first

      next if abridge && ! final.include?(name)
      str.puts Misc.format_definition_list_item(name.to_s, description, nil, nil, :yellow)

      prov_string = prov_string(dep_tree(name))
      str.puts Misc.format_paragraph Log.color(:blue, "->" + prov_string) if prov_string && ! prov_string.empty?
    end 

  else

    if Task === task
      task_name = task.name
    else
      task_name = task
      task = self.tasks[task_name]
    end

    str.puts task.usage(self, self.recursive_deps(task_name))

    dep_tree = {[self, task_name] => dep_tree(task_name)}
    prov_tree = prov_tree(dep_tree)
    # Skip the graph when the task has no dependencies to show.
    if prov_tree && ! prov_tree.empty? && prov_tree.split("\n").length > 2

      str.puts
      str.puts Log.color :magenta, "## DEPENDENCY GRAPH (abridged)"
      str.puts
      prov_tree.split("\n").each do |line|
        next if line.strip.empty?
        # Colour "Workflow#task" lines; any other line is printed in blue.
        if m = line.match(/^( *)(\w+?)#(\w*)/i)
            offset, workflow, task_name =  m.values_at 1, 2, 3
            str.puts [offset, Log.color(:magenta, workflow), "#", Log.color(:yellow, task_name)] * ""
        else
          str.puts Log.color :blue, line 
        end
      end
      str.puts
    end
  end

  str.rewind
  str.read
end