Module: Task

Extended by:
Annotation
Defined in:
lib/scout/workflow/task.rb,
lib/scout/workflow/usage.rb,
lib/scout/workflow/task/inputs.rb,
lib/scout/workflow/task/dependencies.rb

Constant Summary

DEFAULT_NAME =
"Default"

Class Attribute Summary

Class Method Summary

Instance Method Summary

Methods included from Annotation

list_tsv_values, load_info, load_tsv, load_tsv_values, obj_tsv_values, resolve_tsv_array, tsv

Class Attribute Details

.default_directory ⇒ Object

Returns the value of attribute default_directory.



14
15
16
# File 'lib/scout/workflow/task.rb', line 14

# Reader for the class-level default job directory; nil until one is assigned.
def default_directory
  return nil unless instance_variable_defined?(:@default_directory)
  @default_directory
end

Class Method Details

.format_input(value, type, options = {}) ⇒ Object



3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
# File 'lib/scout/workflow/task/inputs.rb', line 3

# Coerce a raw input +value+ into the representation expected for +type+.
#
# - IO, StringIO and Step values are passed through untouched.
# - A String for a type other than :path/:file/:folder/:binary/:tsv (and with
#   none of the :noload/:stream/:nofile options set) is loaded with
#   Persist.load when it names an existing non-directory file, and otherwise
#   deserialized in place with Persist.deserialize.
# - For "<elem>_array" types, an Array is formatted element by element using
#   <elem> as the element type.
# - Anything else, including a non-Array value given for an "_array" type, is
#   returned unchanged (it used to be silently turned into nil).
#
# @param value [Object] raw input value
# @param type [Symbol, String, nil] declared input type
# @param options [Hash, nil] input options; :noload, :stream and :nofile
#   suppress loading/deserializing String values
# @return [Object] the formatted value
def self.format_input(value, type, options = {})
  return value if IO === value || StringIO === value || Step === value

  skip_load = options && (options[:noload] || options[:stream] || options[:nofile])

  if String === value && ! [:path, :file, :folder, :binary, :tsv].include?(type) && ! skip_load
    if Open.exists?(value) && ! Open.directory?(value)
      Persist.load(value, type)
    else
      Persist.deserialize(value, type)
    end
  elsif (m = type.to_s.match(/(.*)_array/)) && Array === value
    value.collect{|v| self.format_input(v, m[1].to_sym, options) }
  else
    value
  end
end

.load_input_from_file(filename, type, options = nil) ⇒ Object



130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
# File 'lib/scout/workflow/task/inputs.rb', line 130

# Restore an input previously written by Task.save_input.
#
# +filename+ is used as-is when it exists; otherwise the first sibling
# matching "<filename>.*" is used (e.g. "<filename>.as_file"). Marker files
# are decoded by suffix:
# - ".as_file": the stored path, with a leading "./" resolved against the
#   marker's directory;
# - ".as_step": the stored path loaded with Step.load;
# - ".as_path": the stored path wrapped with Path.setup.
# Other files are returned as a filename when :noload/:stream/:nofile is set,
# and are otherwise loaded with Persist.load for +type+.
#
# @return [Object, nil] the restored value, or nil when no file is found
def self.load_input_from_file(filename, type, options = nil)
  filename = Dir.glob(File.join(filename + ".*")).first unless Open.exists?(filename)
  return nil if filename.nil?

  marker = %w[.as_file .as_step .as_path].find { |suffix| filename.end_with?(suffix) }

  if marker
    content = Open.read(filename).strip
    case marker
    when '.as_file'
      content.sub!(/^\./, File.dirname(filename)) if content.start_with?("./")
      content
    when '.as_step'
      Step.load content
    else
      Path.setup content
    end
  elsif options && (options[:noload] || options[:stream] || options[:nofile])
    filename
  else
    Persist.load(filename, type)
  end
end

.save_file_input(orig_file, directory) ⇒ Object



73
74
75
76
77
78
79
80
81
82
83
84
85
86
# File 'lib/scout/workflow/task/inputs.rb', line 73

# Link +orig_file+ (a path, or a Step whose path is used) into
# "<directory>/saved_input_files/". The link name embeds Misc.digest of the
# source path before the last extension ("data.tsv" -> "data-<digest>.tsv"),
# or is appended when there is no dot.
#
# @return [String] the link path relative to +directory+ ("./saved_input_files/...")
def self.save_file_input(orig_file, directory)
  source = Step === orig_file ? orig_file.path : orig_file
  name = File.basename(source)
  digest = Misc.digest(source)

  saved_name = if name.include? '.'
                 name.sub(/(.*)\.(.*)/, '\1-' + digest + '.\2')
               else
                 "#{name}-#{digest}"
               end

  Open.link source, File.join(directory, 'saved_input_files', saved_name)
  File.join('.', 'saved_input_files', saved_name)
end

.save_input(directory, name, type, value) ⇒ Object



88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
# File 'lib/scout/workflow/task/inputs.rb', line 88

# Persist one job input +value+ named +name+ under +directory+ so that
# Task.load_input_from_file can restore it.
#
# - Filenames: for type :path or step files, "<name>.as_path" stores the path;
#   other files are linked into saved_input_files/ and "<name>.as_file"
#   stores the relative link.
# - Step values: "<name>.as_step" stores the step's short path.
# - :file / :file_array: files are linked into saved_input_files/ and the
#   relative path(s) are persisted under "<name>".
# - Streams, or objects carrying one, are written to "<name>".
# - Anything else is persisted with Persist.save under its declared type.
def self.save_input(directory, name, type, value)
  target = File.join(directory, name.to_s)

  case
  when Path.is_filename?(value)
    if type == :path || Path.step_file?(value)
      Open.write(target + ".as_path", value)
    else
      Open.write(target + ".as_file", save_file_input(value, directory))
    end
  when Step === value
    Open.write(target + ".as_step", value.short_path)
  when type == :file
    Persist.save(save_file_input(value, directory), target, :file)
  when type == :file_array
    saved = value.collect { |orig_file| save_file_input(orig_file, directory) }
    Persist.save(saved, target, type)
  when Open.is_stream?(value)
    Open.sensible_write(target, value)
  when Open.has_stream?(value)
    Open.sensible_write(target, value.stream)
  else
    Persist.save(value, target, type)
  end
end

Instance Method Details

#alias? ⇒ Boolean

Returns:

  • (Boolean)


110
111
112
# File 'lib/scout/workflow/task.rb', line 110

# Whether this task carries the :dep_task extension marker.
def alias?
  :dep_task == @extension
end

#assign_inputs(provided_inputs = {}, id = nil) ⇒ Object



23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
# File 'lib/scout/workflow/task/inputs.rb', line 23

# Map +provided_inputs+ onto this task's declared inputs.
#
# +provided_inputs+ may be a Hash, looked up by input name, or an Array,
# looked up by position. Each provided value is normalised with
# Task.format_input and compared with the declared default. It counts as
# "same as default" when equal, or when one side is a Symbol and the other
# its String spelling.
#
# @param provided_inputs [Hash, Array] user-supplied inputs
# @param id [Object] job id, used as the value of inputs flagged :jobname
#   when no non-default value was provided for them
# @return [Array(Array, Array)] the NamedArray of values in declaration
#   order, and the names (as Symbols) of inputs given non-default values.
#   With no declared inputs (nil, or empty with an Array given), an Array is
#   returned in both slots; anything else yields [[], []].
def assign_inputs(provided_inputs = {}, id = nil)
  declared = self.inputs
  if declared.nil? || (declared.empty? && Array === provided_inputs)
    # The same Array object goes in both slots; otherwise two fresh arrays
    return Array === provided_inputs ? [provided_inputs, provided_inputs] : [[], []]
  end

  IndiferentHash.setup(provided_inputs) if Hash === provided_inputs
  lookup_by_name = Hash === provided_inputs

  input_names = []
  non_default_inputs = []
  input_array = declared.each_with_index.map do |(name, type, _desc, default, options), position|
    input_names << name
    candidate = lookup_by_name ? provided_inputs[name] : provided_inputs[position]
    candidate = Task.format_input(candidate, type, options || {})

    # A Symbol default and its String spelling (or vice versa) count as equal
    same_as_default = candidate == default ||
      (String === candidate && Symbol === default && candidate == default.to_s) ||
      (String === default && Symbol === candidate && candidate.to_s == default)

    if !candidate.nil? && !same_as_default
      non_default_inputs << name.to_sym
      candidate
    elsif options && options[:jobname]
      id
    else
      default
    end
  end

  NamedArray.setup(input_array, input_names)

  [input_array, non_default_inputs]
end

#dependencies(id, provided_inputs, non_default_inputs = [], compute = {}) ⇒ Object



2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
# File 'lib/scout/workflow/task/dependencies.rb', line 2

# Instantiate the dependency jobs declared for this task.
#
# Each entry of +deps+ is [workflow, task, definition_options, block].
# - When +provided_inputs+ has a "Workflow#task" key, that value (a Step, or
#   something passed to Step.new) replaces the dependency, and the key is
#   appended to +non_default_inputs+.
# - With a block, the block is called with (id, options, dependencies so far)
#   and may return a Step, a Hash of job options, or an Array of either.
# - Otherwise a job is built from definition_options filled in with
#   +provided_inputs+.
#
# Side effects: appends the relevant non-default inputs of each dependency to
# +non_default_inputs+. For dependencies built from options, it records
# compute flags (any :compute option plus :canfail/:produce/:stream) in
# +compute+, keyed by job path.
#
# @return [Array] dependency steps in declaration order
def dependencies(id, provided_inputs, non_default_inputs = [], compute = {})
  return [] if deps.nil?
  dependencies = []

  provided_inputs ||= {}

  # Helper function: build one dependency job.
  # :task, :workflow and :id/:jobname in +step_options+ override the declared
  # ones. The remaining options (or those under :inputs) become the job inputs,
  # filled in with +definition_options+. A Symbol value resolves, in order, to
  # an already-built dependency with that task name, a provided input, another
  # step input of that name, or the Symbol itself. Returns [job, step_inputs].
  load_dep = proc do |id, workflow, task, step_options, definition_options, dependencies|
    task = step_options.delete(:task) if step_options.include?(:task)
    workflow = step_options.delete(:workflow) if step_options.include?(:workflow)
    id = step_options.delete(:id) if step_options.include?(:id)
    id = step_options.delete(:jobname) if step_options.include?(:jobname)

    step_inputs = step_options.include?(:inputs)? step_options.delete(:inputs) : step_options
    step_inputs = IndiferentHash.add_defaults step_inputs, definition_options

    resolved_inputs = {}
    step_inputs.each do |k,v|
      if Symbol === v
        input_dep = dependencies.select{|d| d.task_name == v }.first
        resolved_inputs[k] = input_dep || provided_inputs[v] || step_inputs[v] || v
      else
        resolved_inputs[k] = v
      end
    end
    job = workflow.job(task, id, resolved_inputs)
    compute_options = definition_options[:compute] || []
    compute_options = [compute_options] unless Array === compute_options
    compute_options << :canfail if definition_options[:canfail]
    compute_options << :produce if definition_options[:produce]
    compute_options << :stream if definition_options[:stream]
    compute[job.path] = compute_options if compute_options.any?

    job.overriden = false if definition_options[:not_overriden]

    [job, step_inputs]
  end

  # Helper function: the non-default inputs of +dep+ that count towards this
  # job's non-default inputs.
  # NOTE(review): `reject!` mutates the array returned by dep.non_default_inputs
  # in place; if that is the step's own array, the dependency's state changes too.
  find_dep_non_default_inputs = proc do |dep,definition_options,step_inputs={}|
    dep_non_default_inputs = dep.non_default_inputs
    # NOTE(review): the result of this `select` is discarded, so it filters
    # nothing. `select!` may have been intended; confirm before changing, since
    # it would alter job names.
    dep_non_default_inputs.select do |name|
      step_inputs.include?(name)
    end
    # NOTE(review): `x != :placeholder || x != dep.inputs[name]` is false only
    # when the option is :placeholder AND equals the dependency's input; `&&`
    # may have been intended. Confirm.
    dep_non_default_inputs.reject! do |name|
      definition_options.include?(name) &&
        (definition_options[name] != :placeholder || definition_options[name] != dep.inputs[name])
    end

    dep_non_default_inputs
  end

  deps.each do |workflow,task,definition_options,block=nil|
    # Entries may be declared without options. Default them before any access.
    definition_options ||= {}
    definition_options[:id] = definition_options.delete(:jobname) if definition_options.include?(:jobname)

    # A "Workflow#task" key in the provided inputs overrides this dependency
    if provided_inputs.include?(overriden = [workflow.name, task] * "#")
      dep = provided_inputs[overriden]
      dep = Step.new dep unless Step === dep
      dep = dep.dup
      dep.type = workflow.tasks[task].type
      dep.overriden_task = task
      dep.overriden_workflow = workflow
      dependencies << dep
      non_default_inputs << overriden
      next
    end

    if block
      fixed_provided_inputs = self.assign_inputs(provided_inputs).first.to_hash
      self.inputs.each do |name,type,desc,value|
        fixed_provided_inputs[name] = value unless fixed_provided_inputs.include?(name)
      end
      fixed_provided_inputs = IndiferentHash.add_defaults fixed_provided_inputs, provided_inputs
      block_options = IndiferentHash.add_defaults definition_options.dup, fixed_provided_inputs

      res = block.call id, block_options, dependencies

      case res
      when Step
        dep = res
        dependencies << dep
        dep_non_default_inputs = find_dep_non_default_inputs.call(dep, definition_options)
        non_default_inputs.concat(dep_non_default_inputs)
      when Hash
        step_options = block_options.merge(res)
        dep, step_inputs = load_dep.call(id, workflow, task, step_options, block_options, dependencies)
        dependencies << dep
        dep_non_default_inputs = find_dep_non_default_inputs.call(dep, definition_options, step_inputs)
        non_default_inputs.concat(dep_non_default_inputs)
      when Array
        res.each do |_res|
          if Hash === _res
            step_options = block_options.merge(_res)
            dep, step_inputs = load_dep.call(id, workflow, task, step_options, block_options, dependencies)
            dependencies << dep
            dep_non_default_inputs = find_dep_non_default_inputs.call(dep, definition_options, step_inputs)
            non_default_inputs.concat(dep_non_default_inputs)
          else
            dep = _res
            dependencies << dep
            dep_non_default_inputs = find_dep_non_default_inputs.call(dep, definition_options)
            non_default_inputs.concat(dep_non_default_inputs)
          end
        end
      end
    else
      step_options = IndiferentHash.add_defaults definition_options.dup, provided_inputs
      dep, step_inputs = load_dep.call(id, workflow, task, step_options, definition_options, dependencies)
      dependencies << dep
      dep_non_default_inputs = find_dep_non_default_inputs.call(dep, definition_options, step_inputs)
      non_default_inputs.concat(dep_non_default_inputs)
    end
  end

  dependencies
end

#directory ⇒ Object



25
26
27
# File 'lib/scout/workflow/task.rb', line 25

# Directory holding this task's jobs. Falls back to Task.default_directory
# (and caches it) when none has been set.
def directory
  return @directory if @directory
  @directory = Task.default_directory
end

#exec_on(binding = self, *inputs) ⇒ Object



29
30
31
# File 'lib/scout/workflow/task.rb', line 29

# Evaluate this task's block with +binding+ as self, passing +inputs+ as the
# block arguments. Returns the block's result.
def exec_on(binding = self, *inputs)
  task_proc = self
  binding.instance_exec(*inputs, &task_proc)
end

#get_SOPT ⇒ Object



114
115
116
117
118
119
120
121
122
123
124
125
# File 'lib/scout/workflow/usage.rb', line 114

# Parse command-line options for this task's recursive inputs (as described
# by SOPT_str), then split comma-separated values of "array" inputs.
# A value naming an existing path is left unsplit, unless the input type
# mentions "file" or "path".
def get_SOPT
  job_options = SOPT.get(self.SOPT_str)

  recursive_inputs.uniq.each do |name, type|
    type_name = type.to_s
    next unless type_name.include?('array')
    next unless job_options.include?(name)

    splittable = ! Open.exist?(job_options[name]) || type_name.include?('file') || type_name.include?('path')
    job_options[name] = job_options[name].split(",") if splittable
  end

  job_options
end

#inputs ⇒ Object



21
22
23
# File 'lib/scout/workflow/task.rb', line 21

# Declared inputs of this task, lazily initialised to an empty list.
def inputs
  @inputs = [] unless @inputs
  @inputs
end

#job(id = nil, provided_inputs = nil) ⇒ Object



33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
# File 'lib/scout/workflow/task.rb', line 33

# Build (without running) the Step for this task.
#
# When +provided_inputs+ is empty and +id+ is a Hash or Array, +id+ is treated
# as the inputs. The id then defaults to the value of an input flagged
# :jobname, or DEFAULT_NAME. Inputs flagged :required must be present. A
# :load_inputs key names a directory whose saved inputs are loaded first;
# explicitly provided values take precedence over them. The job name is the id,
# suffixed with a digest of inputs and dependencies when any non-default
# inputs exist. The whole construction is wrapped in Persist.memory (presumably
# memoizing per task/id/inputs; TODO confirm).
#
# @raise [ParameterException] when required inputs are missing.
#   NOTE(review): only key presence is checked, so a key given with a nil
#   value passes despite the message wording.
# @return [Step]
def job(id = nil, provided_inputs = nil)
  Persist.memory("Task job", other_options: {task: self, id: id, provided_inputs: provided_inputs}) do
    provided_inputs, id = id, nil if (provided_inputs.nil? || provided_inputs.empty?) && (Hash === id || Array === id)
    provided_inputs = {} if provided_inputs.nil?
    IndiferentHash.setup(provided_inputs)

    if id.nil?
      inputs.each do |name,type,desc,default,input_options|
        next unless input_options && input_options[:jobname]
        id = provided_inputs[name] || default
      end
      id = DEFAULT_NAME if id.nil?
    end

    missing_inputs = []
    self.inputs.each do |input,type,desc,val,options|
      next unless options && options[:required]
      missing_inputs << input unless provided_inputs.include?(input)
    end if self.inputs

    if missing_inputs.length == 1
      raise ParameterException, "Input '#{missing_inputs.first}' is required but was not provided or is nil"
    end

    if missing_inputs.length > 1
      raise ParameterException, "Inputs #{Misc.humanize_list(missing_inputs)} are required but were not provided or are nil"
    end

    provided_inputs = load_inputs(provided_inputs.delete(:load_inputs)).merge(provided_inputs) if Hash === provided_inputs && provided_inputs[:load_inputs]

    job_inputs, non_default_inputs, input_digest_str = process_inputs provided_inputs, id

    compute = {}
    dependencies = dependencies(id, provided_inputs, non_default_inputs, compute)

    #non_default_inputs.concat provided_inputs.keys.select{|k| String === k && k.include?("#") } if Hash === provided_inputs

    non_default_inputs.uniq!

    if non_default_inputs.any?
      hash = Misc.digest(:inputs => input_digest_str, :dependencies => dependencies)
      name = [id, hash] * "_"
    else
      name = id
    end

    # Alias (:dep_task) jobs take their file extension from their last dependency
    extension = self.extension
    if extension == :dep_task
      extension = dependencies.any? ? dep_task_extension(dependencies.last.path) : nil
    end

    path = directory[name]

    path = path.set_extension(extension) if extension

    if hash
      Log.debug "ID #{self.name} #{id} - #{hash}: #{Log.fingerprint(:input_digest => input_digest_str, :non_default_inputs => non_default_inputs, :dependencies => dependencies)}"
    else
      Log.debug "ID #{self.name} #{id} - Clean"
    end
    NamedArray.setup(job_inputs, @inputs.collect{|i| i[0] }) if @inputs
    step_provided_inputs = Hash === provided_inputs ? provided_inputs.slice(*non_default_inputs) : provided_inputs
    Step.new path.find, job_inputs, dependencies, id, non_default_inputs, step_provided_inputs, compute, &self
  end
end

# File extension that an alias (:dep_task) job inherits from a dependency path.
# It is the last dot-separated part plus any preceding parts of at most four
# characters (e.g. "tsv.gz"), always keeping the leading name part. (The
# former inline loop could pop every part and then call nil.length.)
# Returns nil when the basename has no dot.
def dep_task_extension(dep_path)
  dep_basename = File.basename(dep_path)
  return nil unless dep_basename.include? "."

  parts = dep_basename.split(".")
  extension = [parts.pop]
  while parts.length > 1 && parts.last.length <= 4
    extension << parts.pop
  end
  extension.reverse * "."
end

#load_inputs(directory) ⇒ Object



152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
# File 'lib/scout/workflow/task/inputs.rb', line 152

# Load saved inputs from +directory+.
#
# For each recursive input, "<directory>/<name>" is restored with
# Task.load_input_from_file (nil results are skipped). In addition, every file
# whose name contains "#" is loaded as a dependency override. It is keyed by
# the part of its basename before the first ".", e.g. "Workflow#task".
#
# @return [Hash] an IndiferentHash of input name => value
def load_inputs(directory)
  loaded = IndiferentHash.setup({})

  self.recursive_inputs.each do |name, type, _desc, _default, options|
    value = Task.load_input_from_file(File.join(directory, name.to_s), type, options)
    loaded[name] = value unless value.nil?
  end

  Dir.glob(File.join(directory, "*#*")).each do |file|
    dep_key = File.basename(file).partition(".").first
    loaded[dep_key] = Task.load_input_from_file(file, :file)
  end

  loaded
end

#process_inputs(provided_inputs = {}, id = nil) ⇒ Object



67
68
69
70
71
# File 'lib/scout/workflow/task/inputs.rb', line 67

# Assign +provided_inputs+ to this task's inputs (see #assign_inputs) and
# compute the digest string of the resulting values.
#
# @return [Array] [input values, non-default input names, Misc.digest_str of the values]
def process_inputs(provided_inputs = {}, id = nil)
  assigned, non_default = assign_inputs(provided_inputs, id)
  [assigned, non_default, Misc.digest_str(assigned)]
end

#recursive_inputs(overriden = []) ⇒ Object



170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
# File 'lib/scout/workflow/task/inputs.rb', line 170

# Collect this task's inputs plus, recursively, those of its dependencies.
#
# A dependency input is skipped when its name is a key of that dependency's
# definition options. Dependencies overridden through a "Workflow#task"
# option key contribute no inputs at all. Dependencies declared without options
# (nil) previously raised NoMethodError; they now contribute all their inputs.
#
# @param overriden [Array] "Workflow#task" keys known to be overridden; this
#   array is extended in place while recursing
# @return [Array<Array>] input specs ([name, type, desc, default, options])
def recursive_inputs(overriden = [])
  return inputs.dup if deps.nil?
  deps.inject(inputs.dup) do |acc,dep|
    workflow, task, options = dep
    next acc if workflow.nil? || task.nil?
    next acc if overriden.include?([workflow.name, task.to_s] * "#")
    overriden.concat options.keys.select{|k| k.to_s.include?("#") } if options

    workflow.tasks[task].recursive_inputs(overriden).dup.each do |info|
      name, _ = info
      next if options && (options.include?(name.to_sym) || options.include?(name.to_s))
      acc << info
    end

    acc
  end
end

#save_inputs(directory, provided_inputs = {}) ⇒ Object



119
120
121
122
123
124
125
126
127
# File 'lib/scout/workflow/task/inputs.rb', line 119

# Persist under +directory+, via Task.save_input, every entry of
# +provided_inputs+ that is one of this task's recursive inputs.
# Returns the recursive inputs list.
def save_inputs(directory, provided_inputs = {})
  self.recursive_inputs.each do |name, type, *|
    next unless provided_inputs.include?(name)
    Task.save_input(directory, name, type, provided_inputs[name])
  end
end

#SOPT_str ⇒ Object



102
103
104
105
106
107
108
109
110
111
112
# File 'lib/scout/workflow/usage.rb', line 102

# SOPT option specification for this task's recursive inputs, joined with ":".
# Each input yields "-<shortcut>--<name>", with a trailing "*" (takes a value)
# unless it is :boolean. The shortcut is options[:shortcut] or the name's
# first character.
def SOPT_str
  flags = self.recursive_inputs.map do |name, type, _desc, _default, options|
    shortcut = (options && options[:shortcut]) || name.to_s.slice(0, 1)
    value_marker = type == :boolean ? "" : "*"
    "-#{shortcut}--#{name}#{value_marker}"
  end

  flags.join(":")
end

#usage(workflow = nil, deps = nil) ⇒ Object



4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
# File 'lib/scout/workflow/usage.rb', line 4

# Render the command-line help text for this task.
#
# The text contains, in order:
# - a title: the task name, followed by the first paragraph of +description+
#   when that is shorter than Misc::MAX_TTY_LINE_WIDTH;
# - the task's own inputs;
# - inputs contributed by dependencies, excluding inputs already listed for
#   this task and those set explicitly in the dependency definition;
# - notes on list/file inputs;
# - the return type;
# - the allowed values of :select inputs.
#
# @param workflow [Object, nil] when given, dependencies default to
#   workflow.recursive_deps(name), and same-workflow dependencies are labelled
#   by task name only
# @param deps [Array, nil] explicit dependency list; defaults as above, or to self.deps
# @return [String] the formatted help text
def usage(workflow = nil, deps = nil)
  str = StringIO.new

  if description
    title, paragraph = description.split("\n\n")
    if title.length < Misc::MAX_TTY_LINE_WIDTH
      title = self.name.to_s + " - " + title
      str.puts Log.color :yellow, title
      str.puts Log.color :yellow, "-" * title.length
      if paragraph
        str.puts "\n" << Misc.format_paragraph(paragraph)
      end
    else
      title = self.name.to_s
      str.puts Log.color :yellow, title
      str.puts Log.color :yellow, "-" * title.length
      str.puts "\n" << Misc.format_paragraph(description)
    end
  else
    title = self.name.to_s
    str.puts Log.color :yellow, title
    str.puts Log.color :yellow, "-" * title.length
  end


  selects = []

  if inputs && inputs.any?
    str.puts
    str.puts Log.color(:magenta, "Inputs")
    str.puts
    str.puts SOPT.input_array_doc(inputs)

    inputs.select{|name,type, _| type == :select }.each do |name,_,_,_,options|
      # :select inputs may be declared without an options hash
      selects << [name, options[:select_options]] if options && options[:select_options]
    end
  end

  deps = workflow ? workflow.recursive_deps(self.name) : self.deps if deps.nil?
  if deps and deps.any?
    seen = inputs.collect{|name,_| name }
    dep_inputs = {}
    deps.each do |dep_workflow,task_name,options|
      next if task_name.nil?
      task = dep_workflow.tasks[task_name]

      next if task.inputs.nil?

      # Skip inputs already documented for this task and inputs that the
      # dependency definition sets explicitly (options may be nil)
      inputs = task.inputs.reject{|name, _| seen.include?(name) || (options && options.include?(name)) }
      next unless inputs.any?
      input_names = inputs.collect{|name,_| name }
      task.inputs.select{|name,_| input_names.include? name }.each do |name,_,_,_,options|
        selects << [name, options[:select_options]] if options && options[:select_options]
      end

      dep = workflow.nil? || dep_workflow.name != workflow.name ? ["#{dep_workflow.name}", task_name.to_s] *"#" : task_name.to_s
      dep_inputs[dep] = inputs
    end

    str.puts
    str.puts Log.color(:magenta, "Inputs from dependencies:") if dep_inputs.any?
    dep_inputs.each do |dep,inputs|
      str.puts
      str.puts Log.color :yellow, dep + ":"
      str.puts
      str.puts SOPT.input_array_doc(inputs)
    end
  end

  case
  when inputs && inputs.select{|name,type| type == :array }.any?
    str.puts
    str.puts Log.color(:green, Misc.format_paragraph("Lists are specified as arguments using ',' or '|'. When specified as files the '\\n'
    also works in addition to the others. You may use the '--array_separator' option
    to change this default. Whenever a file is specified it may also accept STDIN using
    the '-' character."))

  when inputs && inputs.select{|name,type| type == :file || type == :tsv }.any?
    str.puts
    str.puts Log.color(:green, Misc.format_paragraph("Whenever a file is specified it may also accept STDIN using the '-' character."))
  end

  str.puts
  str.puts Log.color(:magenta, "Returns: ") << Log.color(:blue, type.to_s) << "\n"

  if selects.any?
    str.puts
    str.puts Log.color(:magenta, "Input select options")
    selects.uniq.each do |input,options|
      str.puts
      str.puts Log.color(:blue, input.to_s + ": ") << Misc.format_paragraph(options.collect{|o| Array === o ? o.first.to_s : o.to_s} * ", ") << "\n"
    end
  end
  str.rewind
  str.read
end