Class: Step

Inherits:
Object
  • Object
show all
Defined in:
lib/rbbt/workflow/step.rb,
lib/rbbt/workflow/accessor.rb,
lib/rbbt/workflow/step/run.rb

Direct Known Subclasses

WorkflowRESTClient::RemoteStep

Constant Summary collapse

INFO_SERIALIAZER =
Marshal
STREAM_CACHE =
{}
STREAM_CACHE_MUTEX =
Mutex.new

Class Attribute Summary collapse

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(path, task = nil, inputs = nil, dependencies = nil, bindings = nil, clean_name = nil) ⇒ Step

Returns a new instance of Step.



31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
# File 'lib/rbbt/workflow/step.rb', line 31

# Builds a step for the result stored at +path+.
#
# path         - a String (sanitized with Misc.sanitize_filename and wrapped
#                with Path.setup) or a Proc that is called to obtain the path;
#                any other object is stored untouched.
# task         - the task object this step runs (may be nil).
# inputs       - input values; defaults to an empty array. When the task
#                responds to +inputs+, they are named after the task inputs
#                through NamedArray.setup.
# dependencies - nil, a single step, or an Array of steps; always stored as
#                an Array.
# bindings     - object handed to the task as execution context (see #_exec).
# clean_name   - stored as-is and exposed through #clean_name.
def initialize(path, task = nil, inputs = nil, dependencies = nil, bindings = nil, clean_name = nil)
  path = Path.setup(Misc.sanitize_filename(path)) if String === path
  path = path.call if Proc === path

  @path = path
  @task = task
  @bindings = bindings
  # The clean_name argument used to be accepted and then dropped, leaving
  # the #clean_name reader returning nil.
  @clean_name = clean_name
  @dependencies = case
                  when dependencies.nil?
                    []
                  when Array === dependencies
                    dependencies
                  else
                    [dependencies]
                  end
  @mutex = Mutex.new
  @info_mutex = Mutex.new
  @inputs = inputs || []
  NamedArray.setup @inputs, task.inputs.collect{|s| s.to_s} if task and task.respond_to? :inputs and task.inputs
end

Class Attribute Details

.lock_dirObject

Returns the value of attribute lock_dir.



14
15
16
# File 'lib/rbbt/workflow/step.rb', line 14

# Class-level reader: returns the receiver's @lock_dir.
def lock_dir
  @lock_dir
end

.log_relay_stepObject

Returns the value of attribute log_relay_step.



77
78
79
# File 'lib/rbbt/workflow/step.rb', line 77

# Class-level reader: returns the receiver's @log_relay_step.
def log_relay_step
  @log_relay_step
end

Instance Attribute Details

#bindingsObject

Returns the value of attribute bindings.



8
9
10
# File 'lib/rbbt/workflow/step.rb', line 8

# Reader for @bindings, the object assigned from the constructor's
# +bindings+ argument.
def bindings
  @bindings
end

#clean_nameObject

Returns the value of attribute clean_name.



8
9
10
# File 'lib/rbbt/workflow/step.rb', line 8

# Reader for @clean_name.
def clean_name
  @clean_name
end

#dependenciesObject

Returns the value of attribute dependencies.



8
9
10
# File 'lib/rbbt/workflow/step.rb', line 8

# Reader for @dependencies; the constructor always stores an Array here.
def dependencies
  @dependencies
end

#duppedObject (readonly)

Returns the value of attribute dupped.



3
4
5
# File 'lib/rbbt/workflow/step/run.rb', line 3

# Read-only reader for @dupped.
def dupped
  @dupped
end

#exec(no_load = false) ⇒ Object

Returns the value of attribute exec.



10
11
12
# File 'lib/rbbt/workflow/step.rb', line 10

# Reader for @exec.
# NOTE(review): the page heading advertises an exec(no_load = false)
# signature, which this reader body does not accept -- presumably a method of
# the same name is defined elsewhere; confirm against the library source.
def exec
  @exec
end

#inputsObject

Returns the value of attribute inputs.



8
9
10
# File 'lib/rbbt/workflow/step.rb', line 8

# Reader for @inputs, the input values of the step.
def inputs
  @inputs
end

#mutexObject

Returns the value of attribute mutex.



11
12
13
# File 'lib/rbbt/workflow/step.rb', line 11

# Reader for @mutex, the Mutex created in the constructor.
def mutex
  @mutex
end

#pathObject

Returns the value of attribute path.



8
9
10
# File 'lib/rbbt/workflow/step.rb', line 8

# Reader for @path, the location of the step's result.
def path
  @path
end

#pidObject

Returns the value of attribute pid.



9
10
11
# File 'lib/rbbt/workflow/step.rb', line 9

# Reader for @pid.
def pid
  @pid
end

#resultObject

Returns the value of attribute result.



11
12
13
# File 'lib/rbbt/workflow/step.rb', line 11

# Reader for @result, the in-memory result (or stream) of the step.
def result
  @result
end

#saved_streamObject (readonly)

Returns the value of attribute saved_stream.



3
4
5
# File 'lib/rbbt/workflow/step/run.rb', line 3

# Read-only reader for @saved_stream.
def saved_stream
  @saved_stream
end

#seenObject

Returns the value of attribute seen.



11
12
13
# File 'lib/rbbt/workflow/step.rb', line 11

# Reader for @seen.
def seen
  @seen
end

#streamObject (readonly)

Returns the value of attribute stream.



3
4
5
# File 'lib/rbbt/workflow/step/run.rb', line 3

# Read-only reader for @stream.
def stream
  @stream
end

#taskObject

Returns the value of attribute task.



8
9
10
# File 'lib/rbbt/workflow/step.rb', line 8

# Reader for @task, the task object given to the constructor.
def task
  @task
end

Class Method Details

.clean(path) ⇒ Object



161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
# File 'lib/rbbt/workflow/step.rb', line 161

# Deletes everything stored for the step at +path+: the info file, the
# result file, their '.lock' companions and the files directory.
# Nothing is done unless the result or the info file exists.
#
# path - location of the step result.
def self.clean(path)
  info_file = Step.info_file path
  files_dir = Step.files_dir path
  if Open.exists?(path) or Open.exists?(info_file)
    begin
      # NOTE(review): inside this class method +self+ is the class, not a
      # step instance, so these calls do not target the step at +path+; any
      # error they raise is swallowed by the empty rescue -- confirm intent.
      self.abort if self.running?
    rescue Exception
    end

    # NOTE(review): these assignments set instance variables on the receiver
    # of this class method rather than on a step -- confirm intent.
    @result = nil
    @pid = nil

    # Misc.insist presumably retries the block when it fails -- TODO confirm.
    Misc.insist do
      Open.rm info_file if Open.exists? info_file
      Open.rm info_file + '.lock' if Open.exists? info_file + '.lock'
      Open.rm path if Open.exists? path
      Open.rm path + '.lock' if Open.exists? path + '.lock'
      Open.rm_rf files_dir if Open.exists? files_dir
    end
  end
end

.dup_stream(stream) ⇒ Object



7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
# File 'lib/rbbt/workflow/step/run.rb', line 7

# Returns a stream that can be consumed independently of other consumers of
# +stream+, using STREAM_CACHE to remember streams already handed out.
#
# stream - an IO/File, a TSV::Dumper (its underlying +stream+ is used), or
#          any other object (returned untouched).
#
# Closed streams are returned as they are. All cache access happens under
# STREAM_CACHE_MUTEX.
def self.dup_stream(stream)
  case stream
  when IO, File
    return stream if stream.closed?

    STREAM_CACHE_MUTEX.synchronize do
      case current = STREAM_CACHE[stream]
      when nil
        # First request: remember the stream and hand back the original.
        Log.medium "Not duplicating stream #{ Misc.fingerprint(stream) }"
        STREAM_CACHE[stream] = stream
      when File
        # Cached File: reopen it by path when the path still exists,
        # otherwise duplicate the cached handle.
        if Open.exists? current.path 
          Log.medium "Reopening file #{ Misc.fingerprint(current) }"
          Open.open(current.path)
        else
          Log.medium "Duplicating file #{ Misc.fingerprint(current) } #{current.inspect}"
          Misc.dup_stream(current)
        end

      else
        # Any other cached stream is duplicated through Misc.dup_stream.
        Log.medium "Duplicating stream #{ Misc.fingerprint(stream) }"
        Misc.dup_stream(current)
      end
    end
  when TSV::Dumper#, TSV::Parser
    # Work on the dumper's underlying stream from here on.
    stream = stream.stream
    return stream if stream.closed?

    STREAM_CACHE_MUTEX.synchronize do
      if STREAM_CACHE[stream].nil?
        Log.high "Not duplicating dumper #{ stream.inspect }"
        STREAM_CACHE[stream] = stream
      else
        new = Misc.dup_stream(STREAM_CACHE[stream])
        Log.high "Duplicating dumper #{ stream.inspect } into #{new.inspect}"
        new
      end
    end
  else
    stream
  end
end

.files_dir(path) ⇒ Object



37
38
39
# File 'lib/rbbt/workflow/accessor.rb', line 37

# Directory holding the auxiliary files of the step stored at +path+:
# the path with '.files' appended. Returns nil when +path+ is nil.
def self.files_dir(path)
  return nil if path.nil?
  path + '.files'
end

.info_file(path) ⇒ Object



41
42
43
# File 'lib/rbbt/workflow/accessor.rb', line 41

# Location of the info file of the step stored at +path+: the path with
# '.info' appended. Returns nil when +path+ is nil.
def self.info_file(path)
  return nil if path.nil?
  path + '.info'
end

.job_name_for_info_file(info_file, extension = nil) ⇒ Object



56
57
58
59
60
61
62
# File 'lib/rbbt/workflow/accessor.rb', line 56

# Derives the job name from the name of an info file by stripping the
# trailing '.info', together with the result extension when one is given.
#
# info_file - name or path of the info file.
# extension - optional result extension (without the leading dot); nil or
#             empty means only '.info' is stripped.
#
# Returns +info_file+ unchanged when the expected suffix is not present.
def self.job_name_for_info_file(info_file, extension = nil)
  suffix = if extension and not extension.empty?
             # The extension is matched literally; it used to be interpolated
             # unescaped, so characters such as '.' or '+' acted as regexp
             # operators.
             /\.#{Regexp.quote extension.to_s}\.info$/
           else
             /\.info$/
           end
  info_file.sub(suffix, '')
end

.log(status, message, path, &block) ⇒ Object



249
250
251
252
253
254
255
256
257
258
259
# File 'lib/rbbt/workflow/accessor.rb', line 249

# Dispatches a log request to the matching helper:
#   * no block               -> log_string
#   * block and Hash message -> log_progress (the Hash is passed as options)
#   * block otherwise        -> log_block
# Returns whatever the chosen helper returns.
def self.log(status, message, path, &block)
  return log_string(status, message, path) unless block

  if Hash === message
    log_progress(status, message, path, &block)
  else
    log_block(status, message, path, &block)
  end
end

.log_block(status, message, path, &block) ⇒ Object



185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
# File 'lib/rbbt/workflow/accessor.rb', line 185

# Logs a status line, runs the given block, then logs a second line with the
# elapsed time. Both lines go through Log.info.
#
# status  - status label; converted to a String.
# message - optional text appended to the first line.
# path    - optional path appended to both lines.
#
# Returns the value of the block.
def self.log_block(status, message, path, &block)
  start = Time.now
  status = status.to_s
  status_color = self.status_color status

  Log.info do
    str = Log.color :reset
    str << "#{ Log.color status_color, status}"
    str << ": #{ message }" if message
    str << " -- #{Log.color :blue, path.to_s}" if path
    str << " #{Log.color :yellow, Process.pid}"
    str
  end
  res = yield
  eend = Time.now
  Log.info do
    # Elapsed time between entering the method and the block returning.
    str = "#{ Log.color :cyan, status.to_s } +#{Log.color :green, "%.2f" % (eend - start)}"
    str << " -- #{Log.color :blue, path.to_s}" if path
    str << " #{Log.color :yellow, Process.pid}"
    str
  end
  res
end

.log_progress(status, options = {}, path = nil, &block) ⇒ Object



226
227
228
229
230
231
232
233
234
235
236
237
238
# File 'lib/rbbt/workflow/accessor.rb', line 226

# Runs the block with a progress bar created by Log::ProgressBar.with_bar.
#
# status  - not used inside this method.
# options - progress bar options; :severity defaults to Log::INFO and :file
#           to +path+. The :max entry is extracted and passed as the bar's
#           maximum.
# path    - default for the :file option.
#
# Yields the bar to the block and returns the block's value.
def self.log_progress(status, options = {}, path = nil, &block)
  options = Misc.add_defaults options, :severity => Log::INFO, :file => path
  max = Misc.process_options options, :max
  Log::ProgressBar.with_bar(max, options) do |bar|
    begin
      res = yield bar
      # An IO result is signalled by raising KeepBar carrying the stream.
      # NOTE(review): presumably KeepBar is not a StandardError, otherwise
      # the bare rescue below would catch it immediately -- confirm.
      raise KeepBar.new res if IO === res
      res
    rescue
      # StandardErrors raised by the block are logged and not re-raised.
      Log.exception $!
    end
  end
end

.log_string(status, message, path) ⇒ Object



211
212
213
214
215
216
217
218
219
220
221
222
223
224
# File 'lib/rbbt/workflow/accessor.rb', line 211

# Emits one Log.info line describing +status+.
#
# status  - status label; converted to a String and coloured according to
#           status_color.
# message - optional text appended after the status.
# path    - optional path appended after the message.
#
# The line is assembled lazily inside the block given to Log.info.
def self.log_string(status, message, path)
  Log.info do
    status = status.to_s
    tint = self.status_color(status)

    line = Log.color(:reset)
    line << "#{ Log.color tint, status}"
    line << ": #{ message }" if message
    line << " -- #{Log.color :blue, path.to_s}" if path
    line << " #{Log.color :yellow, Process.pid}"
    line
  end
end

.purge_stream_cacheObject



50
51
52
53
54
55
56
57
58
59
60
# File 'lib/rbbt/workflow/step/run.rb', line 50

# Currently a no-op: the bare +return+ on the first line exits before any of
# the code below runs.
#
# The disabled code would, under STREAM_CACHE_MUTEX, start one thread per
# cached stream to consume it with Misc.consume_stream and then empty
# STREAM_CACHE.
def self.purge_stream_cache
  return
  STREAM_CACHE_MUTEX.synchronize do
    STREAM_CACHE.collect{|k,s| 
      Thread.new do
        Misc.consume_stream s
      end
    }
    STREAM_CACHE.clear
  end
end

.started?Boolean

Returns:

  • (Boolean)


9
10
11
# File 'lib/rbbt/workflow/accessor.rb', line 9

# Reports whether the info file exists.
# NOTE(review): +info_file+ is called without arguments here, while the
# class-level Step.info_file(path) requires a path -- confirm which
# definition this resolves to in the library source.
def self.started?
  info_file.exists?
end

.status_color(status) ⇒ Object



171
172
173
174
175
176
177
178
179
180
181
182
183
# File 'lib/rbbt/workflow/accessor.rb', line 171

# Chooses the log colour for a status label.
#
# status - a String; when it contains '>' separators only the last segment
#          is considered.
#
# Returns :yellow for "starting", :red for "error" and "aborted", :green for
# "done" and :cyan for anything else.
def self.status_color(status)
  palette = {
    "starting" => :yellow,
    "error"    => :red,
    "aborted"  => :red,
    "done"     => :green,
  }
  palette.fetch(status.split(">").last, :cyan)
end

.step_info(path) ⇒ Object



45
46
47
48
49
50
51
52
53
54
# File 'lib/rbbt/workflow/accessor.rb', line 45

# Loads the info stored for the step at +path+ by deserializing its info
# file with INFO_SERIALIAZER.
#
# Returns the loaded object. Any exception is logged through Log.exception
# and an empty Hash is returned instead.
def self.step_info(path)
  Open.open(info_file(path)) { |stream| INFO_SERIALIAZER.load(stream) }
rescue Exception
  Log.exception $!
  {}
end

.wait_for_jobs(jobs) ⇒ Object



13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
# File 'lib/rbbt/workflow/accessor.rb', line 13

# Waits for one or several jobs by joining each of them in its own thread.
#
# jobs - a Step or a collection of steps.
#
# If anything is raised while waiting, every thread is terminated, every job
# is aborted and the exception is re-raised.
def self.wait_for_jobs(jobs)
  jobs = [jobs] if Step === jobs
  begin
    # Pre-assigned so the rescue clause below can iterate even when the
    # collect raises before finishing.
    threads = []

    threads = jobs.collect do |j| 
      Thread.new do
        begin
          j.join 
        rescue Exception
          Log.error "Exception waiting for job: #{Log.color :blue, j.path}"
          raise $!
        end
      end
    end

    threads.each{|t| t.join }
  rescue Exception
    threads.each{|t| t.exit }
    jobs.each do |j| j.abort end
    raise $!
  end
end

Instance Method Details

#_abortObject



381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
# File 'lib/rbbt/workflow/step/run.rb', line 381

# Aborts the step: stops its dependencies, aborts its stream and kills its
# process. Does nothing when already aborted (@aborted) or when the step is
# done.
def _abort
  return if @aborted
  # Set before doing any work, so later calls return immediately.
  @aborted = true
  return if done?
  Log.medium{"#{Log.color :red, "Aborting"} #{Log.color :blue, path}"}
  begin
    stop_dependencies
    abort_stream
    abort_pid
  rescue Aborted
    Log.medium{"#{Log.color :red, "Aborting ABORTED RETRY"} #{Log.color :blue, path}"}
    retry
  rescue Exception
    # NOTE(review): this retry is unbounded; a persistent failure in the
    # three calls above would loop forever -- confirm this is intended.
    retry
  ensure
    # A result file present at this point is removed.
    if Open.exists? path
      Log.warn "Aborted job had finished. Removing result -- #{ path }"
      begin
        Open.rm path
      rescue Exception
        Log.warn "Exception removing result of aborted job: #{$!.message}"
      end
    end
  end
  Log.medium{"#{Log.color :red, "Aborted"} #{Log.color :blue, path}"}
end

#_execObject



83
84
85
86
# File 'lib/rbbt/workflow/step/run.rb', line 83

# Executes the task directly and returns its result.
#
# @exec is set to true when it has not been assigned yet. The task is
# executed in the context of +bindings+ when present, otherwise in the
# context of the step itself, with the step inputs as arguments.
def _exec
  @exec = true if @exec.nil?
  context = bindings || self
  @task.exec_in(context, *@inputs)
end

#abortObject



408
409
410
411
# File 'lib/rbbt/workflow/step/run.rb', line 408

# Aborts the step through _abort and logs the :aborted status unless the step
# already counts as aborted or errored.
# NOTE(review): _abort sets @aborted whenever it has not been set, and
# aborted? returns true when @aborted is set, so the log call looks
# unreachable -- confirm.
def abort
  _abort
  log(:aborted, "Job aborted") unless aborted? or error?
end

#abort_pidObject



342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
# File 'lib/rbbt/workflow/step/run.rb', line 342

# Kills the process running the step.
#
# The pid is taken from @pid, falling back to the :pid entry of the info.
#
# Returns false when there is no pid or when the pid is the current process;
# otherwise sends KILL, waits for the process and returns true. Errors while
# killing or waiting are logged and ignored.
def abort_pid
  @pid ||= info[:pid]

  if @pid.nil?
    Log.medium "Could not abort #{path}: no pid"
    return false
  end

  if Process.pid == @pid
    Log.medium "Could not abort #{path}: same process"
    return false
  end

  Log.medium "Aborting #{path}: #{ @pid }"
  begin
    Process.kill("KILL", @pid)
    Process.waitpid @pid
  rescue Exception
    Log.debug("Aborted job #{@pid} was not killed: #{$!.message}")
  end
  true
end

#abort_streamObject



364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
# File 'lib/rbbt/workflow/step/run.rb', line 364

# Aborts the stream associated with the step, if any.
#
# The stream is obtained from get_stream when there is a @result, falling
# back to @saved_stream, which is cleared. Only streams that respond to
# +abort+ and are not already aborted are touched.
def abort_stream
  stream = get_stream if @result
  stream ||= @saved_stream 
  @saved_stream = nil
  if stream and stream.respond_to? :abort and not stream.aborted?
    begin
      Log.medium "Aborting job stream #{stream.inspect} -- #{Log.color :blue, path}"
      stream.abort 
      #stream.close unless stream.closed?
    rescue Aborted
      # An Aborted raised while aborting is logged and the abort is retried.
      Log.medium "Aborting job stream #{stream.inspect} ABORTED RETRY -- #{Log.color :blue, path}"
      Log.exception $!
      retry
    end
  end
end

#aborted?Boolean

Returns:

  • (Boolean)


305
306
307
# File 'lib/rbbt/workflow/accessor.rb', line 305

# True when the step was aborted in this process (@aborted) or its status is
# :aborted.
def aborted?
  @aborted || status == :aborted
end

#checksObject



98
99
100
# File 'lib/rbbt/workflow/step/run.rb', line 98

# Unique result paths of all recursive dependencies; #run passes them to
# Persist.persist as its :check option.
def checks
  rec_dependencies.map(&:path).uniq
end

#child(&block) ⇒ Object



140
141
142
143
144
145
146
147
148
149
150
151
# File 'lib/rbbt/workflow/step.rb', line 140

# Forks a child process that runs the block and records its pid in the
# :children_pids entry of the step info.
#
# Returns the pid of the child.
def child(&block)
  child_pid = Process.fork &block
  children_pids = info[:children_pids]
  if children_pids.nil?
    children_pids = [child_pid]
  else
    children_pids << child_pid
  end
  #Process.detach(child_pid)
  set_info :children_pids, children_pids
  child_pid
end

#cleanObject



183
184
185
186
# File 'lib/rbbt/workflow/step.rb', line 183

# Removes the stored files of this step through Step.clean and returns the
# step itself.
def clean
  Step.clean(path)
  self
end

#dirty?Boolean

Returns:

  • (Boolean)


102
103
104
# File 'lib/rbbt/workflow/step/run.rb', line 102

# True when the result path of at least one recursive dependency does not
# exist.
def dirty?
  unique_paths = rec_dependencies.map(&:path).uniq
  missing = unique_paths.select { |dep_path| !dep_path.exists? }
  missing.any?
end

#done?Boolean

Returns:

  • (Boolean)


285
286
287
# File 'lib/rbbt/workflow/accessor.rb', line 285

# Truthy when the step has a path and a file exists there.
#
# Uses File.exist?; the File.exists? alias is deprecated and no longer
# available in Ruby 3.2 and later.
def done?
  path and File.exist? path
end

#dup_inputsObject



75
76
77
78
79
80
81
# File 'lib/rbbt/workflow/step/run.rb', line 75

# Replaces every input with the result of Step.dup_stream, once per step.
#
# Does nothing (returns nil) when the inputs were already processed or when
# the RBBT_NO_STREAM environment variable is 'true'. Otherwise marks the
# step as dupped and returns true.
def dup_inputs
  return if @dupped or ENV["RBBT_NO_STREAM"] == 'true'

  @inputs = @inputs.map { |value| Step.dup_stream(value) }
  @dupped = true
end

#error?Boolean

Returns:

  • (Boolean)


301
302
303
# File 'lib/rbbt/workflow/accessor.rb', line 301

# True when the status of the step is :error.
def error?
  status == :error
end

#exception(ex, msg = nil) ⇒ Object



269
270
271
272
273
274
275
276
277
278
279
# File 'lib/rbbt/workflow/accessor.rb', line 269

# Records a failure of the step.
#
# The step is aborted through _abort, the backtrace and a summary of the
# exception (class name, message, backtrace) are stored in the info, and an
# :error entry is logged.
#
# ex  - the exception.
# msg - optional text used in the log entry in place of the exception class.
def exception(ex, msg = nil)
  self._abort
  ex_class = ex.class.to_s
  set_info :backtrace, ex.backtrace
  set_info :exception, {:class => ex_class, :message => ex.message, :backtrace => ex.backtrace}

  prefix = msg.nil? ? ex_class : msg
  log :error, "#{prefix} -- #{ex.message}"
end

#file(name) ⇒ Object



322
323
324
# File 'lib/rbbt/workflow/accessor.rb', line 322

# Path, inside the step's files directory, of the auxiliary file +name+
# (converted with to_s), set up through Path.setup.
def file(name)
  Path.setup(File.join(files_dir, name.to_s))
end

#filesObject



315
316
317
318
319
320
# File 'lib/rbbt/workflow/accessor.rb', line 315

# Lists every file found at any depth under the step's files directory,
# skipping directories, each made relative to that directory through
# Misc.path_relative_to.
def files
  pattern = File.join(files_dir, '**', '*')
  regular = Dir.glob(pattern).reject { |entry| File.directory? entry }
  regular.map { |entry| Misc.path_relative_to(files_dir, entry) }
end

#files_dirObject

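Returns the directory that holds this step's auxiliary files (the step path with '.files' appended); the value is memoized.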



311
312
313
# File 'lib/rbbt/workflow/accessor.rb', line 311

# Directory holding the auxiliary files of this step, computed once with
# Step.files_dir from the step path and memoized in @files_dir.
def files_dir
  @files_dir ||= Step.files_dir path
end

#fork(semaphore = nil) ⇒ Object



285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
# File 'lib/rbbt/workflow/step/run.rb', line 285

# Runs the step in a forked, detached child process.
#
# semaphore - optional semaphore name; the child waits on it before running
#             and posts it when finished.
#
# Raises a RuntimeError when the step is already tied to another live
# process that was forked and has not finished.
#
# Returns the step itself; the pid of the child is kept in @pid.
def fork(semaphore = nil)
  raise "Can not fork: Step is waiting for proces #{@pid} to finish" if not @pid.nil? and not Process.pid == @pid and Misc.pid_exists?(@pid) and not done? and info[:forked]
  @pid = Process.fork do
    Misc.pre_fork
    begin
      RbbtSemaphore.wait_semaphore(semaphore) if semaphore
      # File.exist? replaces the File.exists? alias, which is gone in
      # Ruby 3.2 and later.
      FileUtils.mkdir_p File.dirname(path) unless File.exist? File.dirname(path)
      begin
        res = run true
        set_info :forked, true
      rescue Aborted
        Log.debug{"Forked process aborted: #{path}"}
        log :aborted, "Job aborted (#{Process.pid})"
        raise $!
      rescue Exception
        Log.debug("Exception '#{$!.message}' caught on forked process: #{path}")
        raise $!
      ensure
        # The result stream is consumed whether or not the run succeeded.
        join_stream
      end

      begin
        # Wait for child processes registered in the info before finishing.
        children_pids = info[:children_pids]
        if children_pids
          children_pids.each do |pid|
            if Misc.pid_exists? pid
              begin
                Process.waitpid pid
              rescue Errno::ECHILD
                Log.low "Waiting on #{ pid } failed: #{$!.message}"
              end
            end
          end
          set_info :children_done, Time.now
        end
      rescue Exception
        Log.debug("Exception waiting for children: #{$!.message}")
        RbbtSemaphore.post_semaphore(semaphore) if semaphore
        Kernel.exit! -1
      end
      set_info :pid, nil
    ensure
      # The child always releases the semaphore and exits here.
      RbbtSemaphore.post_semaphore(semaphore) if semaphore
      Kernel.exit! 0
    end
  end
  Process.detach(@pid)
  self
end

#get_streamObject



62
63
64
65
66
67
68
69
70
71
72
73
# File 'lib/rbbt/workflow/step/run.rb', line 62

# Hands out the result stream of the step, at most once.
#
# Under @mutex: returns nil when a stream was already saved; otherwise, when
# @result is an IO, remembers it in @saved_stream and returns it. Returns
# nil for any other kind of result.
def get_stream
  @mutex.synchronize do
    next nil if @saved_stream
    @saved_stream = @result if IO === @result
  end
end

#graceObject



435
436
437
438
439
440
# File 'lib/rbbt/workflow/step/run.rb', line 435

# Blocks, polling once per second, until the step is done, has a result, has
# failed, was aborted or is streaming. Returns the step itself.
def grace
  sleep 1 until done? || result || error? || aborted? || streaming?
  self
end

#info(check_lock = true) ⇒ Object



86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
# File 'lib/rbbt/workflow/accessor.rb', line 86

# Returns the info of the step, loaded from its info file with
# INFO_SERIALIAZER.
#
# check_lock - when true, loading is refused (TryAgain) while the info lock
#              is locked.
#
# Returns an empty Hash when there is no info file. The loaded value is
# cached in @info_cache and reused while the ctime of the info file is older
# than @info_cache_time.
#
# When loading ultimately fails, the info file is replaced by one with an
# :error status and the exception is re-raised.
def info(check_lock = true)
  return {} if info_file.nil? or not Open.exists? info_file
  begin
    Misc.insist do
      begin
        return @info_cache if @info_cache and File.ctime(info_file) < @info_cache_time
      rescue Exception
        raise $!
      end

      begin
        # Nested Misc.insist calls; presumably each retries its block with
        # the given attempt count and pause -- TODO confirm argument meaning.
        @info_cache = Misc.insist(3, 1.6, info_file) do
          Misc.insist(2, 1, info_file) do
            Misc.insist(3, 0.2, info_file) do
              raise TryAgain, "Info locked" if check_lock and info_lock.locked?
              Open.open(info_file) do |file|
                INFO_SERIALIAZER.load(file) #|| {}
              end
            end
          end
        end
        @info_cache_time = Time.now
        @info_cache
      end
    end
  rescue Exception
    Log.debug{"Error loading info file: " + info_file}
    Log.exception $!
    Open.rm info_file
    Misc.sensiblewrite(info_file, INFO_SERIALIAZER.dump({:status => :error, :messages => ["Info file lost"]}))
    raise $!
  end
end

#info_fileObject

Returns the location of this step's info file (the step path with '.info' appended); the value is memoized.



74
75
76
# File 'lib/rbbt/workflow/accessor.rb', line 74

# Location of the info file of this step, computed once with Step.info_file
# from the step path and memoized in @info_file.
def info_file
  @info_file ||= Step.info_file(path)
end

#info_lockObject



78
79
80
81
82
83
84
# File 'lib/rbbt/workflow/accessor.rb', line 78

# Lockfile guarding the info file, created on first use and then reused.
#
# The lock path is obtained from Persist.persistence_path for the info file
# name plus '.lock', under Step.lock_dir.
def info_lock
  if @info_lock.nil?
    lock_path = Persist.persistence_path(info_file + '.lock', {:dir => Step.lock_dir})
    @info_lock = Lockfile.new lock_path, :refresh => false, :dont_use_lock_id => true
  end
  @info_lock
end

#joinObject



442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
# File 'lib/rbbt/workflow/step/run.rb', line 442

# Waits until the step has finished and returns the step itself.
#
# After the grace period the result stream is consumed when the step is
# streaming. Returns early when there is no info file or the info is already
# marked :joined. Otherwise waits for the process running the step (when it
# is a different process), joins every dependency and polls until the result
# path exists. The info is marked :joined on the way out, even on error.
def join

  grace

  if streaming?
    join_stream 
  end

  return self if not Open.exists? info_file

  return self if info[:joined]
  pid = @pid 

  # Fall back to the pid recorded in the info; Misc.insist presumably retries
  # the read with the listed pauses -- TODO confirm.
  Misc.insist [0.1, 0.2, 0.5, 1] do
    pid ||= info[:pid]
  end

  begin
    if pid.nil? or Process.pid == pid
      dependencies.each{|dep| dep.join }
    else
      begin
        Log.debug{"Waiting for pid: #{pid}"}
        Process.waitpid pid 
      rescue Errno::ECHILD
        Log.debug{"Process #{ pid } already finished: #{ path }"}
      end if Misc.pid_exists? pid
      pid = nil
      dependencies.each{|dep| dep.join }
    end
    sleep 1 until path.exists?
    self
  ensure
    set_info :joined, true
  end
end

#join_streamObject



413
414
415
416
417
418
419
420
421
422
423
424
425
426
# File 'lib/rbbt/workflow/step/run.rb', line 413

# Consumes the result stream of the step, if there is one, and clears
# @result.
#
# When consuming fails, the stream and the step are aborted and the
# exception is re-raised.
def join_stream
  stream = get_stream if @result
  @result = nil
  if stream
    begin
      Misc.consume_stream stream 
      stream.join if stream.respond_to? :join
    rescue Exception
      stream.abort
      self._abort
      raise $!
    end
  end
end

#kill_childrenObject



106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
# File 'lib/rbbt/workflow/step/run.rb', line 106

# Sends the INT signal to every pid listed under :children_pids in the info.
#
# Failures to signal one child are logged and do not stop the loop; a
# failure to read the info is logged as well.
def kill_children
  begin
    children_pids = info[:children_pids]
    if children_pids and children_pids.any?
      Log.medium("Killing children: #{ children_pids * ", " }")
      children_pids.each do |pid|
        Log.medium("Killing child #{ pid }")
        begin
          Process.kill "INT", pid
        rescue Exception
          Log.medium("Exception killing child #{ pid }: #{$!.message}")
        end
      end
    end
  rescue
    Log.medium("Exception finding children")
  end
end

#loadObject



154
155
156
157
158
159
# File 'lib/rbbt/workflow/step.rb', line 154

# Returns the result of the step, trying in order:
#   1. the in-memory @result (when it is not just the path), passed through
#      prepare_result;
#   2. after joining a step that is not done, the file at @path loaded with
#      Persist.load_file;
#   3. the value of +exec+.
def load
  return prepare_result @result, @task.result_description if @result and not @path == @result
  join if not done?
  return Persist.load_file(@path, @task.result_type) if @path.exists?
  exec
end

#load_file(name, type = nil, options = {}) ⇒ Object



342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
# File 'lib/rbbt/workflow/accessor.rb', line 342

# Loads one of the auxiliary files of the step (see #file).
#
# name    - name of the file inside the files directory.
# type    - :tc, :tsv, :array, :yaml, :marshal or anything else for raw
#           text. When nil it is guessed from the file extension
#           ("list", "ary" and "array" all mean :array; unknown extensions
#           mean raw text).
# options - passed to TSV.open for :tsv files.
#
# Returns the loaded object; its class depends on the type.
def load_file(name, type = nil, options = {})
  if type.nil? and name =~ /.*\.(\w+)$/
    extension = name.match(/.*\.(\w+)$/)[1]
    case extension
    when "tc"
      type = :tc
    when "tsv"
      type = :tsv
    when "list", "ary", "array"
      type = :array
    when "yaml"
      type = :yaml
    when "marshal"
      type = :marshal
    else
      type = :other
    end
  else
    type ||= :other
  end

  case type.to_sym
  when :tc
    Persist.open_tokyocabinet(file(name), false)
  when :tsv
    TSV.open Open.open(file(name)), options
  when :array
    #Open.read(file(name)).split /\n|,\s*/
    Open.read(file(name)).split "\n"
  when :yaml
    # Block form, so the stream is not left open after parsing.
    Open.open(file(name)) { |stream| YAML.load(stream) }
  when :marshal
    # Block form, so the stream is not left open after loading.
    Open.open(file(name)) { |stream| Marshal.load(stream) }
  else
    Open.read(file(name))
  end
end

#log(status, message = nil, &block) ⇒ Object



261
262
263
264
265
266
267
# File 'lib/rbbt/workflow/accessor.rb', line 261

# Updates the status of the step, stores the message (with colour codes
# removed by Log.uncolor) when one is given, and forwards everything to
# Step.log together with the step path.
#
# Returns whatever Step.log returns.
def log(status, message = nil, &block)
  self.status = status
  self.message(Log.uncolor(message)) if message
  Step.log(status, message, path, &block)
end

#log_progress(status, options = {}, &block) ⇒ Object



240
241
242
# File 'lib/rbbt/workflow/accessor.rb', line 240

# Instance shortcut for Step.log_progress that uses the step's :progress
# auxiliary file as the progress file.
def log_progress(status, options = {}, &block)
  Step.log_progress(status, options, file(:progress), &block)
end

#merge_info(hash) ⇒ Object



133
134
135
136
137
138
139
140
141
142
143
144
# File 'lib/rbbt/workflow/accessor.rb', line 133

# Merges +hash+ into the stored info of the step and rewrites the info file,
# holding the info lock while doing so.
#
# Returns nil without doing anything when the step is in exec mode (@exec)
# or has no info file.
def merge_info(hash)
  return nil if @exec or info_file.nil?
  # NOTE(review): +value+ is not a parameter of this method, so it is nil
  # when first read here; the line looks copied from a single-value setter --
  # confirm whether +hash+ was meant to be purged instead.
  value = Annotated.purge value if defined? Annotated
  Open.lock(info_file, :lock => info_lock) do
    # The lock is already held, so the info is read without checking it.
    i = info(false)
    i.merge! hash
    @info_cache = i
    Misc.sensiblewrite(info_file, INFO_SERIALIAZER.dump(i), :force => true, :lock => false)
    @info_cache_time = Time.now
    value
  end
end

#message(message) ⇒ Object



167
168
169
# File 'lib/rbbt/workflow/accessor.rb', line 167

# Appends +message+ to the list of messages stored in the info of the step.
def message(message)
  set_info(:messages, (messages || []) << message)
end

#messagesObject



159
160
161
162
163
164
165
# File 'lib/rbbt/workflow/accessor.rb', line 159

# Messages stored in the info of the step.
#
# When the info has none, an empty list is stored through set_info (provided
# the step responds to it) and the result of that call is returned.
def messages
  stored = info[:messages]
  return stored if stored

  set_info(:messages, []) if self.respond_to?(:set_info)
end

#nameObject



64
65
66
# File 'lib/rbbt/workflow/accessor.rb', line 64

# Name of the job: the part of the path that follows the directory named
# after the task. The path is returned unchanged when it contains no such
# directory.
def name
  task_dir = Regexp.quote(task.name.to_s)
  path.sub(/.*\/#{task_dir}\/(.*)/, '\1')
end

#prepare_result(value, description = nil, info = {}) ⇒ Object



97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
# File 'lib/rbbt/workflow/step.rb', line 97

# Turns a raw task result into the value handed to callers.
#
# value       - the raw result.
# description - result description; when it names an Entity format the value
#               is set up as that entity.
# info        - Hash of annotation values applied to the result.
#
# * IO values are read completely according to the task's result type
#   (:array -> stripped lines, :tsv -> TSV.open, otherwise the full text);
#   on failure the stream and the step are aborted and the error re-raised.
# * Without Entity, without a description, or with a description that is not
#   an Entity format, the value is returned untouched.
# * Annotated values are returned as they are when +info+ is empty;
#   otherwise every +info+ entry matching one of the value's annotations is
#   assigned to it.
# * Anything else is set up with the Entity format for +description+.
def prepare_result(value, description = nil, info = {})
  case 
  when IO === value
    begin
      res = case @task.result_type
            when :array
              array = []
              while line = value.gets
                array << line.strip
              end
              array
            when :tsv
              begin
                TSV.open(value)
              rescue IOError
                TSV.setup({})
              end
            else
              value.read
            end
      value.join if value.respond_to? :join
      res
    rescue Exception
      value.abort if value.respond_to? :abort
      self.abort
      raise $!
    end
  when (not defined? Entity or description.nil? or not Entity.formats.include? description)
    value
  when (Annotated === value and info.empty?)
    value
  when Annotated === value
    annotations = value.annotations
    info.each do |k,v|
      # The setter is named after the info key; an undefined local was
      # interpolated here before, which raised NameError.
      value.send("#{k}=", v) if annotations.include? k
    end
    value
  else
    Entity.formats[description].setup(value, info.merge(:format => description))
  end
end

#progress_bar(msg, options = {}) ⇒ Object



244
245
246
247
# File 'lib/rbbt/workflow/accessor.rb', line 244

# Creates a Log::ProgressBar for the step.
#
# msg     - used as the :desc option unless +options+ overrides it.
# options - extra options; :max is also passed as the bar's maximum. The
#           :file option defaults to the step's :progress auxiliary file.
def progress_bar(msg, options = {})
  max = options[:max]
  Log::ProgressBar.new max, {:desc => msg, :file => file(:progress)}.merge(options)
end

#provenanceObject



380
381
382
383
384
385
386
387
388
389
390
391
# File 'lib/rbbt/workflow/accessor.rb', line 380

# Describes how the result was produced.
#
# Returns a Hash with the recorded :inputs of the step and, under
# :provenance, one entry per dependency whose result exists: the provenance
# of that dependency, or nil when the dependency has no info file.
def provenance
  provenance = {}
  dependencies.each do |dep|
    next unless dep.path.exists?
    # File.exist? replaces the File.exists? alias, which is gone in
    # Ruby 3.2 and later.
    if File.exist? dep.info_file
      provenance[dep.path] = dep.provenance if File.exist? dep.path
    else
      provenance[dep.path] = nil
    end
  end
  {:inputs => info[:inputs], :provenance => provenance}
end

#provenance_pathsObject



393
394
395
396
397
398
399
# File 'lib/rbbt/workflow/accessor.rb', line 393

# Nested Hash of the result paths the step was built from: every dependency
# whose result file exists maps to its own provenance_paths.
def provenance_paths
  provenance = {}
  dependencies.each do |dep|
    # File.exist? replaces the File.exists? alias, which is gone in
    # Ruby 3.2 and later.
    provenance[dep.path] = dep.provenance_paths if File.exist? dep.path
  end
  provenance
end

#rec_dependenciesObject

dependencies = self.dependencies ? self.dependencies + new_dependencies : new_dependencies

dependencies.flatten!
dependencies.uniq!
dependencies

end



209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
# File 'lib/rbbt/workflow/step.rb', line 209

# All dependencies of the step, direct and indirect, without duplicates.
# The recursive dependencies of each direct dependency come before the
# dependency itself.
def rec_dependencies
  # A result that exists without an info file was placed there by hand; its
  # dependencies are not taken into account.
  return [] if Open.exists?(self.path.to_s) and not Open.exists? self.info_file

  return [] if dependencies.nil? or dependencies.empty?

  collected = dependencies.inject([]) do |acc, step|
    acc.concat(step.rec_dependencies) << step
  end
  collected.uniq
end

#recursive_cleanObject



225
226
227
228
229
230
231
232
233
234
# File 'lib/rbbt/workflow/step.rb', line 225

# Cleans the step and then every recursive dependency that has an info file.
# Returns the step itself.
def recursive_clean
  clean
  rec_dependencies.each do |dep|
    dep.clean if Open.exists?(dep.info_file)
  end
  self
end

#relay_log(step) ⇒ Object



80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
# File 'lib/rbbt/workflow/step.rb', line 80

# Makes this step forward its log entries to +step+.
#
# Does nothing (returns the step) unless the step has a Task with a name.
# The first time it is called, the singleton class of the step gets a
# +relay_step+ accessor, the current +log+ is kept as +original_log+, and
# +log+ is replaced by a version that updates status and messages and then
# logs on the relay step with the status prefixed by the task name
# ("task>status"), unless the relay step is done, errored or aborted.
#
# Returns the step itself.
def relay_log(step)
  return self unless Task === self.task and not self.task.name.nil?
  if not self.respond_to? :original_log
    class << self
      attr_accessor :relay_step
      alias original_log log 
      def log(status, message = nil)
        self.status = status
        message Log.uncolor message
        relay_step.log([task.name.to_s, status.to_s] * ">", message.nil? ? nil : message ) unless (relay_step.done? or relay_step.error? or relay_step.aborted?)
      end
    end
  end
  @relay_step = step
  self
end

#run(no_load = false) ⇒ Object



171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
# File 'lib/rbbt/workflow/step/run.rb', line 171

# Runs the task of the step inside Persist.persist, recording progress in the
# info file as it goes.
#
# no_load - when truthy, Persist.persist is called with :no_load => :stream,
#           the result is kept in @result and the step itself is returned;
#           otherwise the result is passed through prepare_result, stored in
#           @result and returned.
#
# Any exception is recorded through #exception and re-raised.
def run(no_load = false)
  result = nil

  begin
    @mutex.synchronize do
      no_load = no_load ? :stream : false
      result = Persist.persist "Job", @task.result_type, :file => path, :check => checks, :no_load => no_load do |lockfile|
        # Forward logs to the class-wide relay step, when one is configured
        # and this step is not already relaying.
        if Step === Step.log_relay_step and not self == Step.log_relay_step
          relay_log(Step.log_relay_step) unless self.respond_to? :relay_step and self.relay_step
        end
        @exec = false

        # Start from a fresh info file.
        Open.rm info_file if Open.exists? info_file

        log :setup, "#{Log.color :cyan, "Setup"} #{Log.color :yellow, task.name.to_s || ""}"

        merge_info({
          :pid => Process.pid,
          :issued => Time.now,
          :name => name,
          :clean_name => clean_name,
          :dependencies => dependencies.collect{|dep| [dep.task_name, dep.name, dep.path]},
        })

        dup_inputs
        begin
          run_dependencies
        rescue Exception
          stop_dependencies
          raise $!
        end


        set_info :inputs, Misc.remove_long_items(Misc.zip2hash(task.inputs, @inputs)) unless task.inputs.nil?

        set_info :started, (start_time = Time.now)
        log :started, "#{Log.color :magenta, "Starting"} #{Log.color :yellow, task.name.to_s || ""}"

        begin
          result = _exec
        rescue Aborted
          stop_dependencies
          log(:aborted, "Aborted")
          raise $!
        rescue Exception
          backtrace = $!.backtrace

          # HACK: This fixes an strange behaviour in 1.9.3 where some
          # backtrace strings are coded in ASCII-8BIT
          set_info :backtrace, backtrace 
          log(:error, "#{$!.class}: #{$!.message}")
          backtrace.each{|l| l.force_encoding("UTF-8")} if String.instance_methods.include? :force_encoding
          stop_dependencies
          raise $!
        end

        # Unless streaming was requested (or streaming is disabled through
        # RBBT_NO_STREAM), stream results are read right away.
        if not no_load or ENV["RBBT_NO_STREAM"] == "true" 
          result = prepare_result result, @task.description, info if IO === result 
          result = prepare_result result.stream, @task.description, info if TSV::Dumper === result 
        end

        stream = case result
                 when IO
                   result
                 when TSV::Dumper
                   result.stream
                 end

        if stream
          log :streaming, "#{Log.color :magenta, "Streaming"} #{Log.color :yellow, task.name.to_s || ""}"
          # The block given to ConcurrentStream.setup records completion;
          # presumably it runs once the stream has been consumed -- TODO
          # confirm against ConcurrentStream.
          ConcurrentStream.setup stream do
            begin
              if status != :done
                Misc.insist do
                  set_info :done, (done_time = Time.now)
                  set_info :time_elapsed, (time_elapsed = done_time - start_time)
                  log :done, "#{Log.color :magenta, "Completed"} #{Log.color :yellow, task.name.to_s || ""} in #{time_elapsed.to_i} sec."
                end
              end
            rescue
              Log.exception $!
            ensure
              join
            end
          end
          # Logs the abort only while the step is still streaming.
          stream.abort_callback = Proc.new do
            begin
              log :aborted, "#{Log.color :red, "Aborted"} #{Log.color :yellow, task.name.to_s || ""}" if status == :streaming
            rescue
              Log.exception $!
            end
          end
        else
          set_info :done, (done_time = Time.now)
          set_info :time_elapsed, (time_elapsed = done_time - start_time)
          log :done, "#{Log.color :magenta, "Completed"} #{Log.color :yellow, task.name.to_s || ""} in #{time_elapsed.to_i} sec."
        end

        result
      end

      if no_load
        @result ||= result
        self
      else
        @result = prepare_result result, @task.result_description
      end
    end
  rescue Exception
    exception $!
    raise $!
  end
end

#run_dependencies ⇒ Object



125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
# File 'lib/rbbt/workflow/step/run.rb', line 125

# Gathers the direct and recursive dependencies of this step into @seen,
# cleans the ones that need recomputing and runs the direct dependencies that
# have not been started yet.
#
# A direct dependency whose path was already reached (either as an earlier
# direct dependency or through the recursive dependencies of one) is not
# scanned again.
#
# Returns nil early when there is nothing to process. Exceptions raised while
# running a dependency are logged and re-raised.
def run_dependencies
  @seen ||= []
  seen_paths = Set.new

  dependencies.uniq.each do |dependency|
    dependency_path = dependency.path
    next if seen_paths.include? dependency_path

    recursive = dependency.rec_dependencies
    @seen.concat recursive
    # Set#merge updates the set in place; Set#union returns a new set and
    # would leave seen_paths untouched.
    seen_paths.merge(recursive.collect{|d| d.path})
    @seen << dependency
    seen_paths << dependency_path
  end

  @seen.uniq!
  @seen.delete self

  return if @seen.empty?

  log :dependencies, "#{Log.color :magenta, "Dependencies"} #{Log.color :yellow, task.name.to_s || ""}"

  # Clean everything that is neither finished-and-clean nor currently
  # streaming; cleaned steps that are not direct dependencies are collected so
  # that dup_inputs can be called on them afterwards.
  dupping = []
  @seen.each do |dependency|
    next if (dependency.done? and not dependency.dirty?) or (dependency.streaming? and dependency.running?)
    dependency.clean
    dupping << dependency unless dependencies.include? dependency
  end

  dupping.each{|dep| dep.dup_inputs}

  # Only direct dependencies are run from here.
  @seen.each do |dependency|
    next unless dependencies.include? dependency
    Log.info "#{Log.color :cyan, "dependency"} #{Log.color :yellow, task.name.to_s || ""} => #{Log.color :yellow, dependency.task_name.to_s || ""} -- #{Log.color :blue, dependency.path}"
    begin
      dependency.run(true) unless dependency.done? or dependency.started?
    rescue Aborted
      Log.error "Aborted dep. #{Log.color :red, dependency.task.name.to_s}"
      raise $!
    rescue Interrupt
      Log.error "Interrupted while in dep. #{Log.color :red, dependency.task.name.to_s}"
      raise $!
    rescue Exception
      Log.error "Exception in dep. #{ Log.color :red, dependency.task.name.to_s }"
      raise $!
    end
  end
end

#running? ⇒ Boolean

Returns:

  • (Boolean)


293
294
295
296
297
298
299
# File 'lib/rbbt/workflow/accessor.rb', line 293

# Tells whether the process recorded for this step is still alive.
#
# Returns nil when the info file does not exist or when no :pid is stored in
# the info. Otherwise returns whatever Misc.pid_exists? reports for @pid (when
# set) or for the stored :pid.
def running?
  return nil unless Open.exists?(info_file)
  return nil if info[:pid].nil?

  Misc.pid_exists?(@pid || info[:pid])
end

#save_file(name, content) ⇒ Object



326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
# File 'lib/rbbt/workflow/accessor.rb', line 326

# Serializes +content+ to text and writes it to the step file called +name+.
#
# Strings are written as they are, Arrays are joined with newlines, Hashes
# become one tab-separated line per entry, and TSV objects or anything else
# are written using their #to_s. TSV is tested before Hash on purpose.
#
# Returns whatever Open.write returns.
def save_file(name, content)
  text = case content
         when String then content
         when Array  then content * "\n"
         when TSV    then content.to_s
         when Hash
           lines = content.collect { |*pair| pair.join("\t") }
           lines * "\n"
         else
           content.to_s
         end

  Open.write(file(name), text)
end

#set_info(key, value) ⇒ Object



120
121
122
123
124
125
126
127
128
129
130
131
# File 'lib/rbbt/workflow/accessor.rb', line 120

# Stores +value+ under +key+ in the step info and writes the info file.
#
# Does nothing and returns nil when @exec is set or there is no info file.
# The value is passed through Annotated.purge when Annotated is defined.
# The read-modify-write happens inside Open.lock on the info file, which is
# why the write itself is asked not to lock again.
#
# The block's last expression is +value+; presumably Open.lock returns it.
def set_info(key, value)
  return nil if @exec || info_file.nil?

  value = Annotated.purge(value) if defined?(Annotated)

  Open.lock(info_file, :lock => info_lock) do
    current = info(false)
    current[key] = value
    @info_cache = current
    Misc.sensiblewrite(info_file, INFO_SERIALIAZER.dump(current), :force => true, :lock => false)
    @info_cache_time = Time.now
    value
  end
end

#soft_grace ⇒ Object



428
429
430
431
432
433
# File 'lib/rbbt/workflow/step/run.rb', line 428

# Blocks until the info file of this step exists, checking once per second.
# There is no timeout.
#
# Returns self.
def soft_grace
  sleep 1 until Open.exists?(info_file)
  self
end

#started? ⇒ Boolean

Returns:

  • (Boolean)


281
282
283
# File 'lib/rbbt/workflow/accessor.rb', line 281

# A step counts as started once either its info file or its result path
# exists.
def started?
  Open.exists?(info_file) || Open.exists?(path)
end

#status ⇒ Object



146
147
148
149
150
151
152
153
# File 'lib/rbbt/workflow/accessor.rb', line 146

# Reads the :status entry of the step info.
#
# Any exception raised while reading the info is logged and reported as the
# :error status instead of being propagated.
def status
  info[:status]
rescue Exception => e
  Log.error "Exception reading status: #{e.message}"
  :error
end

#status=(status) ⇒ Object



155
156
157
# File 'lib/rbbt/workflow/accessor.rb', line 155

# Records +status+ under the :status key of the step info via set_info.
def status=(status)
  set_info :status, status
end

#step(name) ⇒ Object



236
237
238
239
240
241
242
243
244
245
# File 'lib/rbbt/workflow/step.rb', line 236

# Finds the first recursive dependency whose task name matches +name+.
#
# Both sides are compared as symbols, so +name+ may be a String or a Symbol.
# Hits are memoized in @steps under the +name+ given; a miss (nil) is not
# cached and is searched again on the next call.
#
# Returns the matching dependency, or nil when none matches.
def step(name)
  @steps ||= {}
  # find stops at the first match instead of scanning every dependency and
  # building an intermediate array as select{}.first did.
  @steps[name] ||= rec_dependencies.find{|dep| dep.task_name.to_sym == name.to_sym }
end

#stop_dependencies ⇒ Object



335
336
337
338
339
340
# File 'lib/rbbt/workflow/step/run.rb', line 335

# Calls abort on every direct dependency and then kill_children on this step.
#
# Returns whatever kill_children returns.
def stop_dependencies
  dependencies.each(&:abort)
  kill_children
end

#streaming? ⇒ Boolean

Returns:

  • (Boolean)


289
290
291
# File 'lib/rbbt/workflow/accessor.rb', line 289

# Truthy when the step holds an IO as @result, has a @saved_stream, or its
# status is :streaming. The value of the first condition that holds is
# returned, so the result is not necessarily true or false.
def streaming?
  IO === @result || @saved_stream || status == :streaming
end

#task_name ⇒ Object



64
65
66
# File 'lib/rbbt/workflow/step.rb', line 64

# Name of the task held in @task.
#
# The constructor allows a step to be built without a task; in that case
# @task is nil and this call raises NoMethodError.
def task_name
  @task.name
end