Class: OpenTox::Task
Overview
Class for handling asynchronous tasks
Instance Attribute Summary collapse
-
#due_to_time ⇒ Object
Returns the value of attribute due_to_time.
-
#http_code ⇒ Object
Returns the value of attribute http_code.
Attributes included from OpenTox
Class Method Summary collapse
-
.all(uri = CONFIG[:services]["opentox-task"]) ⇒ text/uri-list
Get a list of all tasks.
-
.create(title = nil, creator = nil, max_duration = DEFAULT_TASK_MAX_DURATION, description = nil) ⇒ OPenTox::Task
Create a new task for the code in the block.
-
.exist?(uri) ⇒ OpenTox::Task
Find a task for querying, status changes.
-
.find(uri) ⇒ OpenTox::Task
Find a task for querying, status changes.
- .from_rdfxml(rdfxml) ⇒ Object
- .from_yaml(yaml) ⇒ Object
Instance Method Summary collapse
-
#add_error_report(error_report) ⇒ Object
not stored just for to_rdf.
- #cancel ⇒ Object
- #completed(uri) ⇒ Object
- #completed? ⇒ Boolean
- #description ⇒ Object
- #error(error_report) ⇒ Object
- #error? ⇒ Boolean
- #errorReport ⇒ Object
-
#initialize(uri = nil) ⇒ Task
constructor
A new instance of Task.
- #load_metadata ⇒ Object
- #pid=(pid) ⇒ Object
-
#progress(pct) ⇒ Object
updates percentageCompleted value (can only be increased) task has to be running.
- #queued? ⇒ Boolean
- #result_uri ⇒ Object
- #running? ⇒ Boolean
- #status ⇒ Object
- #to_rdfxml ⇒ Object
-
#wait_for_completion(waiting_task = nil) ⇒ Object
waits for a task, unless time exceeds or state is no longer running.
- #waiting_for(task_uri) ⇒ Object
Methods included from OpenTox
#add_metadata, #delete, sign_in, text_to_html
Constructor Details
#initialize(uri = nil) ⇒ Task
Returns a new instance of Task.
9 10 11 12 13 14 15 16 17 18 19 20 21 |
# File 'lib/task.rb', line 9 def initialize(uri=nil) super uri @http_code = 202 @metadata = { DC.title => "", DC.date => "", OT.hasStatus => "Running", OT.percentageCompleted => 0.0, OT.resultURI => "", DC.creator => "", # not mandatory according to API DC.description => "", # not mandatory according to API } end |
Instance Attribute Details
#due_to_time ⇒ Object
Returns the value of attribute due_to_time.
7 8 9 |
# File 'lib/task.rb', line 7 def due_to_time @due_to_time end |
#http_code ⇒ Object
Returns the value of attribute http_code.
7 8 9 |
# File 'lib/task.rb', line 7 def http_code @http_code end |
Class Method Details
.all(uri = CONFIG[:services]["opentox-task"]) ⇒ text/uri-list
Get a list of all tasks
103 104 105 |
# File 'lib/task.rb', line 103 def self.all(uri=CONFIG[:services]["opentox-task"]) OpenTox.all uri end |
.create(title = nil, creator = nil, max_duration = DEFAULT_TASK_MAX_DURATION, description = nil) ⇒ OPenTox::Task
Create a new task for the code in the block. Catches halts and exceptions and sets task state to error if necessary. The block has to return the URI of the created resource.
34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 |
# File 'lib/task.rb', line 34 def self.create( title=nil, creator=nil, max_duration=DEFAULT_TASK_MAX_DURATION, description=nil ) params = {:title=>title, :creator=>creator, :max_duration=>max_duration, :description=>description } task_uri = RestClientWrapper.post(CONFIG[:services]["opentox-task"], params, {}, nil, false).to_s task = Task.new(task_uri.chomp) # measure current memory consumption =begin memory = `free -m|sed -n '2p'`.split free_memory = memory[3].to_i + memory[6].to_i # include cache if free_memory < 20 # require at least 200 M free memory LOGGER.warn "Cannot start task - not enough memory left (#{free_memory} M free)" task.cancel return task #raise "Insufficient memory to start a new task" end cpu_load = `cat /proc/loadavg`.split(/\s+/)[0..2].collect{|c| c.to_f} nr_cpu_cores = `cat /proc/cpuinfo |grep "cpu cores"|cut -d ":" -f2|tr -d " "`.split("\n").collect{|c| c.to_i}.inject{|sum,n| sum+n} nr_cpu_cores = 1 if !nr_cpu_cores #if cpu_load[0] > nr_cpu_cores and cpu_load[0] > cpu_load[1] and cpu_load[1] > cpu_load[2] # average CPU load of the last minute is high and CPU load is increasing # LOGGER.warn "Cannot start task - CPU load too high (#{cpu_load.join(", ")})" # task.cancel # return task # #raise "Server too busy to start a new task" #end =end task_pid = Spork.spork(:logger => LOGGER) do LOGGER.debug "Task #{task.uri} started #{Time.now}" begin result = yield task LOGGER.debug "Task #{task.uri} done #{Time.now} -> "+result.to_s task.completed(result) rescue => error LOGGER.error "task failed: "+error.class.to_s+": "+error. LOGGER.error ":\n"+error.backtrace.join("\n") task.error(OpenTox::ErrorReport.create(error, creator)) end end task.pid = task_pid LOGGER.debug "Started task: "+task.uri.to_s task end |
.exist?(uri) ⇒ OpenTox::Task
Find a task for querying, status changes
93 94 95 96 97 98 |
# File 'lib/task.rb', line 93 def self.exist?(uri) begin return find(uri) rescue end end |
.find(uri) ⇒ OpenTox::Task
Find a task for querying, status changes
82 83 84 85 86 87 88 |
# File 'lib/task.rb', line 82 def self.find(uri) return nil unless uri task = Task.new(uri) task. raise "could not load task metadata" if task.==nil or task..size==0 task end |
.from_rdfxml(rdfxml) ⇒ Object
111 112 113 114 115 116 |
# File 'lib/task.rb', line 111 def self.from_rdfxml(rdfxml) owl = OpenTox::Parser::Owl.from_rdf(rdfxml, OT.Task) task = Task.new(owl.uri) task.(owl.) task end |
.from_yaml(yaml) ⇒ Object
107 108 109 |
# File 'lib/task.rb', line 107 def self.from_yaml(yaml) @metadata = YAML.load(yaml) end |
Instance Method Details
#add_error_report(error_report) ⇒ Object
not stored just for to_rdf
159 160 161 162 |
# File 'lib/task.rb', line 159 def add_error_report( error_report ) raise "not an error report: "+error_report.class.to_s unless error_report.is_a?(ErrorReport) @error_report = error_report end |
#cancel ⇒ Object
142 143 144 145 |
# File 'lib/task.rb', line 142 def cancel RestClientWrapper.put(File.join(@uri,'Cancelled'),{:cannot_be => "empty"}) end |
#completed(uri) ⇒ Object
147 148 149 150 |
# File 'lib/task.rb', line 147 def completed(uri) RestClientWrapper.put(File.join(@uri,'Completed'),{:resultURI => uri}) end |
#completed? ⇒ Boolean
176 177 178 |
# File 'lib/task.rb', line 176 def completed? @metadata[OT.hasStatus] == 'Completed' end |
#description ⇒ Object
134 135 136 |
# File 'lib/task.rb', line 134 def description @metadata[DC.description] end |
#error(error_report) ⇒ Object
152 153 154 155 156 |
# File 'lib/task.rb', line 152 def error(error_report) raise "no error report" unless error_report.is_a?(OpenTox::ErrorReport) RestClientWrapper.put(File.join(@uri,'Error'),{:errorReport => error_report.to_yaml}) end |
#error? ⇒ Boolean
180 181 182 |
# File 'lib/task.rb', line 180 def error? @metadata[OT.hasStatus] == 'Error' end |
#errorReport ⇒ Object
138 139 140 |
# File 'lib/task.rb', line 138 def errorReport @metadata[OT.errorReport] end |
#load_metadata ⇒ Object
184 185 186 187 188 189 190 191 192 193 194 |
# File 'lib/task.rb', line 184 def if (CONFIG[:json_hosts].include?(URI.parse(@uri).host)) result = RestClientWrapper.get(@uri, {:accept => 'application/x-yaml'}, nil, false) @metadata = YAML.load result.to_s @http_code = result.code else @metadata = Parser::Owl::Generic.new(@uri). @http_code = RestClientWrapper.get(uri, {:accept => 'application/rdf+xml'}, nil, false).code end raise "could not load task metadata for task "+@uri.to_s if @metadata==nil || @metadata.size==0 end |
#pid=(pid) ⇒ Object
164 165 166 |
# File 'lib/task.rb', line 164 def pid=(pid) RestClientWrapper.put(File.join(@uri,'pid'), {:pid => pid}) end |
#progress(pct) ⇒ Object
updates percentageCompleted value (can only be increased) task has to be running
274 275 276 277 278 279 280 281 |
# File 'lib/task.rb', line 274 def progress(pct) #puts "task := "+pct.to_s raise "no numeric >= 0 and <= 100 : '"+pct.to_s+"'" unless pct.is_a?(Numeric) and pct>=0 and pct<=100 if (pct > @metadata[OT.percentageCompleted] + 0.0001) RestClientWrapper.put(File.join(@uri,'Running'),{:percentageCompleted => pct}) end end |
#queued? ⇒ Boolean
172 173 174 |
# File 'lib/task.rb', line 172 def queued? @metadata[OT.hasStatus] == 'Queued' end |
#result_uri ⇒ Object
130 131 132 |
# File 'lib/task.rb', line 130 def result_uri @metadata[OT.resultURI] end |
#running? ⇒ Boolean
168 169 170 |
# File 'lib/task.rb', line 168 def running? @metadata[OT.hasStatus] == 'Running' end |
#status ⇒ Object
126 127 128 |
# File 'lib/task.rb', line 126 def status @metadata[OT.hasStatus] end |
#to_rdfxml ⇒ Object
118 119 120 121 122 123 124 |
# File 'lib/task.rb', line 118 def to_rdfxml s = Serializer::Owl.new @metadata[OT.errorReport] = @uri+"/ErrorReport/tmpId" if @error_report s.add_task(@uri,@metadata) s.add_resource(@uri+"/ErrorReport/tmpId", OT.errorReport, @error_report.rdf_content) if @error_report s.to_rdfxml end |
#wait_for_completion(waiting_task = nil) ⇒ Object
waits for a task, unless time exceeds or state is no longer running
245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 |
# File 'lib/task.rb', line 245 def wait_for_completion( waiting_task=nil) waiting_task.waiting_for(self.uri) if waiting_task due_to_time = Time.new + DEFAULT_TASK_MAX_DURATION start_time = Time.new dur = 0 LOGGER.debug "start waiting for task "+@uri.to_s+" at: "+Time.new.to_s+", waiting at least until "+due_to_time.to_s # for extremely fast tasks check_state while self.running? or self.queued? sleep dur dur = [[(Time.new - start_time)/20.0,0.3].max,300.0].min #LOGGER.debug "task-object-id: #{self.object_id} - wait: #{"%.2f"%(Time.new - start_time)} - dur: #{"%.2f"%dur}" # if another (sub)task is waiting for self, set progress accordingly waiting_task.progress(@metadata[OT.percentageCompleted].to_f) if waiting_task check_state if (Time.new > due_to_time) raise "max wait time exceeded ("+DEFAULT_TASK_MAX_DURATION.to_s+"sec), task: '"+@uri.to_s+"'" end end waiting_task.waiting_for(nil) if waiting_task LOGGER.debug "Task '"+@metadata[OT.hasStatus].to_s+"': "+@uri.to_s+", Result: "+@metadata[OT.resultURI].to_s end |
#waiting_for(task_uri) ⇒ Object
283 284 285 |
# File 'lib/task.rb', line 283 def waiting_for(task_uri) RestClientWrapper.put(File.join(@uri,'Running'),{:waiting_for => task_uri}) end |