Class: Step

Inherits:
Object
  • Object
show all
Defined in:
lib/rbbt/workflow/step.rb,
lib/rbbt/workflow/accessor.rb

Defined Under Namespace

Classes: Aborted

Constant Summary collapse

INFO_SERIALIAZER =
Marshal

Class Attribute Summary collapse

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(path, task = nil, inputs = nil, dependencies = nil, bindings = nil) ⇒ Step

Returns a new instance of Step.



16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
# File 'lib/rbbt/workflow/step.rb', line 16

# Builds a step bound to a job file.
#
# @param path [String] job file path; sanitized with Misc.sanitize_filename
#   and set up as a Path
# @param task [Object, nil] task this step executes
# @param inputs [Array, nil] input values (defaults to [])
# @param dependencies [Step, Array<Step>, nil] upstream steps; a single step
#   is wrapped in an array and nil becomes []
# @param bindings [Object, nil] stored as-is in @bindings
def initialize(path, task = nil, inputs = nil, dependencies = nil, bindings = nil)
  @path = Path.setup(Misc.sanitize_filename(path))
  @task = task
  @bindings = bindings
  @dependencies = case dependencies
                  when nil then []
                  when Array then dependencies
                  else [dependencies]
                  end
  @inputs = inputs || []
end

Class Attribute Details

.log_relay_stepObject

Returns the value of attribute log_relay_step.



33
34
35
# File 'lib/rbbt/workflow/step.rb', line 33

# Class-level reader: the step that other steps relay their log entries to.
def log_relay_step; @log_relay_step; end

Instance Attribute Details

#bindingsObject

Returns the value of attribute bindings.



10
11
12
# File 'lib/rbbt/workflow/step.rb', line 10

# Reader for the bindings object given to the constructor.
def bindings; @bindings; end

#dependenciesObject

Returns the value of attribute dependencies.



10
11
12
# File 'lib/rbbt/workflow/step.rb', line 10

# Reader for the direct (non-recursive) upstream steps.
def dependencies; @dependencies; end

#execObject

Returns the value of attribute exec.



12
13
14
# File 'lib/rbbt/workflow/step.rb', line 12

# Reader for the @exec flag (set_info is a no-op while it is truthy).
def exec; @exec; end

#inputsObject

Returns the value of attribute inputs.



10
11
12
# File 'lib/rbbt/workflow/step.rb', line 10

# Reader for the task input values.
def inputs; @inputs; end

#pathObject

Returns the value of attribute path.



10
11
12
# File 'lib/rbbt/workflow/step.rb', line 10

# Reader for the job file path (a Path).
def path; @path; end

#pidObject

Returns the value of attribute pid.



11
12
13
# File 'lib/rbbt/workflow/step.rb', line 11

# Reader for the pid of the forked process running this step, if any.
def pid; @pid; end

#taskObject

Returns the value of attribute task.



10
11
12
# File 'lib/rbbt/workflow/step.rb', line 10

# Reader for the task this step executes.
def task; @task; end

Instance Method Details

#abortObject



216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
# File 'lib/rbbt/workflow/step.rb', line 216

# Sends INT to the process running this step and marks the job :aborted.
#
# The pid is taken from @pid, falling back to the :pid info entry. Failures
# while signalling or waiting for the process are only logged at debug level.
#
# @return [Boolean] false when no pid is known, true otherwise
def abort
  @pid ||= info[:pid]

  if @pid.nil?
    Log.medium "Could not abort #{path}: no pid"
    return false
  end

  Log.medium "Aborting #{path}: #{ @pid }"
  begin
    Process.kill("INT", @pid)
    Process.waitpid @pid
  rescue Exception => e
    Log.debug("Aborted job #{@pid} was not killed: #{e.message}")
  end
  log(:aborted, "Job aborted by user")
  true
end

#aborted?Boolean

Returns:

  • (Boolean)


97
98
99
# File 'lib/rbbt/workflow/accessor.rb', line 97

# @return [Boolean] whether the info file records status :aborted
def aborted?
  current = info[:status]
  current == :aborted
end

#child(&block) ⇒ Object



234
235
236
237
238
239
240
241
242
243
244
245
# File 'lib/rbbt/workflow/step.rb', line 234

# Forks a child process running the given block and records its pid in the
# info file under :children_pids, appending to any pids already recorded.
#
# @yield code executed in the child process
# @return [Integer] pid of the forked child
def child(&block)
  forked_pid = Process.fork(&block)
  recorded = info[:children_pids]
  recorded = recorded.nil? ? [forked_pid] : recorded.push(forked_pid)
  # No Process.detach here (the original call was commented out); the pids
  # are stored so the owning process can wait on them.
  set_info :children_pids, recorded
  forked_pid
end

#cleanObject



255
256
257
258
259
260
261
262
263
264
265
266
# File 'lib/rbbt/workflow/step.rb', line 255

# Removes the job result, its info file, both lock files and the files
# directory. Nothing is touched unless the result or the info file exists.
#
# @return [Step] self
def clean
  if Open.exists?(path) || Open.exists?(info_file)
    [info_file, info_file + '.lock', path, path + '.lock'].each do |target|
      Open.rm target if Open.exists? target
    end
    Open.rm_rf files_dir if Open.exists? files_dir
  end
  self
end

#clean_nameObject



12
13
14
# File 'lib/rbbt/workflow/accessor.rb', line 12

# Job name with everything from its last underscore onward removed.
def clean_name
  name.sub(/(.*)_.*/) { Regexp.last_match(1) }
end

#done?Boolean

Returns:

  • (Boolean)


83
84
85
# File 'lib/rbbt/workflow/accessor.rb', line 83

# Whether the job result file exists. Returns path itself when it is nil.
def done?
  path && path.exists?
end

#error?Boolean

Returns:

  • (Boolean)


93
94
95
# File 'lib/rbbt/workflow/accessor.rb', line 93

# @return [Boolean] whether the info file records status :error
def error?
  current = info[:status]
  current == :error
end

#file(name) ⇒ Object



114
115
116
# File 'lib/rbbt/workflow/accessor.rb', line 114

# Path of an auxiliary file inside the job's files directory.
#
# @param name [String, Symbol] file name relative to files_dir
# @return [Path]
def file(name)
  relative = name.to_s
  Path.setup File.join(files_dir, relative)
end

#filesObject



107
108
109
110
111
112
# File 'lib/rbbt/workflow/accessor.rb', line 107

# Lists every regular file under files_dir (recursively), relative to it.
#
# @return [Array<String>]
def files
  Dir.glob(File.join(files_dir, '**', '*')).
    reject { |entry| File.directory?(entry) }.
    map { |entry| Misc.path_relative_to(files_dir, entry) }
end

#files_dirObject

Directory holding the job's auxiliary files: the job path with a '.files' suffix.



103
104
105
# File 'lib/rbbt/workflow/accessor.rb', line 103

# Directory holding this job's auxiliary files (see #file, #save_file,
# #load_file): the job path followed by a '.files' suffix.
# NOTE(review): built with `+` on @path (a Path); whether the result keeps
# Path behaviour depends on Path's `+`, which is not visible here.
def files_dir
  @path + '.files'
end

#fork(semaphore = nil) ⇒ Object



170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
# File 'lib/rbbt/workflow/step.rb', line 170

# Runs this step in a forked child process and returns immediately.
#
# In the child:
#   * INT is trapped and turned into Step::Aborted;
#   * when a semaphore is given, it is waited on before starting and posted
#     in the ensure clause, so it is released even on exit or exception;
#   * the parent directory of path is created, then run(true) is called;
#   * any pids recorded in info[:children_pids] are waited for and
#     :children_done is recorded;
#   * :pid is cleared in the info file and the child exits 0 (or -1 when
#     waiting for the children failed).
# In the parent, Process.detach reaps the child in the background and its
# pid is kept in @pid (see #join / #abort).
#
# @param semaphore [Object, nil] RbbtSemaphore handle limiting concurrency
# @return [Step] self
# @raise [RuntimeError] if @pid is already set
def fork(semaphore = nil)
  raise "Can not fork: Step is waiting for proces #{@pid} to finish" if not @pid.nil?
  @pid = Process.fork do
    # Ctrl-C / abort sends INT; convert it into an exception run can handle.
    trap(:INT) { raise Step::Aborted.new "INT signal recieved" }
    begin
      RbbtSemaphore.wait_semaphore(semaphore) if semaphore
      FileUtils.mkdir_p File.dirname(path) unless Open.exists? File.dirname(path)
      begin
        run(true)
      rescue Step::Aborted
        Log.debug{"Forked process aborted: #{@path}"}
        log :aborted, "Aborted"
        raise $!
      rescue Exception
        Log.debug("Exception caught on forked process: #{@path}")
        raise $!
      end

      # Wait for processes started through #child before declaring the
      # forked run finished.
      begin
        children_pids = info[:children_pids]
        if children_pids
          children_pids.each do |pid|
            if Misc.pid_exists? pid
              begin
                Process.waitpid pid
              rescue Errno::ECHILD
                Log.error "Waiting on #{ pid } failed: #{$!.message}"
              end
            end
          end
          set_info :children_done, Time.now
        end
      rescue Exception
        Log.debug("Exception waiting for children: #{$!.message}")
        exit -1
      end
      set_info :pid, nil
      exit 0
    ensure
      # Runs on normal exit (exit raises SystemExit) and on exceptions.
      RbbtSemaphore.post_semaphore(semaphore) if semaphore
    end
  end
  Process.detach(@pid)
  self
end

#infoObject



26
27
28
29
30
31
32
33
34
35
36
37
38
# File 'lib/rbbt/workflow/accessor.rb', line 26

# Loads the info hash stored in info_file.
#
# Reading is retried through Misc.insist(2, 0.5). A missing file yields {},
# and a nil payload also yields {}.
#
# @return [Hash]
# @raise re-raises any loading error after logging it at debug level
def info
  return {} unless Open.exists? info_file

  begin
    Misc.insist(2, 0.5) do
      Open.open(info_file) { |stream| INFO_SERIALIAZER.load(stream) || {} }
    end
  rescue Exception => e
    Log.debug { "Error loading info file: " + info_file }
    raise e
  end
end

#info_fileObject

Path of the job's info file: the job path with an '.info' suffix.



22
23
24
# File 'lib/rbbt/workflow/accessor.rb', line 22

# Path of this job's info file: the job path followed by an '.info' suffix.
# set_info serializes the info hash (status, pid, messages, ...) here.
# NOTE(review): built with `+` on @path (a Path); whether the result keeps
# Path behaviour depends on Path's `+`, which is not visible here.
def info_file
  @path + '.info'
end

#joinObject



75
76
77
78
79
80
81
82
83
84
85
86
87
88
# File 'lib/rbbt/workflow/step.rb', line 75

# Waits for the forked process (if any) to finish and forgets its pid.
# A process that already vanished (ECHILD) is only logged.
#
# @return [Step] self
def join
  unless @pid.nil?
    if Misc.pid_exists? @pid
      begin
        Log.debug { "Waiting for pid: #{@pid}" }
        Process.waitpid @pid
      rescue Errno::ECHILD
        Log.debug { "Process #{ @pid } already finished: #{ path }" }
      end
    end
    @pid = nil
  end
  self
end

#loadObject



247
248
249
250
251
252
253
# File 'lib/rbbt/workflow/step.rb', line 247

# Loads the persisted result of a finished job and prepares it with the
# task's result description.
#
# @return [Object] prepared result
# @raise [RuntimeError] when the job is not done
def load
  raise "Can not load: Step is waiting for proces #{@pid} to finish" unless done?
  result_type = @task.result_type
  dependency_paths = rec_dependencies.collect { |dependency| dependency.path }
  result = Persist.persist("Job", result_type, :file => @path, :check => dependency_paths) { exec }
  prepare_result result, @task.result_description, info
end

#load_file(name, type = nil, options = {}) ⇒ Object



134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
# File 'lib/rbbt/workflow/accessor.rb', line 134

# Loads an auxiliary file previously stored in the job's files directory.
#
# @param name [String, Symbol] file name relative to files_dir
# @param type [Symbol, String, nil] parser to use. When nil, it is inferred
#   from the extension: tc, tsv, list/ary/array, yaml, marshal. Anything
#   else falls back to :other, which returns the raw contents.
# @param options [Hash] forwarded to TSV.open for :tsv files
# @return [Object] parsed content
def load_file(name, type = nil, options = {})
  if type.nil? and name =~ /.*\.(\w+)$/
    extension = name.match(/.*\.(\w+)$/)[1]
    type = case extension
           when "tc" then :tc
           when "tsv" then :tsv
           when "list", "ary", "array" then :array
           when "yaml" then :yaml
           when "marshal" then :marshal
           else :other
           end
  else
    type ||= :other
  end

  case type.to_sym
  when :tc
    Persist.open_tokyocabinet(file(name), false)
  when :tsv
    TSV.open Open.open(file(name)), options
  when :array
    Open.read(file(name)).split(/\n|,\s*/)
  when :yaml
    # Block form closes the stream once parsed; the bare Open.open call used
    # previously leaked the IO.
    Open.open(file(name)) { |stream| YAML.load(stream) }
  when :marshal
    Open.open(file(name)) { |stream| Marshal.load(stream) }
  else
    Open.read(file(name))
  end
end

#log(status, message = nil, do_log = true) ⇒ Object



67
68
69
70
71
72
73
74
75
76
77
# File 'lib/rbbt/workflow/accessor.rb', line 67

# Records a status change (and an optional message) for this job.
#
# @param status [Symbol] new status, stored through status=
# @param message [String, nil] appended to the info :messages list when not nil
# @param do_log [Boolean] whether to also emit a Log.low line
def log(status, message = nil, do_log = true)
  if do_log
    line = message ? "[#{ status }] #{ message }: #{path}" : "[#{ status }]: #{path}"
    Log.low line
  end

  self.status = status
  message(message) unless message.nil?
end

#message(message) ⇒ Object



63
64
65
# File 'lib/rbbt/workflow/accessor.rb', line 63

# Appends a message to the info :messages list.
#
# @param text [String] message to append
# @return value returned by set_info
def message(text)
  history = messages || []
  history << text
  set_info(:messages, history)
end

#messagesObject



59
60
61
# File 'lib/rbbt/workflow/accessor.rb', line 59

# Messages recorded in the info file. The list is initialised to [] (and
# persisted) when absent.
def messages
  stored = info[:messages]
  return stored if stored
  set_info(:messages, [])
end

#nameObject



8
9
10
# File 'lib/rbbt/workflow/accessor.rb', line 8

# Job name: the part of the path after the "/<task name>/" directory. The
# path is returned unchanged when that directory is absent.
def name
  task_dir = Regexp.quote(task.name.to_s)
  @path.sub(/.*\/#{task_dir}\/(.*)/, '\1')
end

#prepare_result(value, description = nil, info = {}) ⇒ Object



52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
# File 'lib/rbbt/workflow/step.rb', line 52

# Wraps a job result in its Entity format when one applies.
#
# * The value is returned untouched when Entity is not loaded, no
#   description is given, or the description is not a known Entity format.
# * Annotated values keep their identity. Entries of info whose key is one of
#   the value's annotations are assigned through the matching setter.
# * Any other value is set up with Entity.formats[description], passing info
#   merged with :format => description.
#
# @param value [Object] raw result
# @param description [String, nil] Entity format name
# @param info [Hash] extra annotation values
# @return [Object] the (possibly annotated) result
def prepare_result(value, description = nil, info = {})
  case
  when (not defined? Entity or description.nil? or not Entity.formats.include? description)
    value
  when (Annotated === value and info.empty?)
    value
  when Annotated === value
    annotations = value.annotations
    info.each do |k, v|
      # The setter name was previously built from an undefined `h`, which
      # raised NameError for every annotated value with non-empty info.
      value.send("#{k}=", v) if annotations.include? k
    end
    value
  else
    Entity.formats[description].setup(value, info.merge(:format => description))
  end
end

#rec_dependenciesObject



268
269
270
# File 'lib/rbbt/workflow/step.rb', line 268

# All upstream steps. Each dependency's own recursive dependencies come
# first, then the direct dependencies. Duplicates are kept.
#
# @return [Array<Step>]
def rec_dependencies
  nested = @dependencies.collect { |dependency| dependency.rec_dependencies }
  nested.flatten + @dependencies
end

#recursive_cleanObject



272
273
274
275
# File 'lib/rbbt/workflow/step.rb', line 272

# Cleans every recursive dependency and then this step.
#
# @return the result of #clean on this step
def recursive_clean
  rec_dependencies.each(&:clean)
  clean
end

#relay_log(step) ⇒ Object



36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
# File 'lib/rbbt/workflow/step.rb', line 36

# Forwards this step's log entries to another step.
#
# Only applies when task is a Task with a non-nil name; otherwise self is
# returned untouched. On the first call, this object's singleton class gains
# a relay_step accessor, and #log is wrapped. The wrapper calls the original
# log and then logs on relay_step with "<task>><status>" and, when a message
# is given, "<task>><message>". It passes do_log = false, so the relay target
# emits no Log.low line of its own. Later calls only replace the target step.
#
# @param step [Step] step that receives the relayed entries
# @return [Step] self
def relay_log(step)
  return self unless Task === self.task and not self.task.name.nil?
  # respond_to?(:original_log) marks that the wrapper is already installed.
  if not self.respond_to? :original_log
    class << self
      attr_accessor :relay_step
      alias original_log log 
      def log(status, message = nil, do_log = true)
        original_log(status, message, do_log)
        relay_step.log([task.name.to_s, status.to_s] * ">", message.nil? ? nil : [task.name.to_s, message] * ">", false)
      end
    end
  end
  # Read by the relay_step accessor defined above.
  @relay_step = step
  self
end

#run(no_load = false) ⇒ Object



90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
# File 'lib/rbbt/workflow/step.rb', line 90

# Executes the job, and its dependencies first, persisting the result at
# @path through Persist.persist.
#
# The paths of all recursive dependencies are passed to Persist.persist as
# :check. NOTE(review): presumably Persist only evaluates the block when the
# stored result is missing or stale; confirm against Persist.
#
# Inside the block the step:
#   * relays its log to Step.log_relay_step when one is configured;
#   * removes any previous info file and records :pid and :dependencies;
#   * runs every direct dependency with run(true), first cleaning one that
#     is not done and is in :error status;
#   * records :status => :started, :started and, when the task declares
#     inputs, :inputs;
#   * calls exec. Step::Aborted is logged and INT is sent to recorded
#     children; other exceptions get :backtrace recorded and are logged.
#     Both are re-raised;
#   * on success records :status => :done, :done and :time_elapsed.
#
# @param no_load [Boolean] return self instead of the prepared result (also
#   forwarded to Persist.persist as :no_load)
# @return [Step, Object] self when no_load, otherwise the result passed
#   through prepare_result
# @raise [RuntimeError] when a dependency fails
def run(no_load = false)
  result = Persist.persist "Job", @task.result_type, :file => @path, :check => rec_dependencies.collect{|dependency| dependency.path }.uniq, :no_load => no_load do
    @exec = false
    # Relay to the globally configured step unless this is that step or a
    # relay target is already set.
    if Step === Step.log_relay_step and not self == Step.log_relay_step
      relay_log(Step.log_relay_step) unless self.respond_to? :relay_step and self.relay_step
    end

    # Start each execution from a fresh info file.
    Open.rm info_file if Open.exists? info_file

    set_info :pid, Process.pid

    set_info :dependencies, dependencies.collect{|dep| [dep.task.name, dep.name]}
    # Dependencies run in this process and relay their log entries to us.
    dependencies.each{|dependency| 
      begin
        dependency.relay_log self
        dependency.clean if not dependency.done? and dependency.error?
        dependency.run true
      rescue Exception
        backtrace = $!.backtrace
        set_info :backtrace, backtrace 
        log(:error, "Exception processing dependency #{dependency.path}")
        log(:error, "#{$!.class}: #{$!.message}")
        log(:error, "backtrace: #{$!.backtrace.first}")
        raise "Exception processing dependency #{dependency.path}"
      end
    }
    
    Log.medium("Starting task #{task.name || ""} [#{Process.pid}]: #{ path }")
    set_info :status, :started

    set_info :started, (start_time = Time.now)
    
    # Long input values are trimmed by Misc.remove_long_items before storing.
    set_info :inputs, Misc.remove_long_items(Misc.zip2hash(task.inputs, @inputs)) unless task.inputs.nil?

    res = begin
            exec
          rescue Step::Aborted
            log(:error, "Aborted")

            # Propagate the abort to processes started through #child.
            children_pids = info[:children_pids]
            if children_pids and children_pids.any?
              Log.medium("Killing children: #{ children_pids * ", " }")
              children_pids.each do |pid|
                Log.medium("Killing child #{ pid }")
                begin
                  Process.kill "INT", pid
                rescue Exception
                  Log.medium("Exception killing child #{ pid }: #{$!.message}")
                end
              end
            end

            raise $!
          rescue Exception
            backtrace = $!.backtrace

            # HACK: This fixes an strange behaviour in 1.9.3 where some
            # bactrace strings are coded in ASCII-8BIT
            backtrace.each{|l| l.force_encoding("UTF-8")} if String.instance_methods.include? :force_encoding

            set_info :backtrace, backtrace 
            log(:error, "#{$!.class}: #{$!.message}")
            log(:error, "backtrace: #{$!.backtrace.first}")
            raise $!
          end

    set_info :status, :done
    set_info :done, (done_time = Time.now)
    set_info :time_elapsed, done_time - start_time
    Log.medium("Completed task #{task.name || ""} [#{Process.pid}]: #{ path }")
    res
  end

  # no_load: the caller only needs the side effect, not the loaded result.
  if no_load
    self
  else
    prepare_result result, @task.result_description, info
  end
end

#running?Boolean

Returns:

  • (Boolean)


87
88
89
90
91
# File 'lib/rbbt/workflow/accessor.rb', line 87

# Whether the process recorded in the info file is still alive.
#
# @return [Boolean, nil] nil when there is no info file or no recorded pid,
#   otherwise Misc.pid_exists? for the recorded pid
def running?
  return nil unless Open.exists? info_file
  # Read the info file once. The :pid entry is cleared when a forked run
  # finishes, so a second read could hand nil to Misc.pid_exists?. A single
  # read also avoids re-parsing the file.
  recorded_pid = info[:pid]
  return nil if recorded_pid.nil?
  Misc.pid_exists? recorded_pid
end

#save_file(name, content) ⇒ Object



118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
# File 'lib/rbbt/workflow/accessor.rb', line 118

# Serializes content into an auxiliary file of the job.
#
# Strings are written as-is and arrays one element per line. TSV values
# use to_s. Other hashes are written one tab-separated entry per line.
# Anything else uses to_s.
#
# @param name [String, Symbol] file name relative to files_dir
# @param content [Object] value to write
def save_file(name, content)
  serialized = case content
               when String then content
               when Array then content * "\n"
               # presumably TSV values are also Hashes; keep this ahead of Hash
               when TSV then content.to_s
               when Hash then content.collect{|*p| p * "\t"} * "\n"
               else content.to_s
               end
  Open.write(file(name), serialized)
end

#set_info(key, value) ⇒ Object



40
41
42
43
44
45
46
47
48
49
# File 'lib/rbbt/workflow/accessor.rb', line 40

# Stores one key/value pair in the info file.
#
# While holding the info file lock, the whole info hash is reloaded, updated
# and rewritten with INFO_SERIALIAZER. When Annotated is loaded, the value is
# passed through Annotated.purge first. Nothing happens while @exec is truthy.
#
# @param key [Symbol] info entry
# @param value [Object] value to store
# @return [Object, nil] nil when @exec is set, otherwise what Open.lock
#   returns for the block (the stored value)
def set_info(key, value)
  return nil if @exec
  value = Annotated.purge(value) if defined?(Annotated)
  Open.lock(info_file) do
    current = info
    current[key] = value
    serialized = INFO_SERIALIAZER.dump(current)
    Open.write(info_file, serialized)
    value
  end
end

#started?Boolean

Returns:

  • (Boolean)


79
80
81
# File 'lib/rbbt/workflow/accessor.rb', line 79

# A job counts as started as soon as its info file exists.
def started?
  Open.exists?(info_file)
end

#statusObject



51
52
53
# File 'lib/rbbt/workflow/accessor.rb', line 51

# Current :status entry of the info file (nil when unset).
def status
  current_info = info
  current_info[:status]
end

#status=(status) ⇒ Object



55
56
57
# File 'lib/rbbt/workflow/accessor.rb', line 55

# Persists a new :status entry in the info file.
def status=(new_status)
  set_info :status, new_status
end

#step(name) ⇒ Object



277
278
279
# File 'lib/rbbt/workflow/step.rb', line 277

# Finds a recursive dependency by task name.
#
# @param name [String, Symbol] task name (compared as symbols)
# @return [Step, nil] the first match in rec_dependencies order, or nil
def step(name)
  # `find` stops at the first match instead of filtering the whole list and
  # then taking its head.
  rec_dependencies.find { |dependency| dependency.task.name.to_sym == name.to_sym }
end

#task_nameObject



16
17
18
# File 'lib/rbbt/workflow/accessor.rb', line 16

# Name of the task, memoized on first (truthy) lookup.
def task_name
  return @task_name if @task_name
  @task_name = task.name
end