Class: Step
- Inherits:
-
Object
- Object
- Step
- Defined in:
- lib/rbbt/workflow/step.rb,
lib/rbbt/workflow/accessor.rb
Defined Under Namespace
Classes: Aborted
Constant Summary collapse
- INFO_SERIALIAZER =
Marshal
Class Attribute Summary collapse
-
.log_relay_step ⇒ Object
Returns the value of attribute log_relay_step.
Instance Attribute Summary collapse
-
#bindings ⇒ Object
Returns the value of attribute bindings.
-
#dependencies ⇒ Object
Returns the value of attribute dependencies.
-
#exec ⇒ Object
Returns the value of attribute exec.
-
#inputs ⇒ Object
Returns the value of attribute inputs.
-
#path ⇒ Object
Returns the value of attribute path.
-
#pid ⇒ Object
Returns the value of attribute pid.
-
#task ⇒ Object
Returns the value of attribute task.
Instance Method Summary collapse
- #abort ⇒ Object
- #aborted? ⇒ Boolean
- #child(&block) ⇒ Object
- #clean ⇒ Object
- #clean_name ⇒ Object
- #done? ⇒ Boolean
- #error? ⇒ Boolean
- #file(name) ⇒ Object
- #files ⇒ Object
-
#files_dir ⇒ Object
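Path of the directory holding this step's auxiliary files (the step path with '.files' appended). The "{{{ INFO" text in the source comment is an editor fold marker.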
- #fork(semaphore = nil) ⇒ Object
- #info ⇒ Object
-
#info_file ⇒ Object
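Path of the file holding this step's serialized info (the step path with '.info' appended). The "{{{ INFO" text in the source comment is an editor fold marker.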
-
#initialize(path, task = nil, inputs = nil, dependencies = nil, bindings = nil) ⇒ Step
constructor
A new instance of Step.
- #join ⇒ Object
- #load ⇒ Object
- #load_file(name, type = nil, options = {}) ⇒ Object
- #log(status, message = nil, do_log = true) ⇒ Object
- #message(message) ⇒ Object
- #messages ⇒ Object
- #name ⇒ Object
- #prepare_result(value, description = nil, info = {}) ⇒ Object
- #rec_dependencies ⇒ Object
- #recursive_clean ⇒ Object
- #relay_log(step) ⇒ Object
- #run(no_load = false) ⇒ Object
- #running? ⇒ Boolean
- #save_file(name, content) ⇒ Object
- #set_info(key, value) ⇒ Object
- #started? ⇒ Boolean
- #status ⇒ Object
- #status=(status) ⇒ Object
- #step(name) ⇒ Object
- #task_name ⇒ Object
Constructor Details
#initialize(path, task = nil, inputs = nil, dependencies = nil, bindings = nil) ⇒ Step
Returns a new instance of Step.
16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 |
# File 'lib/rbbt/workflow/step.rb', line 16
#
# Builds a step for the given result path.
#
# The path is sanitized with Misc.sanitize_filename and wrapped with
# Path.setup. +dependencies+ may be nil (no dependencies), an Array, or a
# single object, which is wrapped in a one-element Array. +inputs+
# defaults to an empty Array.
def initialize(path, task = nil, inputs = nil, dependencies = nil, bindings = nil)
  @path = Path.setup(Misc.sanitize_filename(path))
  @task = task
  @bindings = bindings
  @dependencies =
    if dependencies.nil?
      []
    elsif Array === dependencies
      dependencies
    else
      [dependencies]
    end
  @inputs = inputs || []
end
Class Attribute Details
.log_relay_step ⇒ Object
Returns the value of attribute log_relay_step.
33 34 35 |
# File 'lib/rbbt/workflow/step.rb', line 33
#
# Reader for the log_relay_step attribute (listed on this page as a
# class-level attribute of Step).
def log_relay_step
  @log_relay_step
end
Instance Attribute Details
#bindings ⇒ Object
Returns the value of attribute bindings.
10 11 12 |
# File 'lib/rbbt/workflow/step.rb', line 10
#
# Reader for the bindings given to the constructor.
def bindings
  @bindings
end
#dependencies ⇒ Object
Returns the value of attribute dependencies.
10 11 12 |
# File 'lib/rbbt/workflow/step.rb', line 10
#
# Reader for the step's direct dependencies (the constructor always
# stores an Array here).
def dependencies
  @dependencies
end
#exec ⇒ Object
Returns the value of attribute exec.
12 13 14 |
# File 'lib/rbbt/workflow/step.rb', line 12
#
# Reader for the @exec flag. While this flag is truthy, set_info
# returns nil without writing anything.
def exec
  @exec
end
#inputs ⇒ Object
Returns the value of attribute inputs.
10 11 12 |
# File 'lib/rbbt/workflow/step.rb', line 10
#
# Reader for the input values given to the constructor (an empty Array
# when none were given).
def inputs
  @inputs
end
#path ⇒ Object
Returns the value of attribute path.
10 11 12 |
# File 'lib/rbbt/workflow/step.rb', line 10
#
# Reader for the step's result path, as set up by the constructor.
def path
  @path
end
#pid ⇒ Object
Returns the value of attribute pid.
11 12 13 |
# File 'lib/rbbt/workflow/step.rb', line 11
#
# Reader for the pid of the forked process, as set by #fork and
# cleared by #join.
def pid
  @pid
end
#task ⇒ Object
Returns the value of attribute task.
10 11 12 |
# File 'lib/rbbt/workflow/step.rb', line 10
#
# Reader for the task given to the constructor.
def task
  @task
end
Instance Method Details
#abort ⇒ Object
216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 |
# File 'lib/rbbt/workflow/step.rb', line 216
#
# Aborts the process running this step.
#
# The pid is taken from @pid or, failing that, from info[:pid]. The
# process is sent INT and waited for; any exception raised while doing so
# is logged and otherwise ignored (best effort), after which the step is
# logged as :aborted.
#
# Returns true when a pid was known and the abort was attempted, false
# when no pid is available.
def abort
  @pid ||= info[:pid]
  if @pid.nil?
    Log.medium "Could not abort #{path}: no pid"
    false
  else
    Log.medium "Aborting #{path}: #{ @pid }"
    begin
      Process.kill("INT", @pid)
      Process.waitpid @pid
    rescue Exception
      # Deliberately broad: the process may already be gone or may not be
      # our child; the abort is still recorded below.
      Log.debug("Aborted job #{@pid} was not killed: #{$!.message}")
    end
    log(:aborted, "Job aborted by user")
    true
  end
end
#aborted? ⇒ Boolean
97 98 99 |
# File 'lib/rbbt/workflow/accessor.rb', line 97
#
# True when the status recorded in the info file is :aborted.
def aborted?
  recorded_status = info[:status]
  recorded_status == :aborted
end
#child(&block) ⇒ Object
234 235 236 237 238 239 240 241 242 243 244 245 |
# File 'lib/rbbt/workflow/step.rb', line 234
#
# Forks a child process that runs +block+ and records its pid under
# :children_pids in the step info.
#
# Returns the pid of the new child.
def child(&block)
  new_pid = Process.fork(&block)

  recorded = info[:children_pids]
  pids = recorded.nil? ? [new_pid] : (recorded << new_pid)

  #Process.detach(new_pid)
  set_info :children_pids, pids
  new_pid
end
#clean ⇒ Object
255 256 257 258 259 260 261 262 263 264 265 266 |
# File 'lib/rbbt/workflow/step.rb', line 255
#
# Removes the step's on-disk artifacts: the info file, the result file,
# their '.lock' companions and the files directory. Nothing is touched
# unless the result file or the info file exists.
#
# Returns self.
def clean
  return self unless Open.exists?(path) || Open.exists?(info_file)

  Open.rm info_file if Open.exists? info_file

  info_lock = info_file + '.lock'
  Open.rm info_file + '.lock' if Open.exists? info_lock

  Open.rm path if Open.exists? path

  path_lock = path + '.lock'
  Open.rm path + '.lock' if Open.exists? path_lock

  Open.rm_rf files_dir if Open.exists? files_dir

  self
end
#clean_name ⇒ Object
12 13 14 |
# File 'lib/rbbt/workflow/accessor.rb', line 12
#
# The step name with everything from the last underscore onwards removed
# (the name is returned unchanged when it has no underscore).
def clean_name
  full_name = name
  full_name.sub(/(.*)_.*/, '\1')
end
#done? ⇒ Boolean
83 84 85 |
# File 'lib/rbbt/workflow/accessor.rb', line 83
#
# Truthy when the step has a path and that path exists.
def done?
  path && path.exists?
end
#error? ⇒ Boolean
93 94 95 |
# File 'lib/rbbt/workflow/accessor.rb', line 93
#
# True when the status recorded in the info file is :error.
def error?
  recorded_status = info[:status]
  recorded_status == :error
end
#file(name) ⇒ Object
114 115 116 |
# File 'lib/rbbt/workflow/accessor.rb', line 114
#
# Path (set up with Path.setup) of the auxiliary file +name+ inside the
# step's files directory. +name+ is converted with to_s.
def file(name)
  location = File.join(files_dir, name.to_s)
  Path.setup(location)
end
#files ⇒ Object
107 108 109 110 111 112 |
# File 'lib/rbbt/workflow/accessor.rb', line 107
#
# Lists every non-directory entry found under the step's files directory
# (at any depth), each expressed relative to that directory through
# Misc.path_relative_to.
def files
  pattern = File.join(files_dir, '**', '*')
  regular_entries = Dir.glob(pattern).reject { |entry| File.directory? entry }
  regular_entries.collect do |entry|
    Misc.path_relative_to(files_dir, entry)
  end
end
#files_dir ⇒ Object
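Path of the directory holding this step's auxiliary files (the step path with '.files' appended). The "{{{ INFO" text in the source comment is an editor fold marker.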
103 104 105 |
# File 'lib/rbbt/workflow/accessor.rb', line 103
#
# Location of the step's auxiliary files: the step path with '.files'
# appended.
def files_dir
  suffix = '.files'
  @path + suffix
end
#fork(semaphore = nil) ⇒ Object
170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 |
# File 'lib/rbbt/workflow/step.rb', line 170
#
# Runs the step in a forked, detached process and stores its pid in @pid.
#
# Inside the child: INT is trapped and turned into Step::Aborted; the
# optional +semaphore+ is waited on before the work and posted afterwards
# (in an ensure); the directory of the result path is created if missing;
# the step is run with no_load = true; then every pid recorded under
# :children_pids that still exists is waited for and :children_done is
# recorded. The child exits with 0 on success and -1 if waiting for
# children raised.
#
# Raises a RuntimeError when @pid is already set.
# Returns self.
def fork(semaphore = nil)
  raise "Can not fork: Step is waiting for proces #{@pid} to finish" if not @pid.nil?
  @pid = Process.fork do
    trap(:INT) { raise Step::Aborted.new "INT signal recieved" }
    begin
      RbbtSemaphore.wait_semaphore(semaphore) if semaphore
      FileUtils.mkdir_p File.dirname(path) unless Open.exists? File.dirname(path)
      begin
        run(true)
      rescue Step::Aborted
        Log.debug{"Forked process aborted: #{@path}"}
        log :aborted, "Aborted"
        raise $!
      rescue Exception
        Log.debug("Exception caught on forked process: #{@path}")
        raise $!
      end

      begin
        children_pids = info[:children_pids]
        if children_pids
          children_pids.each do |pid|
            if Misc.pid_exists? pid
              begin
                Process.waitpid pid
              rescue Errno::ECHILD
                Log.error "Waiting on #{ pid } failed: #{$!.message}"
              end
            end
          end
          set_info :children_done, Time.now
        end
      rescue Exception
        Log.debug("Exception waiting for children: #{$!.message}")
        exit(-1)
      end
      set_info :pid, nil
      exit 0
    ensure
      # Runs on every exit path of the child, including the exit calls above.
      RbbtSemaphore.post_semaphore(semaphore) if semaphore
    end
  end
  Process.detach(@pid)
  self
end
#info ⇒ Object
26 27 28 29 30 31 32 33 34 35 36 37 38 |
# File 'lib/rbbt/workflow/accessor.rb', line 26
#
# Loads the step's info from the info file using INFO_SERIALIAZER.
#
# Returns an empty Hash when the info file does not exist or when the
# loaded value is nil/false. The read is wrapped in Misc.insist(2, 0.5);
# if it still fails the error is logged at debug level and re-raised.
def info
  return {} unless Open.exists?(info_file)

  begin
    Misc.insist(2, 0.5) do
      Open.open(info_file) do |stream|
        loaded = INFO_SERIALIAZER.load(stream)
        loaded || {}
      end
    end
  rescue Exception
    Log.debug{"Error loading info file: " + info_file}
    raise $!
  end
end
#info_file ⇒ Object
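Path of the file holding this step's serialized info (the step path with '.info' appended). The "{{{ INFO" text in the source comment is an editor fold marker.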
22 23 24 |
# File 'lib/rbbt/workflow/accessor.rb', line 22
#
# Location of the step's info file: the step path with '.info' appended.
def info_file
  suffix = '.info'
  @path + suffix
end
#join ⇒ Object
75 76 77 78 79 80 81 82 83 84 85 86 87 88 |
# File 'lib/rbbt/workflow/step.rb', line 75
#
# Waits for the forked process (if any) to finish.
#
# With no @pid this is a no-op. Otherwise, when the pid still exists it
# is waited for (Errno::ECHILD is logged and ignored), and @pid is then
# cleared in every case.
#
# Returns self.
def join
  return self if @pid.nil?

  if Misc.pid_exists? @pid
    begin
      Log.debug{"Waiting for pid: #{@pid}"}
      Process.waitpid @pid
    rescue Errno::ECHILD
      Log.debug{"Process #{ @pid } already finished: #{ path }"}
    end
  end

  @pid = nil
  self
end
#load ⇒ Object
247 248 249 250 251 252 253 |
# File 'lib/rbbt/workflow/step.rb', line 247
#
# Loads the result of a finished step through Persist.persist (keyed on
# the step path and checked against the paths of all recursive
# dependencies) and passes it through prepare_result together with the
# task's result description and the step info.
#
# Raises a RuntimeError when the step is not done.
def load
  raise "Can not load: Step is waiting for proces #{@pid} to finish" unless done?

  result_type = @task.result_type
  dependency_paths = rec_dependencies.collect { |dependency| dependency.path }
  persist_options = { :file => @path, :check => dependency_paths }

  result = Persist.persist("Job", result_type, persist_options) do
    exec
  end

  prepare_result result, @task.result_description, info
end
#load_file(name, type = nil, options = {}) ⇒ Object
134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 |
# File 'lib/rbbt/workflow/accessor.rb', line 134
#
# Loads the auxiliary file +name+ from the step's files directory.
#
# When +type+ is nil it is derived from the file extension:
# "tc" => :tc, "tsv" => :tsv, "list"/"ary"/"array" => :array,
# "yaml" => :yaml, "marshal" => :marshal, anything else (or no
# extension) => :other.
#
# +options+ is only used for :tsv, where it is passed on to TSV.open.
#
# Returns, by type: the Persist.open_tokyocabinet handle (:tc), the
# opened TSV (:tsv), the content split on newlines or commas (:array),
# the deserialized object (:yaml, :marshal), or the raw content.
def load_file(name, type = nil, options = {})
  if type.nil? and name =~ /.*\.(\w+)$/
    extension = name.match(/.*\.(\w+)$/)[1]
    case extension
    when "tc"
      type = :tc
    when "tsv"
      type = :tsv
    when "list", "ary", "array"
      type = :array
    when "yaml"
      type = :yaml
    when "marshal"
      type = :marshal
    else
      type = :other
    end
  else
    type ||= :other
  end

  case type.to_sym
  when :tc
    Persist.open_tokyocabinet(file(name), false)
  when :tsv
    TSV.open Open.open(file(name)), options
  when :array
    Open.read(file(name)).split(/\n|,\s*/)
  when :yaml
    # NOTE(review): YAML.load / Marshal.load deserialize arbitrary objects;
    # only safe if the step's files are trusted.
    YAML.load(Open.open(file(name)))
  when :marshal
    Marshal.load(Open.open(file(name)))
  else
    Open.read(file(name))
  end
end
#log(status, message = nil, do_log = true) ⇒ Object
67 68 69 70 71 72 73 74 75 76 77 |
# File 'lib/rbbt/workflow/accessor.rb', line 67
#
# Records a new status for the step and, optionally, a message.
#
# status  - stored through status=.
# message - when not nil, appended to the step messages through #message.
# do_log  - when truthy, a line with the status, the message (if any) and
#           the path is also written with Log.low.
def log(status, message = nil, do_log = true)
  if do_log
    if message
      Log.low "[#{ status }] #{ message }: #{path}"
    else
      Log.low "[#{ status }]: #{path}"
    end
  end
  self.status = status
  # Parentheses force the method call; `message` alone is the local.
  message(message) unless message.nil?
end
#message(message) ⇒ Object
63 64 65 |
# File 'lib/rbbt/workflow/accessor.rb', line 63
#
# Appends +message+ to the list stored under :messages in the step info
# (starting from an empty list when #messages returns nil) and writes the
# list back with set_info.
def message(message)
  set_info(:messages, (messages || []) << message)
end
#messages ⇒ Object
59 60 61 |
# File 'lib/rbbt/workflow/accessor.rb', line 59
#
# Returns the list stored under :messages in the step info. When there is
# none, an empty list is stored with set_info and the result of that call
# is returned.
def messages
  info[:messages] || set_info(:messages, [])
end
#name ⇒ Object
8 9 10 |
# File 'lib/rbbt/workflow/accessor.rb', line 8
#
# The part of the step path that follows the "/<task name>/" directory.
# The path is returned unchanged when it contains no such segment.
def name
  task_segment = /.*\/#{Regexp.quote task.name.to_s}\/(.*)/
  @path.sub(task_segment, '\1')
end
#prepare_result(value, description = nil, info = {}) ⇒ Object
52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 |
# File 'lib/rbbt/workflow/step.rb', line 52
#
# Decorates a task result according to its description.
#
# value       - the raw result.
# description - result description; only acted on when Entity is defined
#               and Entity.formats includes it.
# info        - Hash of annotation values.
#
# Returns +value+ untouched when Entity is not defined, +description+ is
# nil or not a known Entity format, or when +value+ is already Annotated
# and +info+ is empty. An Annotated value with non-empty +info+ gets each
# info entry whose key is among value.annotations assigned through the
# matching "<key>=" writer. Otherwise the value is set up with the Entity
# format, merging :format => description into +info+.
def prepare_result(value, description = nil, info = {})
  return value if !defined?(Entity) || description.nil? || !Entity.formats.include?(description)

  unless Annotated === value
    return Entity.formats[description].setup(value, info.merge(:format => description))
  end

  return value if info.empty?

  annotations = value.annotations
  info.each do |k, v|
    # The writer is named after the info key (the original referenced an
    # undefined local here and raised NameError).
    value.send("#{k}=", v) if annotations.include? k
  end
  value
end
#rec_dependencies ⇒ Object
268 269 270 |
# File 'lib/rbbt/workflow/step.rb', line 268
#
# All dependencies of the step, transitive ones included: the flattened
# recursive dependencies of each direct dependency, followed by the
# direct dependencies themselves. Duplicates are not removed.
def rec_dependencies
  inherited = @dependencies.collect { |dependency| dependency.rec_dependencies }
  inherited.flatten.concat(@dependencies)
end
#recursive_clean ⇒ Object
272 273 274 275 |
# File 'lib/rbbt/workflow/step.rb', line 272
#
# Cleans every recursive dependency and then the step itself.
#
# Returns the result of #clean on this step.
def recursive_clean
  rec_dependencies.each do |dependency|
    dependency.clean
  end
  clean
end
#relay_log(step) ⇒ Object
36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 |
# File 'lib/rbbt/workflow/step.rb', line 36
#
# Makes this step forward its log entries to +step+.
#
# Only applies when the step's task is a Task with a non-nil name. The
# first time it is called, the instance's own #log is wrapped (the
# original is kept as original_log) so that every entry is also logged on
# relay_step, with status and message prefixed by "<task name>>" and with
# do_log = false.
#
# Returns self.
def relay_log(step)
  return self unless Task === self.task and not self.task.name.nil?
  if not self.respond_to? :original_log
    class << self
      attr_accessor :relay_step
      alias original_log log
      def log(status, message = nil, do_log = true)
        original_log(status, message, do_log)
        relay_step.log([task.name.to_s, status.to_s] * ">", message.nil? ? nil : [task.name.to_s, message] * ">", false)
      end
    end
  end
  @relay_step = step
  self
end
#run(no_load = false) ⇒ Object
90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 |
# File 'lib/rbbt/workflow/step.rb', line 90
#
# Runs the step inside Persist.persist, keyed on the step path and
# checked against the paths of all recursive dependencies.
#
# Inside the persist block: log relaying to Step.log_relay_step is set up
# if applicable, any previous info file is removed, the pid and the
# dependencies are recorded, each dependency is run (after being cleaned
# if it is not done and is in error), start time and inputs are
# recorded, and the task is executed. On Step::Aborted the recorded
# children pids are sent INT and the exception is re-raised; on any other
# exception the backtrace and error are recorded and the exception is
# re-raised. On success the status, finish time and elapsed time are
# recorded.
#
# no_load - passed to Persist.persist as :no_load; when truthy the method
#           returns self instead of the prepared result.
def run(no_load = false)
  result = Persist.persist "Job", @task.result_type, :file => @path, :check => rec_dependencies.collect{|dependency| dependency.path }.uniq, :no_load => no_load do
    @exec = false
    if Step === Step.log_relay_step and not self == Step.log_relay_step
      relay_log(Step.log_relay_step) unless self.respond_to? :relay_step and self.relay_step
    end
    Open.rm info_file if Open.exists? info_file

    set_info :pid, Process.pid

    set_info :dependencies, dependencies.collect{|dep| [dep.task.name, dep.name]}
    dependencies.each{|dependency|
      begin
        dependency.relay_log self
        dependency.clean if not dependency.done? and dependency.error?
        dependency.run true
      rescue Exception
        backtrace = $!.backtrace
        set_info :backtrace, backtrace
        log(:error, "Exception processing dependency #{dependency.path}")
        log(:error, "#{$!.class}: #{$!.message}")
        log(:error, "backtrace: #{$!.backtrace.first}")
        raise "Exception processing dependency #{dependency.path}"
      end
    }

    Log.medium("Starting task #{task.name || ""} [#{Process.pid}]: #{ path }")
    set_info :status, :started

    set_info :started, (start_time = Time.now)

    set_info :inputs, Misc.remove_long_items(Misc.zip2hash(task.inputs, @inputs)) unless task.inputs.nil?

    res = begin
            exec
          rescue Step::Aborted
            log(:error, "Aborted")

            children_pids = info[:children_pids]
            if children_pids and children_pids.any?
              Log.medium("Killing children: #{ children_pids * ", " }")
              children_pids.each do |pid|
                Log.medium("Killing child #{ pid }")
                begin
                  Process.kill "INT", pid
                rescue Exception
                  Log.medium("Exception killing child #{ pid }: #{$!.message}")
                end
              end
            end

            raise $!
          rescue Exception
            backtrace = $!.backtrace

            # HACK: This fixes an strange behaviour in 1.9.3 where some
            # bactrace strings are coded in ASCII-8BIT
            backtrace.each{|l| l.force_encoding("UTF-8")} if String.instance_methods.include? :force_encoding
            set_info :backtrace, backtrace
            log(:error, "#{$!.class}: #{$!.message}")
            log(:error, "backtrace: #{$!.backtrace.first}")
            raise $!
          end

    set_info :status, :done
    set_info :done, (done_time = Time.now)
    set_info :time_elapsed, done_time - start_time
    Log.medium("Completed task #{task.name || ""} [#{Process.pid}]: #{ path }")

    res
  end

  if no_load
    self
  else
    prepare_result result, @task.result_description, info
  end
end
#running? ⇒ Boolean
87 88 89 90 91 |
# File 'lib/rbbt/workflow/accessor.rb', line 87
#
# Whether the process recorded for this step is still alive.
#
# Returns nil when there is no info file or no pid recorded in it;
# otherwise the result of Misc.pid_exists? for the recorded pid.
def running?
  unless Open.exists? info_file
    return nil
  end

  if info[:pid].nil?
    return nil
  end

  Misc.pid_exists? info[:pid]
end
#save_file(name, content) ⇒ Object
118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 |
# File 'lib/rbbt/workflow/accessor.rb', line 118
#
# Writes +content+ to the auxiliary file +name+ in the step's files
# directory.
#
# The content is turned into text first: Strings are written as they
# are, Arrays are joined with newlines, TSVs use to_s, Hashes become one
# tab-separated line per entry, and anything else uses to_s.
def save_file(name, content)
  text =
    case content
    when String
      content
    when Array
      content * "\n"
    when TSV
      content.to_s
    when Hash
      content.collect{|*p| p * "\t"} * "\n"
    else
      content.to_s
    end

  Open.write(file(name), text)
end
#set_info(key, value) ⇒ Object
40 41 42 43 44 45 46 47 48 49 |
# File 'lib/rbbt/workflow/accessor.rb', line 40
#
# Stores +value+ under +key+ in the step's info file.
#
# Does nothing and returns nil while @exec is truthy. When Annotated is
# defined the value is first passed through Annotated.purge. The info is
# read, updated and written back with INFO_SERIALIAZER inside
# Open.lock(info_file).
#
# Returns whatever Open.lock returns for the block, whose last expression
# is the stored value.
def set_info(key, value)
  return nil if @exec

  value = Annotated.purge(value) if defined? Annotated

  Open.lock(info_file) do
    current = info
    current[key] = value
    Open.write(info_file, INFO_SERIALIAZER.dump(current))
    value
  end
end
#started? ⇒ Boolean
79 80 81 |
# File 'lib/rbbt/workflow/accessor.rb', line 79
#
# A step counts as started as soon as its info file exists.
def started?
  Open.exists?(info_file)
end
#status ⇒ Object
51 52 53 |
# File 'lib/rbbt/workflow/accessor.rb', line 51
#
# The status recorded in the step info (nil when none is recorded).
def status
  current_info = info
  current_info[:status]
end
#status=(status) ⇒ Object
55 56 57 |
# File 'lib/rbbt/workflow/accessor.rb', line 55
#
# Records +status+ under :status in the step info through set_info.
def status=(status)
  set_info :status, status
end
#step(name) ⇒ Object
277 278 279 |
# File 'lib/rbbt/workflow/step.rb', line 277
#
# Finds, among all recursive dependencies, the first one whose task name
# matches +name+ (both compared as symbols). Returns nil when there is
# none.
def step(name)
  matching = rec_dependencies.select do |dependency|
    dependency.task.name.to_sym == name.to_sym
  end
  matching.first
end
#task_name ⇒ Object
16 17 18 |
# File 'lib/rbbt/workflow/accessor.rb', line 16
#
# The name of the step's task, memoized in @task_name after the first
# lookup that yields a truthy value.
def task_name
  @task_name = task.name unless @task_name
  @task_name
end