Class: Step

Inherits:
Object
Defined in:
lib/scout/workflow/step.rb,
lib/scout/workflow/step/file.rb,
lib/scout/workflow/step/info.rb,
lib/scout/workflow/step/load.rb,
lib/scout/workflow/step/config.rb,
lib/scout/workflow/step/inputs.rb,
lib/scout/workflow/step/status.rb,
lib/scout/workflow/step/archive.rb,
lib/scout/workflow/step/children.rb,
lib/scout/workflow/step/progress.rb,
lib/scout/workflow/step/provenance.rb,
lib/scout/workflow/step/dependencies.rb

Constant Summary

SERIALIZER =
:marshal

Instance Attribute Summary

Class Method Summary

Instance Method Summary

Constructor Details

#initialize(path = nil, inputs = nil, dependencies = nil, id = nil, non_default_inputs = nil, provided_inputs = nil, compute = nil, exec_context = nil, &task) ⇒ Step

Returns a new instance of Step.



19
20
21
22
23
24
25
26
27
28
29
30
31
# File 'lib/scout/workflow/step.rb', line 19

# Builds a step (job). Every argument is optional, so a step can be rebuilt
# from nothing but its path (see Step.load / Step._load).
#
# @param path [String, nil] location of the job result
# @param inputs [Array, nil] positional values handed to the task block
# @param dependencies [Array<Step>, nil] direct dependency steps
# @param id [String, nil] explicit identifier (preferred by #clean_name)
# @param non_default_inputs [Object, nil] stored as given
# @param provided_inputs [Object, nil] stored as given
# @param compute [Array, nil] compute flags (e.g. :canfail, see #canfail?)
# @param exec_context [Object, nil] receiver for the task block; the step itself when nil
# @param task [Proc] the task body
def initialize(path = nil, inputs = nil, dependencies = nil, id = nil, non_default_inputs = nil, provided_inputs = nil, compute = nil, exec_context = nil, &task)
  @path, @inputs, @dependencies, @id = path, inputs, dependencies, id
  @non_default_inputs, @provided_inputs = non_default_inputs, provided_inputs
  @compute, @task = compute, task
  @mutex = Mutex.new
  @tee_copies = 1
  @exec_context = exec_context || self
end

Instance Attribute Details

#computeObject

Returns the value of attribute compute.



18
19
20
# File 'lib/scout/workflow/step.rb', line 18

# Compute flags given to the constructor (e.g. :canfail, see #canfail?).
def compute; @compute; end

#dependenciesObject

Returns the value of attribute dependencies.



18
19
20
# File 'lib/scout/workflow/step.rb', line 18

# Direct dependency steps, or nil when none were given.
def dependencies; @dependencies; end

#exec_contextObject

Returns the value of attribute exec_context.



18
19
20
# File 'lib/scout/workflow/step.rb', line 18

# Receiver in which the task block is instance_exec'd (the step by default).
def exec_context; @exec_context; end

#idObject

Returns the value of attribute id.



18
19
20
# File 'lib/scout/workflow/step.rb', line 18

# Explicit job identifier given at construction (used by #clean_name).
def id; @id; end

#inputsObject

Returns the value of attribute inputs.



18
19
20
# File 'lib/scout/workflow/step.rb', line 18

# Positional input values for the task block.
def inputs; @inputs; end

#non_default_inputsObject

Returns the value of attribute non_default_inputs.



18
19
20
# File 'lib/scout/workflow/step.rb', line 18

# Value given as non_default_inputs to the constructor.
def non_default_inputs; @non_default_inputs; end

#overridenObject

Returns the value of attribute overriden.



18
19
20
# File 'lib/scout/workflow/step.rb', line 18

# Cached overridden flag (memoized by #overriden?).
def overriden; @overriden; end

#overriden_taskObject

Returns the value of attribute overriden_task.



18
19
20
# File 'lib/scout/workflow/step.rb', line 18

# Overriding task, when set; any truthy value makes #overriden? true.
def overriden_task; @overriden_task; end

#overriden_workflowObject

Returns the value of attribute overriden_workflow.



18
19
20
# File 'lib/scout/workflow/step.rb', line 18

# Overriding workflow, when set; any truthy value makes #overriden? true.
def overriden_workflow; @overriden_workflow; end

#pathObject

Returns the value of attribute path.



18
19
20
# File 'lib/scout/workflow/step.rb', line 18

# Location of the job result.
def path; @path; end

#provided_inputsObject

Returns the value of attribute provided_inputs.



18
19
20
# File 'lib/scout/workflow/step.rb', line 18

# Value given as provided_inputs to the constructor.
def provided_inputs; @provided_inputs; end

#resultObject (readonly)

Returns the value of attribute result.



139
140
141
# File 'lib/scout/workflow/step.rb', line 139

# In-memory result last stored by #exec / #run (nil when not kept).
def result; @result; end

#taskObject

Returns the value of attribute task.



18
19
20
# File 'lib/scout/workflow/step.rb', line 18

# Task block (or Task object) run by #exec.
def task; @task; end

#tee_copiesObject

Returns the value of attribute tee_copies.



18
19
20
# File 'lib/scout/workflow/step.rb', line 18

# Stream copy count handed to Persist as :tee_copies (1 unless changed by #prepare_dependencies).
def tee_copies; @tee_copies; end

#typeObject

Returns the value of attribute type.



62
63
64
# File 'lib/scout/workflow/step.rb', line 62

# Result type passed to Persist when storing/loading the result.
def type; @type; end

#workflowObject

Returns the value of attribute workflow.



18
19
20
# File 'lib/scout/workflow/step.rb', line 18

# Workflow the step belongs to, or nil.
def workflow; @workflow; end

Class Method Details

._load(path) ⇒ Object



180
181
182
# File 'lib/scout/workflow/step/info.rb', line 180

# Marshal hook: rebuilds a Step from the path string written by #_dump.
def self._load(path)
  restored = Step.new(path)
  restored
end

.clean(file) ⇒ Object



46
47
48
# File 'lib/scout/workflow/step/status.rb', line 46

# Removes the files of the job stored at +file+ (see Step#clean) and returns
# whatever Step#clean returns.
def self.clean(file)
  step = Step.new(file)
  step.clean
end

.job_path?(path) ⇒ Boolean

Returns:

  • (Boolean)


2
3
4
# File 'lib/scout/workflow/step/provenance.rb', line 2

# True when the fourth path component from the end is "jobs", i.e. the path
# looks like .../jobs/<workflow>/<task>/<job>.
def self.job_path?(path)
  components = path.split("/")
  components[-4] == "jobs"
end

.load(path) ⇒ Object



13
14
15
16
17
# File 'lib/scout/workflow/step/load.rb', line 13

# Loads the step stored at +path+. When nothing exists at +path+ it is first
# passed through Step.relocate, which may map it to an existing location.
# No existence check is made afterwards: a Step is returned even when the
# path is still missing.
#
# @param path [String]
# @return [Step]
def self.load(path)
  path = relocate(path) unless Open.exists?(path)
  Step.new path
end

.load_info(info_file) ⇒ Object



12
13
14
15
16
17
18
19
# File 'lib/scout/workflow/step/info.rb', line 12

# Reads a step's info file with the SERIALIZER format. A nil load gives {}
# and any StandardError while loading gives {status: :noinfo}; the result is
# always passed through IndiferentHash.setup.
def self.load_info(info_file)
  loaded =
    begin
      Persist.load(info_file, SERIALIZER) || {}
    rescue
      {status: :noinfo}
    end
  IndiferentHash.setup(loaded)
end

.prov_indent(step, offset, input_dependencies) ⇒ Object



89
90
91
92
93
94
95
96
97
98
99
100
101
102
# File 'lib/scout/workflow/step/provenance.rb', line 89

# Indentation prefix for one line of the provenance report.
#
# Only plain spacing is produced: +offset+ spaces. The marker-glyph selection
# (alias / overridden / input-dependency markers) was disabled by an early
# return and has been removed as unreachable code. +step+ and
# +input_dependencies+ are kept so existing callers keep working.
#
# @param step [Step] step being reported (currently unused)
# @param offset [Integer] nesting depth
# @param input_dependencies [Hash] input-dependency map (currently unused)
# @return [String]
def self.prov_indent(step, offset, input_dependencies)
  " " * offset
end

.prov_report(step, offset = 0, task = nil, seen = [], expand_repeats = false, input = nil, input_dependencies = nil) ⇒ Object



104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
# File 'lib/scout/workflow/step/provenance.rb', line 104

# Builds the textual provenance tree of +step+ and its dependencies, one line
# per step. Dependencies are listed before the step itself, unless
# ENV["SCOUT_ORIGINAL_STACK"] == 'true': then the step comes first and its
# dependencies are listed in reverse order.
#
# @param step [Step] root of the report
# @param offset [Integer] nesting depth used for indentation
# @param task [Object] stored as info[:task_name] for display
# @param seen [Array<String>] paths already reported; shared across the recursion
# @param expand_repeats [Boolean] fully re-expand steps already in +seen+
#   instead of printing a one-line summary
# @param input [Array, nil] [consumer_step, input_name] pairs for this step
# @param input_dependencies [Hash, nil] step => [[consumer_step, input_name], ...]
# @return [String] report lines joined with "\n"
def self.prov_report(step, offset = 0, task = nil, seen = [], expand_repeats = false, input = nil, input_dependencies = nil)
  original_stack = ENV["SCOUT_ORIGINAL_STACK"] == 'true'

  info = step.info  || {}
  info[:task_name] = task
  path  = step.path
  status = info[:status] || :missing
  status = status.to_sym if String === status
  status = :noinfo if status == :missing && Open.exist?(path)
  status = "remote" if Open.remote?(path) || Open.ssh?(path)
  name = info[:name] || File.basename(path)
  status = :unsync if status == :done and not Open.exist?(path)
  status = :notfound if status == :noinfo and not Open.exist?(path)

  this_step_msg = prov_report_msg(status, name, path, info, input)

  # For each input dependency of a direct dependency, record which dependency
  # consumes it and under which input name.
  input_dependencies ||= {}
  (step.dependencies || []).each do |dep|
    dep.input_dependencies.each do |input_dep|
      input_name = dep.recursive_inputs.select{|_name, value|
        value == input_dep ||
          (String === value && value.start_with?(input_dep.files_dir)) ||
          (Array === value && value.include?(input_dep))
      }.keys.last
      next unless input_name
      input_dependencies[input_dep] ||= []
      input_dependencies[input_dep] << [dep, input_name]
    end
  end

  str = []

  indent = prov_indent(step, offset, input_dependencies)
  str << indent + this_step_msg if original_stack

  deps = (step.dependencies || []).dup
  deps.reverse! if original_stack
  deps.each do |dep|
    dep_path = dep.path
    if ! seen.include?(dep_path)
      seen << dep_path
      str.concat(prov_report(dep, offset + 1, task, seen, expand_repeats, input_dependencies[dep], input_dependencies.dup).split("\n"))
    elsif expand_repeats
      # Arguments follow the signature order: seen and expand_repeats come
      # before input / input_dependencies (passing them shifted made `seen`
      # nil and crashed the recursion).
      str << Log.color(Step.status_color(dep.status), Log.uncolor(prov_report(dep, offset+1, task, seen, expand_repeats, input_dependencies[dep], input_dependencies.dup)))
    else
      # Already reported: print a single summary line.
      dep_info = dep.info  || {}
      dep_status = dep_info[:status] || :missing
      dep_status = "remote" if Open.remote?(dep_path) || Open.ssh?(dep_path)
      dep_name = dep_info[:name] || File.basename(dep_path)
      dep_status = :unsync if dep_status == :done and not Open.exist?(dep_path)
      dep_status = :notfound if dep_status == :noinfo and not Open.exist?(dep_path)

      dep_indent = prov_indent(dep, offset+1, input_dependencies)
      str << dep_indent + Log.color(Step.status_color(dep_status), Log.uncolor(prov_report_msg(dep_status, dep_name, dep_path, dep_info, input_dependencies[dep])))
    end
  end

  str << indent + this_step_msg unless original_stack

  str * "\n"
end

.prov_report_msg(status, name, path, info, input = nil) ⇒ Object



32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
# File 'lib/scout/workflow/step/provenance.rb', line 32

# Formats one provenance-report line: colored status, workflow, task, path,
# the steps consuming this one as an input (if any), plus extra fields
# selected through the globals $inputs and $info_fields.
#
# The workflow and task are taken from the path components preceding the job
# name. For paths that are not job paths (Step.job_path? false), the line shows
# "file" as status, "-" as workflow and info[:task_name] as task.
#
# @param status [Symbol, String]
# @param name [String] job name (not used by this formatter)
# @param path [String] job path
# @param info [Hash] step info
# @param input [Array, nil] [consumer_step, input_name] pairs
# @return [String]
def self.prov_report_msg(status, name, path, info, input = nil)
  parts = path.sub(/\{.*/,'').split "/"

  parts.pop

  task = Log.color(:yellow, parts.pop)
  workflow = Log.color(:magenta, parts.pop)

  if ! Step.job_path?(path)
    task, status, workflow = Log.color(:yellow, info[:task_name]), Log.color(:green, "file"), Log.color(:magenta, "-")
  end

  path_mtime = begin
                 Open.mtime(path)
               rescue Exception
                 nil
               end

  if input.nil? || input.empty?
    input_str = nil
  else
    # Drop a consumer when one of its own (input) dependencies is also listed
    # as consuming this step under the same input name.
    input = input.reject{|dep,input_name| (input & dep.dependencies.collect{|d| [d,input_name]}).any? }
    input = input.reject{|dep,input_name| (input & dep.input_dependencies.collect{|d| [d,input_name]}).any? }
    input_str = Log.color(:magenta, "-> ") + input.collect{|dep,input_name| Log.color(:yellow, dep.task_name.to_s) + ":" + Log.color(:yellow, input_name) }.uniq * " "
  end

  str = prov_status_msg(status.to_s) << " " << [workflow, task, path, input_str].compact * " "

  # Flag local results whose mtime is more than 2s later than $main_mtime.
  out_of_sync = ! (Open.remote?(path) || Open.ssh?(path)) &&
    (Open.exists?(path) && $main_mtime && path_mtime && ($main_mtime - path_mtime) < -2)
  str << " (#{Log.color(:red, "Mtime out of sync") })" if out_of_sync

  if $inputs && $inputs.any?
    job_inputs = Workflow.load_step(path).recursive_inputs.to_hash
    IndiferentHash.setup(job_inputs)

    $inputs.each do |input_name|
      value = job_inputs[input_name]
      next if value.nil?
      value_str = Log.fingerprint(value)
      str << "\t#{Log.color :magenta, input_name}=#{value_str}"
    end
  end

  if $info_fields && $info_fields.any?
    IndiferentHash.setup(info)
    $info_fields.each do |field|
      value = info[field]
      next if value.nil?
      value_str = Log.fingerprint(value)
      str << "\t#{Log.color :magenta, field}=#{value_str}"
    end
  end

  str
end

.prov_status_msg(status) ⇒ Object



27
28
29
30
# File 'lib/scout/workflow/step/provenance.rb', line 27

# Renders +status+ in the color chosen by Step.status_color.
def self.prov_status_msg(status)
  Log.color(status_color(status), status.to_s)
end

.relocate(path) ⇒ Object



2
3
4
5
6
7
8
9
10
11
# File 'lib/scout/workflow/step/load.rb', line 2

# Looks for an existing copy of +path+ and returns the first candidate that
# exists:
#   1. +path+ itself,
#   2. Path#relocate of it,
#   3. its last three components (for a job path: workflow/task/job) under
#      "var/jobs".
# Returns +path+ unchanged when none exists. Paths with fewer than three
# components skip step 3; before this guard they raised NoMethodError.
#
# @param path [String]
# @return [String]
def self.relocate(path)
  return path if Open.exists?(path)
  Path.setup(path) unless Path === path
  relocated = path.relocate
  return relocated if Open.exists?(relocated)
  components = path.split("/")
  return path if components.length < 3
  relocated = Path.setup("var/jobs")[components.last(3) * "/"]
  return relocated if Open.exists?(relocated)
  path
end

.status_color(status) ⇒ Object



6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
# File 'lib/scout/workflow/step/provenance.rb', line 6

# Maps a job status (Symbol or String) to the color used to print it.
# Unrecognized statuses, including composite ones containing ">", are cyan.
# The former else branch tested for ">" but returned :cyan either way; it is
# collapsed here.
#
# @param status [Symbol, String]
# @return [Symbol] color name
def self.status_color(status)
  case status.to_sym
  when :error, :aborted, :dead, :unsync
    :red
  when :streaming, :started
    :cyan
  when :done, :noinfo
    :green
  when :dependencies, :waiting, :setup
    :yellow
  when :notfound, :cleaned, :missing
    :blue
  else
    :cyan
  end
end

.wait_for_jobs(jobs) ⇒ Object



98
99
100
101
102
103
104
105
106
107
108
109
# File 'lib/scout/workflow/step/dependencies.rb', line 98

# Joins all +jobs+ concurrently, one thread per job, and waits for every
# thread. Thread exception reports are silenced; a failing job's error is
# re-raised when its thread is joined. Returns the threads.
def self.wait_for_jobs(jobs)
  workers = jobs.map do |job|
    Thread.new do
      Thread.current.report_on_exception = false
      job.join
    end
  end
  workers.each(&:join)
end

Instance Method Details

#_dump(level) ⇒ Object

Marshals a Step as its path only; Step._load rebuilds the Step from that path.



176
177
178
# File 'lib/scout/workflow/step/info.rb', line 176

# Marshal hook: a Step serializes as nothing but its path.
def _dump(_level)
  @path
end

#abort(exception = nil) ⇒ Object



2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
# File 'lib/scout/workflow/step/status.rb', line 2

# Aborts the job.
#
# When info records a pid that is not the current process and is still
# alive, that process is killed through Misc.abort_child; failures to kill it
# are only logged at debug level. Otherwise the job is aborted in-process:
# every pending stream of the result is aborted with +exception+, and so is
# @take_stream while the job is still streaming.
#
# @param exception [Exception, nil] passed on to the aborted streams
def abort(exception = nil)
  if (pid = info[:pid]) && pid != Process.pid && Misc.pid_alive?(pid)
    Log.debug "Kill process #{pid} to abort step #{Log.fingerprint self}"
    begin
      s = Misc.abort_child pid, true
      # NOTE(review): message says "pid" but interpolates the job path.
      Log.medium "Aborted pid #{path} #{s}"
    rescue 
      Log.debug("Aborted job #{pid} was not killed: #{$!.message}")
    end
  else
    # Drain-and-abort every stream still attached to the result.
    while @result && streaming? && stream = self.stream
      stream.abort(exception)
    end
    # NOTE(review): assumes @take_stream is set whenever streaming? is true;
    # a nil @take_stream here would raise NoMethodError.
    @take_stream.abort(exception) if streaming?
  end
end

#abort_dependenciesObject



94
95
96
# File 'lib/scout/workflow/step/dependencies.rb', line 94

# Aborts each direct or input dependency that is still running, checking
# them one at a time in order. Returns #all_dependencies.
def abort_dependencies
  all_dependencies.each do |dependency|
    next unless dependency.running?
    dependency.abort
  end
end

#aborted?Boolean

Returns:

  • (Boolean)


154
155
156
# File 'lib/scout/workflow/step/info.rb', line 154

# True when the recorded status is aborted, as a Symbol or as a String.
def aborted?
  case status
  when :aborted, 'aborted' then true
  else false
  end
end

#alias?Boolean

Returns:

  • (Boolean)


379
380
381
# File 'lib/scout/workflow/step.rb', line 379

# Delegates to the task's alias? predicate; nil when the step has no task.
def alias?
  return nil unless task
  task.alias?
end

#all_dependenciesObject



63
64
65
66
67
68
69
70
# File 'lib/scout/workflow/step/dependencies.rb', line 63

# Direct dependencies followed by input dependencies, built once and memoized.
def all_dependencies
  @all_dependencies ||= [] + (dependencies || []) + (input_dependencies || [])
end

#archive_deps(jobs = nil) ⇒ Object



29
30
31
32
33
34
35
36
37
38
39
40
41
# File 'lib/scout/workflow/step/archive.rb', line 29

# Stores in this step's info:
#   :archived_info          - info of every given dependency that has an info
#                             file, keyed by path, merged with whatever each
#                             of them had archived itself;
#   :archived_dependencies  - the current info[:dependencies].
#
# @param jobs [Array<Step>, nil] steps to archive; defaults to #dependencies,
#   or none when the step has no dependencies (previously this raised)
def archive_deps(jobs = nil)
  jobs = dependencies || [] if jobs.nil?

  archived_info = jobs.each_with_object({}) do |dep, acc|
    next unless Open.exists?(dep.info_file)
    acc[dep.path] = dep.info
    acc.merge!(dep.archived_info)
  end

  self.set_info :archived_info, archived_info
  self.set_info :archived_dependencies, info[:dependencies]
end

#archived_infoObject



2
3
4
5
# File 'lib/scout/workflow/step/archive.rb', line 2

# Info of archived dependencies keyed by path; {} when there is no info file
# or nothing was archived.
def archived_info
  if Open.exists?(info_file)
    info[:archived_info] || {}
  else
    {}
  end
end

#archived_inputsObject



7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
# File 'lib/scout/workflow/step/archive.rb', line 7

# Collects the inputs of all archived dependencies by walking the archived
# dependency graph. The walk starts at info[:archived_dependencies] and
# follows each entry's :dependencies and :archived_dependencies, whose
# elements are arrays ending with the dependency path.
#
# Each archived path is processed at most once. Before, a path queued twice
# before being processed (e.g. listed by a step and by one of its
# dependencies) contributed its inputs twice.
#
# @return [Array] inputs wrapped with NamedArray.setup using their input
#   names; [] when nothing was archived
def archived_inputs
  return [] unless info[:archived_dependencies]
  archived_info = self.archived_info

  all_inputs = NamedArray.setup([],[])
  deps = info[:archived_dependencies].dup
  seen = []
  while path = deps.pop
    next if seen.include?(path)
    seen << path
    dep_info = archived_info[path]
    next unless Hash === dep_info

    dep_inputs = dep_info[:inputs]
    NamedArray.setup(dep_inputs, dep_info[:input_names])
    all_inputs.concat(dep_inputs)
    deps.concat(dep_info[:dependencies].collect{|p| p.last } - seen) if dep_info[:dependencies]
    deps.concat(dep_info[:archived_dependencies].collect{|p| p.last } - seen) if dep_info[:archived_dependencies]
  end

  all_inputs
end

#bundle_filesObject



28
29
30
# File 'lib/scout/workflow/step/file.rb', line 28

# Every existing file that belongs to the job: the result, the info file and
# everything found under the files directory.
def bundle_files
  candidates = [path, info_file] + Dir.glob(File.join(files_dir,"**/*"))
  candidates.select { |candidate| Open.exist?(candidate) }
end

#canfail?Boolean

Returns:

  • (Boolean)


58
59
60
# File 'lib/scout/workflow/step/status.rb', line 58

# True when :canfail is among the compute flags (nil when there are none).
def canfail?
  return @compute unless @compute
  @compute.include?(:canfail)
end

#child(&block) ⇒ Object



2
3
4
5
6
7
8
9
10
11
12
# File 'lib/scout/workflow/step/children.rb', line 2

# Forks a child process running +block+, appends the child's pid to
# info[:children_pids] (saved through set_info) and returns that pid.
def child(&block)
  pid = Process.fork(&block)
  tracked = info[:children_pids]
  tracked = tracked.nil? ? [pid] : tracked.push(pid)
  set_info :children_pids, tracked
  pid
end

#cleanObject



33
34
35
36
37
38
39
40
41
42
43
44
# File 'lib/scout/workflow/step/status.rb', line 33

# Deletes the job's result, temporary file, info file and files directory,
# and forgets the cached stream, result and info. Returns self.
def clean
  Log.debug "Cleaning job files: #{path}"
  @take_stream = @result = nil
  @info = @info_load_time = nil
  [path, tmp_path, info_file].each do |target|
    Open.rm target if Open.exist_or_link?(target)
  end
  Open.rm_rf files_dir if Open.exist_or_link?(files_dir)
  self
end

#clean_nameObject



75
76
77
78
79
80
81
82
# File 'lib/scout/workflow/step.rb', line 75

# Human-readable job name. In order of preference: the explicit id, the
# :clean_name stored in the info, or the job name with any trailing "_"
# followed by 32 lowercase letters/digits (a digest) and any extension
# removed.
#
# @return [String]
def clean_name
  return @id if @id
  return info[:clean_name] if info.include? :clean_name
  # Anchored so the lazy capture must reach the end of the name. Unanchored,
  # /(.*?)…/ matched the empty string at position 0 and "" was returned.
  if m = name.match(/\A(.*?)(?:_[a-z0-9]{32})?(?:\..*)?\z/)
    return m[1]
  end
  name.split(".").first
end

#cmd(*args) ⇒ Object



14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
# File 'lib/scout/workflow/step/children.rb', line 14

# Runs an external command through CMD.cmd, records its pid in
# info[:children_pids], echoes its output to STDERR character by character
# when Log.severity <= level, and waits for it to finish.
#
# A trailing options Hash is used when present (otherwise one is added). Its
# :log value sets the level: true means 0, false means 10, and other values
# are converted with to_i, defaulting to 0. :log and :pipe are then forced to
# true for CMD.cmd. The caller's Hash is cloned first, so it is not modified.
#
# @return [nil]
def cmd(*args)
  all_args = args.dup

  if Hash === all_args.last
    # clone (not dup) keeps any extended modules, e.g. indifferent access
    all_args[-1] = all_args.last.clone
  else
    all_args << {}
  end
  options = all_args.last

  # NOTE(review): `|| 0` turns `log: false` into 0, so the FalseClass branch
  # below never fires; kept as-is pending clarification of the intent.
  level = options[:log] || 0
  level = 0 if TrueClass === level
  level = 10 if FalseClass === level
  level = level.to_i

  options[:log] = true
  options[:pipe] = true

  io = CMD.cmd(*all_args)
  child_pid = io.pids.first

  children_pids = info[:children_pids]
  if children_pids.nil?
    children_pids = [child_pid]
  else
    children_pids << child_pid
  end
  set_info :children_pids, children_pids

  while c = io.getc
    STDERR << c if Log.severity <= level
    if c == "\n"
      Log.logn "STDOUT [#{child_pid}]: ", level
    end
  end

  io.join

  nil
end

#config(key, *tokens) ⇒ Object



4
5
6
7
8
9
10
11
12
13
14
15
16
17
# File 'lib/scout/workflow/step/config.rb', line 4

# Looks up configuration +key+ through Scout::Config.get. The lookup tokens
# are the caller's tokens followed by this step's own: "workflow:<name>" and
# "task:<name>#<task>" when the step has a workflow, then always
# "task:<task>". A trailing Hash in +tokens+ is passed as the options.
#
# @param key [Object] configuration key
# @param tokens [Array] extra lookup tokens, optionally ending in an options Hash
def config(key, *tokens)
  options = Hash === tokens.last ? tokens.pop : {}

  new_tokens = []
  if workflow
    workflow_name = workflow.name
    # Interpolation instead of `"literal" << x`: appending mutates the
    # literal (FrozenError under frozen_string_literal) and rejects Symbols.
    new_tokens << "workflow:#{workflow_name}"
    new_tokens << "task:#{workflow_name}##{task_name}"
  end
  new_tokens << "task:#{task_name}"

  Scout::Config.get(key, tokens + new_tokens, options)
end

#consume_all_streamsObject



284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
# File 'lib/scout/workflow/step.rb', line 284

# Consumes every pending stream of the result (Open.consume_stream, one
# thread each) and waits for all of them. When joining one of them raises,
# the other threads are interrupted with Aborted and reaped, and that first
# error is re-raised.
def consume_all_streams
  threads = []
  while @result && streaming? && stream = self.stream
    threads << Open.consume_stream(stream, true)
  end

  threads.compact!

  threads.each do |thread|
    begin
      thread.join
    rescue Exception => e
      # Skip the failed thread: joining it again re-raises at once, which
      # previously left the remaining threads running un-aborted.
      threads.each do |other|
        next if other.equal?(thread)
        other.raise(Aborted) if other.alive?
        begin
          other.join
        rescue Exception
          # expected (Aborted or its own failure); `e` is the error reported
        end
      end
      raise e
    end
  end
end

#copy_linked_files_dirObject



32
33
34
35
36
37
38
39
40
41
42
# File 'lib/scout/workflow/step/file.rb', line 32

# When the files directory is a symlink, replaces the link with a real copy
# of its target. Failures are logged as warnings rather than raised.
def copy_linked_files_dir
  return unless File.symlink?(self.files_dir)
  begin
    target = Open.realpath(self.files_dir)
    Open.rm self.files_dir
    Open.cp target, self.files_dir
  rescue
    Log.warn "Copy files_dir for #{self.workflow_short_path} failed: " + $!.message
  end
end

#digest_strObject



366
367
368
# File 'lib/scout/workflow/step.rb', line 366

# Identity string used by #fingerprint: "Step: " followed by the short path.
def digest_str
  prefix = "Step: "
  prefix + short_path
end

#dirty?Boolean

Returns:

  • (Boolean)


74
75
76
# File 'lib/scout/workflow/step/status.rb', line 74

# Done, but no longer #updated?.
def dirty?
  finished = done?
  finished ? !updated? : finished
end

#done?Boolean

Returns:

  • (Boolean)


249
250
251
# File 'lib/scout/workflow/step.rb', line 249

# The job is done once something exists at its path.
def done?
  result_path = path
  Open.exist?(result_path)
end

#error?Boolean

Returns:

  • (Boolean)


150
151
152
# File 'lib/scout/workflow/step/info.rb', line 150

# True when the recorded status is error, as a Symbol or as a String.
def error?
  case status
  when :error, 'error' then true
  else false
  end
end

#exceptionObject



171
172
173
# File 'lib/scout/workflow/step/info.rb', line 171

# Exception recorded in the info, or nil.
def exception
  recorded = info
  recorded[:exception]
end

#execObject



102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
# File 'lib/scout/workflow/step.rb', line 102

# Runs the task block in @exec_context with the step inputs as arguments,
# stores the returned value in @result and returns it. @in_exec is true while
# the block runs.
#
# Inputs that are Steps are resolved first. When the task is a Task, each
# such input is joined and its path passed to Task.format_input together with
# the declared type and options. Otherwise each such input is replaced by its
# loaded result.
def exec
  exec_inputs = nil
  if inputs
    if Task === task
      # Keep the whole spec after the input name: collecting only the second
      # element made `options` (and desc/default) always nil.
      # NOTE(review): assumes entries look like [name, type, desc, default, options].
      input_specs = task.inputs.collect{|_name, *spec| spec }
      exec_inputs = inputs.zip(input_specs).collect{|input, spec|
        next input unless Step === input
        type, _desc, _default, options = spec
        # join consumes any pending stream before the path is used
        Task.format_input(input.join.path, type, options)
      }
    else
      exec_inputs = inputs.collect{|input| Step === input ? input.load : input }
    end
  end

  @result = begin
              @in_exec = true
              @exec_context.instance_exec(*exec_inputs, &task)
            ensure
              @in_exec = false
            end
end

#file(file = nil) ⇒ Object



15
16
17
18
19
20
# File 'lib/scout/workflow/step/file.rb', line 15

# Path of +file+ inside the files directory, or the directory itself when
# no name is given.
def file(file = nil)
  dir = files_dir
  Path.setup(dir) unless Path === dir
  file.nil? ? dir : dir[file]
end

#filesObject



22
23
24
25
26
# File 'lib/scout/workflow/step/file.rb', line 22

# Files stored under the files directory (recursively, directories
# excluded), as paths relative to that directory.
def files
  pattern = File.join(files_dir, '**', '*')
  Dir.glob(pattern)
    .reject { |entry| File.directory?(entry) }
    .map { |entry| Misc.path_relative_to(files_dir, entry) }
end

#files_dirObject



2
3
4
5
6
7
8
9
10
11
12
13
# File 'lib/scout/workflow/step/file.rb', line 2

# Directory for the job's auxiliary files (<path>.files), memoized. It gets
# the annotations of the path when the path is a Path (Path.setup otherwise),
# and its pkgdir points back at this step.
def files_dir
  return @files_dir if @files_dir
  dir = @path + ".files"
  if Path === @path
    @path.annotate(dir)
  else
    Path.setup(dir)
  end
  dir.pkgdir = self
  @files_dir = dir
end

#fingerprintObject



370
371
372
# File 'lib/scout/workflow/step.rb', line 370

# Same string as #digest_str.
def fingerprint; digest_str; end

#fork(noload = false, semaphore = nil) ⇒ Object



229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
# File 'lib/scout/workflow/step.rb', line 229

# Runs the step in a forked, detached child process and returns self once
# the job is present on disk (see #grace).
#
# In the child, TERM raises Aborted. A fresh info (status :queue, child pid)
# is written unless the job is already present. The run optionally goes
# through ScoutSemaphore.synchronize, and is followed by #join.
def fork(noload = false, semaphore = nil)
  child = Process.fork do
    Signal.trap(:TERM) do
      raise Aborted, "Recieved TERM Signal on forked process #{Process.pid}"
    end
    reset_info status: :queue, pid: Process.pid unless present?
    runner = proc { run(noload) }
    semaphore ? ScoutSemaphore.synchronize(semaphore, &runner) : runner.call
    join
  end
  Process.detach child
  grace
  self
end

#full_task_nameObject



96
97
98
99
100
# File 'lib/scout/workflow/step.rb', line 96

# "<workflow>#<task name>"; just the task name without a workflow; nil when
# the task name is unknown.
def full_task_name
  tname = task_name
  return nil if tname.nil?
  wf = workflow
  wf.nil? ? tname.to_s : [wf, tname] * "#"
end

#graceObject



308
309
310
311
312
313
# File 'lib/scout/workflow/step.rb', line 308

# Polls every 0.1s until the job is #present?; returns self.
def grace
  sleep 0.1 until present?
  self
end

#infoObject



39
40
41
42
43
44
45
46
47
48
49
50
51
# File 'lib/scout/workflow/step/info.rb', line 39

# Cached step info. It is (re)loaded when nothing is cached, when the info
# file changed after the last load, or when its mtime cannot be read.
def info
  stale = begin
            @info_load_time && (mtime = Open.mtime(info_file)) && mtime > @info_load_time
          rescue
            true
          end

  load_info if @info.nil? || stale
  @info
end

#info_fileObject



3
4
5
6
7
8
9
10
# File 'lib/scout/workflow/step/info.rb', line 3

# "<path>.info" (memoized), carrying the path's annotations when the path is
# a Path; nil when the step has no path.
def info_file
  return if @path.nil?
  @info_file ||= (@path + ".info").tap { |file| @path.annotate file if Path === @path }
end

#init_infoObject



35
36
37
# File 'lib/scout/workflow/step/info.rb', line 35

# Logs status :waiting for a job that has an info-file location but no info
# file yet.
def init_info
  return if info_file.nil? || Open.exists?(info_file)
  log :waiting
end

#input_dependenciesObject



21
22
23
24
25
26
27
28
29
30
31
32
# File 'lib/scout/workflow/step/dependencies.rb', line 21

# Steps among the inputs: Step inputs themselves, plus the pkgdir Step of
# Path inputs that belong to a step. Unique, in input order.
def input_dependencies
  return [] unless inputs
  steps = inputs.map do |input|
    case
    when Step === input then input
    when Path === input && Step === input.pkgdir then input.pkgdir
    end
  end
  steps.compact.uniq
end

#joinObject



319
320
321
322
323
324
325
326
327
328
329
330
331
332
# File 'lib/scout/workflow/step.rb', line 319

# Waits for the job and returns self.
#
# Drains pending streams, then polls while there is no result and the job is
# present but neither terminated nor done. Then waits on the recorded pid.
# Finally raises the stored exception, or a generic error when the status is
# error/aborted.
def join
  consume_all_streams
  sleep 0.1 while @result.nil? && present? && !(terminated? || done?)

  pid = info[:pid]
  Misc.wait_child pid if pid

  stored = self.exception
  raise stored if stored

  raise "Error in job #{self.path}" if self.error? || self.aborted?

  self
end

#loadObject



345
346
347
348
349
# File 'lib/scout/workflow/step.rb', line 345

# The job result. A kept in-memory result is returned directly unless it is
# streaming. Otherwise the job is joined, and then the persisted result is
# read (when done) or the task is executed in-process.
def load
  if @result.nil? || streaming?
    join
    return done? ? Persist.load(path, type) : exec
  end
  @result
end

#load_infoObject



21
22
23
24
# File 'lib/scout/workflow/step/info.rb', line 21

# Re-reads the info file into the cache and records when it was loaded.
def load_info
  fresh = Step.load_info(info_file)
  @info = fresh
  @info_load_time = Time.now
end

#log(status, message = nil, &block) ⇒ Object



128
129
130
131
132
133
134
135
136
137
138
139
140
# File 'lib/scout/workflow/step/info.rb', line 128

# Records +status+ (and +message+, when any) through #merge_info. With a
# block, the block is timed and the formatted elapsed time is prefixed to
# the message (or becomes the message).
def log(status, message = nil, &block)
  if block
    colored = Log.color(:time, Misc.format_seconds_short(Misc.exec_time(&block)))
    message = message.nil? ? colored : "#{colored} - #{ message }"
  end

  update = { :status => status }
  update[:message] = message if message
  merge_info update
end

#marshal_load(path) ⇒ Object



184
185
186
# File 'lib/scout/workflow/step/info.rb', line 184

# Restores the receiver from the path produced by #_dump and returns self.
#
# Marshal calls this on a freshly allocated, uninitialized object, so the
# receiver must initialize itself. Building a separate Step.new(path) left
# the receiver without state.
#
# NOTE(review): Marshal only uses marshal_load when marshal_dump is defined;
# with _dump/_load in place this hook is not called by Marshal itself.
def marshal_load(path)
  initialize(path)
  self
end

#merge_info(new_info) ⇒ Object



53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
# File 'lib/scout/workflow/step/info.rb', line 53

# Merges +new_info+ into the step info and saves the result.
#
# Every value first goes through Annotation.purge. Then, per key:
# - :status is reported through #report_status. For :done, :error or
#   :aborted without an explicit :message, when info[:start] and
#   new_info[:end] are both present, the run time is stored as :time_elapsed
#   and :total_time_elapsed and formatted as the message. Total time is
#   measured from info[:issued], or from info[:start] when :issued is missing.
# - :message values are appended to info[:messages], not stored under :message.
# - Exceptions that Marshal cannot dump are replaced by a plain
#   ScoutException/Exception with the same message and backtrace.
# - Existing Array values are extended and existing Hash values merged;
#   anything else is overwritten.
#
# @param new_info [Hash]
def merge_info(new_info)
  info = self.info
  new_info.each do |key,value|
    value = Annotation.purge(value)
    if key == :status
      message = new_info[:message]
      if message.nil? && (value == :done || value == :error || value == :aborted)
        issued = info[:issued]
        start = info[:start]
        eend = new_info[:end]
        if start && eend
          time = eend - start
          # :issued may be absent; `eend - nil` used to raise TypeError here.
          total_time = eend - (issued || start)
          if total_time - time > 1
            time_str = "#{Misc.format_seconds_short(time)} (#{Misc.format_seconds_short(total_time)})"
          else
            time_str = Misc.format_seconds_short(time)
          end
          info[:time_elapsed] = time
          info[:total_time_elapsed] = total_time
          message = Log.color(:time, time_str)
        end
      end
      report_status value, message
    end

    if key == :message
      messages = info[:messages] || []
      messages << value
      info[:messages] = messages
      next
    end

    if Exception === value
      begin
        Marshal.dump(value)
      rescue TypeError
        # Not serializable: keep only the class family, message and backtrace.
        replacement = ScoutException === value ? ScoutException.new(value.message) : Exception.new(value.message)
        replacement.set_backtrace(value.backtrace)
        value = replacement
      end
    end

    if info.include?(key)
      case info[key]
      when Array
        info[key].concat Array === value ? value : [value]
      when Hash
        info[key].merge! value
      else
        info[key] = value
      end
    else
      info[key] = value
    end
  end
  save_info(info)
end

#messagesObject



142
143
144
# File 'lib/scout/workflow/step/info.rb', line 142

# Messages accumulated in the info by #merge_info, or nil.
def messages
  current_info = info
  current_info[:messages]
end

#nameObject



67
68
69
# File 'lib/scout/workflow/step.rb', line 67

# Base name of the job path, memoized.
def name
  return @name if @name
  @name = File.basename(@path)
end

#overriden?Boolean

Returns:

  • (Boolean)


162
163
164
165
# File 'lib/scout/workflow/step/info.rb', line 162

# Whether the step or any recursive dependency is overridden; computed once
# and memoized, including a false result.
def overriden?
  if @overriden.nil?
    @overriden = overriden_task || overriden_workflow || overriden_deps.any?
  end
  @overriden
end

#overriden_depsObject



167
168
169
# File 'lib/scout/workflow/step/info.rb', line 167

# Recursive dependencies that are themselves overridden.
def overriden_deps
  rec_dependencies.select(&:overriden?)
end

#prepare_dependenciesObject



34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
# File 'lib/scout/workflow/step/dependencies.rb', line 34

# Prepares the direct dependencies before this step runs.
#
# Dependencies that are present on disk but no longer #updated? are cleaned.
# For every dependency that is not done, the steps it depends on are mapped
# to the dependencies consuming them. Each such step's tee_copies is then set
# to its number of consumers; tee_copies is handed to Persist.persist, and
# presumably sets how many copies of the stream are made (confirm there).
def prepare_dependencies
  inverse_dep = {}

  dependencies.each do |dep|
    if dep.present? && ! dep.updated?
      Log.medium "Clean outdated #{dep.path}"
      dep.clean
    end

    next if dep.done?

    if dep.dependencies
      dep.dependencies.each do |d|
        inverse_dep[d] ||= []
        inverse_dep[d] << dep
      end
    end

    # NOTE(review): this iterates self.input_dependencies, not
    # dep.input_dependencies, so every pending dependency is registered as a
    # consumer of this step's own input steps. Confirm this is intended
    # before relying on the resulting tee_copies.
    input_dependencies.each do |d|
      inverse_dep[d] ||= []
      inverse_dep[d] << dep
    end
  end if dependencies

  # one stream copy per registered consumer
  inverse_dep.each do |dep,list|
    dep.tee_copies = list.length
  end
end

#present?Boolean

Returns:

  • (Boolean)


302
303
304
305
306
# File 'lib/scout/workflow/step.rb', line 302

# True when anything of the job exists on disk: the result, the info file or
# the files directory (checked in that order, stopping at the first hit).
def present?
  found = Open.exist?(path)
  found ||= Open.exist?(info_file)
  found || Open.exist?(files_dir)
end

#produce(with_fork: false) ⇒ Object



334
335
336
337
338
339
340
341
342
343
# File 'lib/scout/workflow/step.rb', line 334

# Produces the job result without loading it and returns self. Recoverable
# errors are cleaned first. With +with_fork+ the job runs in a forked process
# and is joined; otherwise it runs in-process with :no_load.
def produce(with_fork: false)
  clean if error? && recoverable_error?
  unless with_fork
    run(:no_load)
    return self
  end
  self.fork
  self.join
  self
end

#progress_bar(msg = "Progress", options = nil, &block) ⇒ Object



2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
# File 'lib/scout/workflow/step/progress.rb', line 2

# Creates a Log::ProgressBar for this job. The options Hash may be passed in
# place of +msg+ (the description is then nil); options[:max] sets the bar
# total. The files directory is created, and unless @exec is set the bar is
# backed by the job file "progress". With a block, the bar is initialized,
# yielded and removed, and the block's value is returned; without a block the
# bar itself is returned.
#
# NOTE(review): @exec is not assigned in the code shown (exec sets @in_exec),
# so the progress file is presumably always used — confirm.
def progress_bar(msg = "Progress", options = nil, &block)
  msg, options = nil, msg if Hash === msg && options.nil?
  options = {} if options.nil?

  Open.mkdir files_dir
  progress_file = @exec ? nil : file(:progress)
  bar = Log::ProgressBar.new_bar(options[:max], {:desc => msg, :file => progress_file}.merge(options))

  return bar unless block_given?

  bar.init
  outcome = yield bar
  bar.remove
  outcome
end

#rec_dependencies(connected = false, seen = []) ⇒ Object



2
3
4
5
6
7
8
9
10
11
# File 'lib/scout/workflow/step/dependencies.rb', line 2

# All dependencies reachable from this step, each path listed once: the
# direct ones first, followed by what each of them depends on, in order.
#
# @param connected [Boolean] skip (and do not descend into) dependencies that
#   are done and updated
# @param seen [Array<String>] paths already collected; shared and mutated
#   across the recursion
# @return [Array<Step>]
def rec_dependencies(connected = false, seen = [])
  direct_deps = []
  dependencies.each do |dep|
    next if seen.include? dep.path
    next if connected && dep.done? && dep.updated?
    direct_deps << dep
  end if dependencies
  seen.concat direct_deps.collect{|d| d.path }
  # Recurse into the direct dependencies only. The former inject appended to
  # the array it was iterating, so every descendant was walked a second time.
  direct_deps + direct_deps.flat_map{|d| d.rec_dependencies(connected, seen) }
end

#recoverable_error?Boolean

Returns:

  • (Boolean)


19
20
21
# File 'lib/scout/workflow/step/status.rb', line 19

# An errored job is recoverable unless its stored exception is a ScoutException.
def recoverable_error?
  return false unless self.error?
  !(ScoutException === self.exception)
end

#recursive_cleanObject



51
52
53
54
55
56
# File 'lib/scout/workflow/step/status.rb', line 51

# Cleans all dependencies recursively (depth first) and then this step.
# Steps without dependencies no longer raise on nil. Returns whatever #clean
# returns.
def recursive_clean
  (dependencies || []).each do |dep|
    dep.recursive_clean
  end
  clean
end

#recursive_inputsObject



13
14
15
16
17
18
19
# File 'lib/scout/workflow/step/dependencies.rb', line 13

# This step's named inputs (when inputs is a NamedArray) merged with the
# recursive inputs of each dependency in turn. On name clashes, dependency
# values override this step's, and later dependencies override earlier ones.
def recursive_inputs
  merged = NamedArray === inputs ? inputs.to_hash : {}
  (dependencies || []).each do |dep|
    merged = merged.merge(dep.recursive_inputs)
  end
  merged
end

#report_status(status, message = nil) ⇒ Object



120
121
122
123
124
125
126
# File 'lib/scout/workflow/step/info.rb', line 120

# Logs a status line at info level: status, task name, optional message and
# path, separated by spaces.
def report_status(status, message = nil)
  parts = [Log.color(:status, status, true), Log.color(:task, task_name, true)]
  parts << message unless message.nil?
  parts << Log.color(:path, path)
  Log.info(parts.join(" "))
end

#reset_info(info = {}) ⇒ Object



31
32
33
# File 'lib/scout/workflow/step/info.rb', line 31

# Replaces the whole info with +info+ (empty by default) and saves it.
def reset_info(info = {})
  @info = info
  save_info(@info)
end

#run(stream = false) ⇒ Object



140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
# File 'lib/scout/workflow/step.rb', line 140

# Run the step, unless it is already done, and return its result.
#
# When the step is already done, the in-memory @result is returned, or the
# result is loaded with +load+. Otherwise dependencies are prepared, then the
# task is executed inside Persist.persist, which stores the result at +path+.
#
# @param stream [Boolean, Symbol] how the result should be handed back:
#   * +true+ or +:stream+ -> Persist receives no_load: :stream; the result is
#     returned only if it is an IO, otherwise nil
#   * +:no_load+ -> Persist receives no_load: true; any pending streams are
#     consumed and nil is returned
#   * anything else -> no_load: false; the persisted result is returned
# @return [Object, IO, nil] the result, a stream, or nil (see above)
# @raise [Exception] any exception raised while running is recorded in the
#   info (status :error, exception, backtrace, message), dependencies are
#   aborted, and the exception is re-raised
def run(stream = false)
  return @result || self.load if done?
  prepare_dependencies
  begin

    # Translate the caller's +stream+ argument into Persist's no_load option.
    case stream
    when TrueClass, :stream
      no_load = :stream
    when :no_load
      no_load = true
    else
      no_load = false
    end

    @result = Persist.persist(name, type, :path => path, :tee_copies => tee_copies, no_load: no_load) do
      # Names of the task's declared inputs, recorded in the info below.
      input_names = (task.respond_to?(:inputs) && task.inputs) ? task.inputs.collect{|name,_| name} : []


      # Start from a fresh info record describing this execution.
      reset_info :status => :setup, :issued => Time.now,
        :pid => Process.pid, :pid_hostname => Misc.hostname, 
        :task_name => task_name, :workflow => workflow.to_s,
        :inputs => Annotation.purge(inputs), :input_names => input_names, :type => type,
        :dependencies => (dependencies || []) .collect{|d| d.path }

      run_dependencies

      set_info :start, Time.now
      log :start
      @exec_result = exec

      # A nil result with a tmp_path file present (and no final file yet) is
      # treated as output the task wrote itself; move it into place.
      # NOTE(review): presumably tasks may write to tmp_path directly — confirm.
      if @exec_result.nil? && File.exist?(self.tmp_path) && ! File.exist?(self.path)
        Open.mv self.tmp_path, self.path
      else
        # Objects exposing +stream+ (other than TSV) are replaced by their stream.
        @exec_result = @exec_result.stream if @exec_result.respond_to?(:stream) && ! (TSV === @exec_result)
      end

      # NOTE(review): this bare expression has no effect; the block's value
      # comes from the if/else below.
      @exec_result

      # When streaming is disabled (SCOUT_NO_STREAM=true or a falsy +stream+),
      # drain IO results to +path+ here and hand nil back to Persist.
      if (IO === @exec_result || StringIO === @exec_result) && (ENV["SCOUT_NO_STREAM"] == "true" || ! stream)
        Open.sensible_write(self.path, @exec_result)
        @exec_result = nil
      else
        @exec_result
      end
    end

    # Apply the no_load decision to the value returned by Persist.
    if TrueClass === no_load
      consume_all_streams if streaming?
      @result = nil
    elsif no_load && ! (IO === @result)
      @result = nil
    end

    @result
  rescue Exception => e
    # Record the failure, then abort dependencies; the inner ensure guarantees
    # the original exception is re-raised even if aborting fails.
    merge_info :status => :error, :exception => e, :end => Time.now, :backtrace => e.backtrace, :message => "#{e.class}: #{e.message}"
    begin
      abort_dependencies
    ensure
      raise e
    end
  ensure
    # Unless the step ended in error or was aborted, mark it done. For a
    # streamed result, the :done status is deferred to a callback registered
    # with ConcurrentStream.setup (presumably run when the stream completes).
    if ! (error? || aborted?)
      if @result && streaming?
        ConcurrentStream.setup(@result) do
          merge_info :status => :done, :end => Time.now
        end

        # Called with the exception that aborted the stream (possibly nil):
        # nil, Aborted or Interrupt record :aborted; anything else records
        # :error, and failures while recording are only logged.
        @result.abort_callback = proc do |exception|
          if exception.nil? || Aborted === exception || Interrupt === exception
            merge_info :status => :aborted, :end => Time.now
          else
            begin
              merge_info :status => :error, :exception => exception, :end => Time.now
            rescue Exception
              Log.exception $!
            end
          end
        end


        log :streaming
      else
        merge_info :status => :done, :end => Time.now
      end
    end
  end
end

#run_dependencies ⇒ Object



72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
# File 'lib/scout/workflow/step/dependencies.rb', line 72

# Run every dependency that is neither running nor done.
#
# Per-dependency options are looked up in +compute+ by dependency path:
# * +:produce+ -> the dependency is run with +:no_load+
# * otherwise, when SCOUT_EXPLICIT_STREAMING is 'true', streaming is used only
#   if +:stream+ is listed; without that variable streaming is always on
# * +:canfail+ -> a ScoutException from the dependency is logged and ignored
#
# @return [Object] the collection returned by +all_dependencies+
# @raise [ScoutException] from a dependency not marked +:canfail+
def run_dependencies
  all_dependencies.each do |dependency|
    next if dependency.running?
    next if dependency.done?

    options = compute ? compute[dependency.path] : nil
    options = [] if options.nil?

    mode = if options.include?(:produce)
             :no_load
           elsif ENV["SCOUT_EXPLICIT_STREAMING"] == 'true'
             options.include?(:stream)
           else
             true
           end

    begin
      dependency.run(mode)
    rescue ScoutException
      raise unless options.include?(:canfail)

      Log.medium "Allow failing of #{dependency.path}"
    end
  end
end

#running? ⇒ Boolean

Returns:

  • (Boolean)


158
159
160
# File 'lib/scout/workflow/step/info.rb', line 158

# Whether the step is currently being executed by a live process.
#
# A step that is done with status :done is never running. Otherwise the
# answer depends on the pid recorded in the info: nil when no pid is
# recorded, else the result of Misc.pid_alive? for that pid.
#
# @return [Boolean, nil]
def running?
  finished = done? && status == :done
  return false if finished

  info[:pid] && Misc.pid_alive?(info[:pid])
end

#save_info(info = nil) ⇒ Object



26
27
28
29
# File 'lib/scout/workflow/step/info.rb', line 26

# Persist +info+ to the step's info file and remember when that happened.
#
# @param info [Hash, nil] info to serialize with SERIALIZER
# @return [Time] the time recorded in @info_load_time
def save_info(info = nil)
  destination = info_file
  Persist.save(info, destination, SERIALIZER)
  @info_load_time = Time.now
end

#save_inputs(inputs_dir) ⇒ Object



2
3
4
# File 'lib/scout/workflow/step/inputs.rb', line 2

# Save this step's provided inputs into +inputs_dir+ using the task.
#
# @param inputs_dir [String] target directory
# @return [Object] whatever the task's +save_inputs+ returns
def save_inputs(inputs_dir)
  owner_task = self.task
  owner_task.save_inputs(inputs_dir, provided_inputs)
end

#set_info(key, value) ⇒ Object



116
117
118
# File 'lib/scout/workflow/step/info.rb', line 116

# Set a single info entry by merging a one-pair Hash.
#
# @param key [Object] info key
# @param value [Object] value to store
# @return [Object] whatever +merge_info+ returns
def set_info(key, value)
  entry = { key => value }
  merge_info(entry)
end

#short_path ⇒ Object



71
72
73
# File 'lib/scout/workflow/step.rb', line 71

# A compact "workflow/task/name" identifier for this step.
#
# @return [String]
def short_path
  segments = [workflow.to_s, task_name, name]
  segments.join("/")
end

#started? ⇒ Boolean

Returns:

  • (Boolean)


62
63
64
65
66
67
68
# File 'lib/scout/workflow/step/status.rb', line 62

# Whether the step has started.
#
# A done step has started. Otherwise the info file must exist and record a
# pid, and the result is Misc.pid_alive? for that pid.
#
# @return [Boolean]
def started?
  if done?
    true
  elsif !Open.exist?(info_file)
    false
  else
    recorded_pid = info[:pid]
    recorded_pid ? Misc.pid_alive?(recorded_pid) : false
  end
end

#status ⇒ Object



146
147
148
# File 'lib/scout/workflow/step/info.rb', line 146

# Current status of the step as recorded in its info.
#
# The stored value is normalized to a Symbol, so callers can compare it with
# symbols such as +:done+ even when the info holds a String. (The previous
# implementation used +tap+, which discards the block's value, so the
# conversion never took effect.)
#
# @return [Symbol, nil] the status, or nil when none is recorded
def status
  current = info[:status]
  current.nil? ? nil : current.to_sym
end

#step(task_name) ⇒ Object



351
352
353
354
355
356
357
358
359
360
# File 'lib/scout/workflow/step.rb', line 351

# Find a dependency (searched depth-first) by task name.
#
# A dependency matches when its +task_name+ or its +overriden_task+ equals
# +task_name+ (compared as Symbols). Otherwise each dependency's own
# dependencies are searched recursively. A step whose +dependencies+ is nil
# (the constructor default) has nothing to search and returns nil instead of
# raising NoMethodError.
#
# @param task_name [String, Symbol] name of the task to look for
# @return [Step, nil] the first matching step, or nil
def step(task_name)
  task_name = task_name.to_sym
  (dependencies || []).each do |dep|
    return dep if dep.task_name && dep.task_name.to_sym == task_name
    return dep if dep.overriden_task && dep.overriden_task.to_sym == task_name
    rec_dep = dep.step(task_name)
    return rec_dep if rec_dep
  end
  nil
end

#stream ⇒ Object



257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
# File 'lib/scout/workflow/step.rb', line 257

# Hand out a readable stream for this step's result.
#
# While the step is streaming and holds a result, the current stream is
# taken (under the step mutex) and the next tee copy becomes @result.
# Otherwise: a done step opens its file; a running or waiting step is joined
# and then its file is opened; any other step is executed directly.
#
# @return [Object] the taken stream, an opened file, or the result of +exec+
def stream
  synchronize do
    next unless streaming? && !@result.nil?

    debug_msg = if @result.next
                  "Taking result #{Log.fingerprint @result} next #{Log.fingerprint @result.next}"
                else
                  "Taking result #{Log.fingerprint @result}"
                end
    Log.debug debug_msg

    @take_stream, @result = @result, @result.next
    return @take_stream
  end

  return Open.open(self.path) if done?
  return exec unless running? || waiting?

  join
  Open.open(self.path)
end

#streaming? ⇒ Boolean

Returns:

  • (Boolean)


253
254
255
# File 'lib/scout/workflow/step.rb', line 253

# Whether the step is handing out its result as a stream.
#
# @return [Object, Boolean] @take_stream when set; otherwise whether @result
#   is an IO or StringIO
def streaming?
  return @take_stream if @take_stream

  [IO, StringIO].any? { |stream_class| stream_class === @result }
end

#synchronize(&block) ⇒ Object



33
34
35
# File 'lib/scout/workflow/step.rb', line 33

# Run the given block while holding this step's mutex.
#
# @return [Object] the block's value
def synchronize(&block)
  lock = @mutex
  lock.synchronize(&block)
end

#task_name ⇒ Object



84
85
86
87
88
# File 'lib/scout/workflow/step.rb', line 84

# Name of the task this step runs (memoized).
#
# Sources are tried in order: the task object's +name+, the :task_name
# recorded in the info file (when it exists), and finally the
# second-to-last component of +path+.
#
# @return [Object] the task name
def task_name
  if @task.respond_to?(:name)
    @task_name ||= @task.name
  end

  if Open.exist?(info_file)
    @task_name ||= info[:task_name]
  end

  @task_name ||= path.split("/")[-2]
end

#task_signature ⇒ Object



374
375
376
377
# File 'lib/scout/workflow/step.rb', line 374

# "Workflow#task" signature identifying the task this step runs.
#
# The workflow part is the workflow itself when it is a String, else its
# +name+ when it responds to one and that name is non-nil, else its +to_s+.
# (The previous implementation computed this name but then joined the raw
# workflow object, so it always used +to_s+ instead.)
#
# @return [String]
def task_signature
  wf = workflow
  workflow_name = if String === wf
                    wf
                  else
                    (wf.respond_to?(:name) && wf.name) || wf.to_s
                  end
  [workflow_name, task_name] * "#"
end

#terminated? ⇒ Boolean

Returns:

  • (Boolean)


315
316
317
# File 'lib/scout/workflow/step.rb', line 315

# Whether the step has reached a final state and is not executing.
#
# @return [Boolean, nil] false while @in_exec is set; otherwise the first
#   truthy value among done?, error? and aborted? (or the last falsy one)
def terminated?
  return false if @in_exec

  done? || error? || aborted?
end

#tmp_path ⇒ Object



129
130
131
132
133
134
135
136
137
# File 'lib/scout/workflow/step.rb', line 129

# Temporary sibling of +path+ whose basename is prefixed with a dot (memoized).
#
# When @path is a Path, +setup+ is called on it with the new path, so the
# temporary path is prepared the same way as the original one.
#
# @return [String] e.g. "dir/.file" for a path "dir/file"
def tmp_path
  return @tmp_path if @tmp_path

  file_part = File.basename(@path)
  dir_part = File.dirname(@path)
  hidden = File.join(dir_part, ".#{file_part}")
  @path.setup(hidden) if Path === @path
  @tmp_path = hidden
end

#updated? ⇒ Boolean

Returns:

  • (Boolean)


23
24
25
26
27
28
29
30
31
# File 'lib/scout/workflow/step/status.rb', line 23

# Whether the step's result is up to date with respect to its dependencies.
#
# A step in a recoverable error state is never up to date. A done step is
# up to date unless SCOUT_UPDATE is set. Otherwise every recursive and input
# dependency is checked with Path.newer?; any that are reported logs a
# low-level message and makes the step out of date.
#
# @return [Boolean]
def updated?
  return false if self.error? && self.recoverable_error?
  return true if self.done? && !ENV["SCOUT_UPDATE"]

  outdated = rec_dependencies.select { |dep| Path.newer?(self.path, dep.path) }
  outdated.concat(input_dependencies.select { |dep| Path.newer?(self.path, dep.path) })

  Log.low "Newer deps found for #{Log.fingerprint self}: #{Log.fingerprint outdated}" if outdated.any?
  outdated.empty?
end

#waiting? ⇒ Boolean

Returns:

  • (Boolean)


70
71
72
# File 'lib/scout/workflow/step/status.rb', line 70

# Whether the step is present but has not yet started.
#
# @return [Boolean, nil] the (falsy) result of +present?+ when not present,
#   otherwise the negation of +started?+
def waiting?
  present? && !started?
end