Class: OpenTox::Task

Inherits:
Object
  • Object
show all
Includes:
OpenTox
Defined in:
lib/task.rb

Overview

Class for handling asynchronous tasks

Instance Attribute Summary collapse

Attributes included from OpenTox

#metadata, #uri

Class Method Summary collapse

Instance Method Summary collapse

Methods included from OpenTox

#add_metadata, #delete, sign_in, text_to_html

Constructor Details

#initialize(uri = nil) ⇒ Task

Returns a new instance of Task.



9
10
11
12
13
14
15
16
17
18
19
20
21
# File 'lib/task.rb', line 9

# Builds a task object for +uri+ and seeds it with placeholder metadata.
# Nothing is fetched here; the values below are local defaults only.
#
# @param uri [String, nil] task URI, passed on to the next initializer in the ancestor chain
def initialize(uri=nil)
  super(uri)
  @http_code = 202
  defaults = {}
  defaults[DC.title] = ""
  defaults[DC.date] = ""
  defaults[OT.hasStatus] = "Running"
  defaults[OT.percentageCompleted] = 0.0
  defaults[OT.resultURI] = ""
  defaults[DC.creator] = ""      # not mandatory according to API
  defaults[DC.description] = ""  # not mandatory according to API
  @metadata = defaults
end

Instance Attribute Details

#due_to_timeObject

Returns the value of attribute due_to_time.



7
8
9
# File 'lib/task.rb', line 7

# Reader for the +due_to_time+ attribute.
#
# @return [Object, nil] the value held in @due_to_time (nil if it was never assigned)
def due_to_time
  @due_to_time
end

#http_codeObject

Returns the value of attribute http_code.



7
8
9
# File 'lib/task.rb', line 7

# Reader for the +http_code+ attribute.
#
# @return [Object, nil] the value held in @http_code (nil if it was never assigned)
def http_code
  @http_code
end

Class Method Details

.all(uri = CONFIG[:services]["opentox-task"]) ⇒ text/uri-list

Get a list of all tasks

Parameters:

  • uri (optional, String) (defaults to: CONFIG[:services]["opentox-task"])

    URI of task service

Returns:

  • (text/uri-list)

    Task URIs



103
104
105
# File 'lib/task.rb', line 103

# Lists the tasks of a task service by delegating to OpenTox.all.
#
# @param uri [String] URI of the task service; defaults to the configured "opentox-task" service
# @return [Object] whatever OpenTox.all returns for +uri+ (documented as a text/uri-list)
def self.all(uri=CONFIG[:services]["opentox-task"])
  service_uri = uri
  OpenTox.all(service_uri)
end

.create(title = nil, creator = nil, max_duration = DEFAULT_TASK_MAX_DURATION, description = nil) ⇒ OpenTox::Task

Create a new task for the code in the block. Catches halts and exceptions and sets task state to error if necessary. The block has to return the URI of the created resource.

Examples:

task = OpenTox::Task.create do
  # this code will be executed as a task
  model = OpenTox::Algorithm.run(params) # this can be time consuming
  model.uri # Important: return URI of the created resource
end
task.status # returns "Running", because tasks are forked

Parameters:

  • title (String) (defaults to: nil)

    Task title

  • creator (String) (defaults to: nil)

    Task creator

Returns:

  • (OpenTox::Task)

    Task



34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
# File 'lib/task.rb', line 34

# Registers a new task with the configured "opentox-task" service and runs the
# given block in a Spork child (the class documentation describes tasks as forked).
#
# The block receives the task object and its return value is passed to
# Task#completed. If the block raises a StandardError, the error is logged and
# reported through Task#error.
#
# @param title [String, nil] task title, sent to the task service
# @param creator [String, nil] task creator, sent to the task service and used for the error report
# @param max_duration [Object] sent to the task service as :max_duration
# @param description [String, nil] task description, sent to the task service
# @yield [task] the work to perform
# @return [Task] the task object, returned right after the child has been started
def self.create( title=nil, creator=nil, max_duration=DEFAULT_TASK_MAX_DURATION, description=nil )
  
  params = {:title=>title, :creator=>creator, :max_duration=>max_duration, :description=>description }
  # the POST response (converted to a string and chomped) is used as the new task's URI
  task_uri = RestClientWrapper.post(CONFIG[:services]["opentox-task"], params, {}, nil, false).to_s
  task = Task.new(task_uri.chomp)

  # measure current memory consumption
=begin
  memory = `free -m|sed -n '2p'`.split
  free_memory = memory[3].to_i + memory[6].to_i # include cache
  if free_memory < 20 # require at least 200 M free memory
    LOGGER.warn "Cannot start task  - not enough memory left (#{free_memory} M free)"
    task.cancel
    return task
    #raise "Insufficient memory to start a new task"
  end

  cpu_load = `cat /proc/loadavg`.split(/\s+/)[0..2].collect{|c| c.to_f}
  nr_cpu_cores = `cat /proc/cpuinfo |grep "cpu cores"|cut -d ":" -f2|tr -d " "`.split("\n").collect{|c| c.to_i}.inject{|sum,n| sum+n}
  nr_cpu_cores = 1 if !nr_cpu_cores
  #if cpu_load[0] > nr_cpu_cores and cpu_load[0] > cpu_load[1] and cpu_load[1] > cpu_load[2] # average CPU load of the last minute is high and CPU load is increasing
  #  LOGGER.warn "Cannot start task  - CPU load too high (#{cpu_load.join(", ")})"
  #  task.cancel
  #  return task
  #  #raise "Server too busy to start a new task"
  #end
=end

  # the value returned by Spork.spork is reported to the task service via Task#pid=
  task_pid = Spork.spork(:logger => LOGGER) do
    LOGGER.debug "Task #{task.uri} started #{Time.now}"
    begin
      # the block's return value is handed to Task#completed as the result
      result = yield task
      LOGGER.debug "Task #{task.uri} done #{Time.now} -> "+result.to_s
      task.completed(result)
    rescue => error
      # a bare rescue only covers StandardError and its subclasses
      LOGGER.error "task failed: "+error.class.to_s+": "+error.message
      LOGGER.error ":\n"+error.backtrace.join("\n")
      task.error(OpenTox::ErrorReport.create(error, creator))
    end
  end  
  task.pid = task_pid
  LOGGER.debug "Started task: "+task.uri.to_s
  task
end

.exist?(uri) ⇒ OpenTox::Task

Check whether a task exists: returns the task if it can be found and its metadata loaded, nil otherwise.

Parameters:

Returns:



93
94
95
96
97
98
# File 'lib/task.rb', line 93

# Looks a task up without raising: any StandardError raised by +find+ is
# swallowed and turned into nil.
#
# @param uri [String, nil] task URI
# @return [Object, nil] the result of find(uri), or nil if the lookup raised
def self.exist?(uri)
  find(uri)
rescue
  nil
end

.find(uri) ⇒ OpenTox::Task

Find a task for querying, status changes

Parameters:

Returns:



82
83
84
85
86
87
88
# File 'lib/task.rb', line 82

# Builds a Task for +uri+ and loads its metadata.
#
# @param uri [String, nil] task URI
# @return [Task, nil] the loaded task, or nil when +uri+ is nil/false
# @raise [RuntimeError] if the task ends up with nil or empty metadata
def self.find(uri)
  return nil unless uri
  task = Task.new(uri)
  # fetch the current state; without this the task only holds constructor defaults
  task.load_metadata
  raise "could not load task metadata" if task.metadata.nil? || task.metadata.size == 0
  task
end

.from_rdfxml(rdfxml) ⇒ Object



111
112
113
114
115
116
# File 'lib/task.rb', line 111

# Builds a Task from an RDF/XML document.
#
# @param rdfxml [String] RDF/XML representation of a task
# @return [Task] task carrying the URI and metadata parsed from +rdfxml+
def self.from_rdfxml(rdfxml)
  owl = OpenTox::Parser::Owl.from_rdf(rdfxml, OT.Task)
  task = Task.new(owl.uri)
  # copy the parsed metadata onto the new task
  task.add_metadata(owl.metadata)
  task
end

.from_yaml(yaml) ⇒ Object



107
108
109
# File 'lib/task.rb', line 107

# Parses +yaml+ and keeps the result in @metadata of the receiver. Because this
# is a class-level method, that is an instance variable of the class object,
# not of a Task instance.
#
# NOTE(review): YAML.load is used on the given text; if +yaml+ can come from
# an untrusted source, consider YAML.safe_load.
#
# @param yaml [String] YAML text
# @return [Object] the parsed data (no Task object is built)
def self.from_yaml(yaml)
  parsed = YAML.load(yaml)
  @metadata = parsed
end

Instance Method Details

#add_error_report(error_report) ⇒ Object

Attaches an error report to this task object. The report is not stored; it is only used by to_rdfxml.



159
160
161
162
# File 'lib/task.rb', line 159

# Remembers +error_report+ on this object so that to_rdfxml can include it.
#
# @param error_report [ErrorReport] report to attach
# @return [ErrorReport] the attached report
# @raise [RuntimeError] if +error_report+ is not an ErrorReport
def add_error_report( error_report )
  unless error_report.is_a?(ErrorReport)
    raise "not an error report: #{error_report.class}"
  end
  @error_report = error_report
end

#cancelObject



142
143
144
145
# File 'lib/task.rb', line 142

# Sends a PUT to the task's 'Cancelled' resource and then reloads the local
# metadata, so that status and the predicate methods reflect the new state.
def cancel
  # placeholder payload, labelled :cannot_be => "empty" in the original code
  # (presumably the service rejects an empty body - TODO confirm)
  RestClientWrapper.put(File.join(@uri,'Cancelled'),{:cannot_be => "empty"})
  load_metadata
end

#completed(uri) ⇒ Object



147
148
149
150
# File 'lib/task.rb', line 147

# Sends a PUT with the result URI to the task's 'Completed' resource and then
# reloads the local metadata, so that this object reflects the new state.
#
# @param uri [String] URI of the resource produced by the task
def completed(uri)
  RestClientWrapper.put(File.join(@uri,'Completed'),{:resultURI => uri})
  load_metadata
end

#completed?Boolean

Returns:

  • (Boolean)


176
177
178
# File 'lib/task.rb', line 176

# @return [Boolean] true when the locally held status is 'Completed'
def completed?
  current_status = @metadata[OT.hasStatus]
  current_status == 'Completed'
end

#descriptionObject



134
135
136
# File 'lib/task.rb', line 134

# @return [Object, nil] the DC.description entry of the locally held metadata
def description
  description_key = DC.description
  @metadata[description_key]
end

#error(error_report) ⇒ Object



152
153
154
155
156
# File 'lib/task.rb', line 152

# Sends the YAML form of +error_report+ in a PUT to the task's 'Error' resource
# and then reloads the local metadata, so that this object reflects the new state.
#
# @param error_report [OpenTox::ErrorReport] report describing the failure
# @raise [RuntimeError] if +error_report+ is not an OpenTox::ErrorReport
def error(error_report)
  raise "no error report" unless error_report.is_a?(OpenTox::ErrorReport)
  RestClientWrapper.put(File.join(@uri,'Error'),{:errorReport => error_report.to_yaml})
  load_metadata
end

#error?Boolean

Returns:

  • (Boolean)


180
181
182
# File 'lib/task.rb', line 180

# @return [Boolean] true when the locally held status is 'Error'
def error?
  current_status = @metadata[OT.hasStatus]
  current_status == 'Error'
end

#errorReportObject



138
139
140
# File 'lib/task.rb', line 138

# @return [Object, nil] the OT.errorReport entry of the locally held metadata
def errorReport
  report_key = OT.errorReport
  @metadata[report_key]
end

#load_metadataObject



184
185
186
187
188
189
190
191
192
193
194
# File 'lib/task.rb', line 184

# Reloads @metadata and @http_code from the task URI.
#
# Hosts listed in CONFIG[:json_hosts] are queried for YAML; for all other
# hosts the metadata comes from Parser::Owl::Generic and the HTTP code from a
# separate RDF/XML request.
#
# @raise [RuntimeError] if no metadata (nil, false or empty) could be loaded
def load_metadata
  if CONFIG[:json_hosts].include?(URI.parse(@uri).host)
    result = RestClientWrapper.get(@uri, {:accept => 'application/x-yaml'}, nil, false)
    # NOTE(review): YAML.load is applied to a remote response body; consider
    # YAML.safe_load if the host is not fully trusted.
    @metadata = YAML.load result.to_s
    @http_code = result.code
  else
    @metadata = Parser::Owl::Generic.new(@uri).load_metadata
    @http_code = RestClientWrapper.get(uri, {:accept => 'application/rdf+xml'}, nil, false).code
  end
  # an empty YAML body parses to false or nil depending on the YAML library
  # version, so test truthiness before calling size
  raise "could not load task metadata for task "+@uri.to_s if !@metadata || @metadata.size==0
end

#pid=(pid) ⇒ Object



164
165
166
# File 'lib/task.rb', line 164

# Sends +pid+ in a PUT to the task's 'pid' resource.
#
# @param pid [Object] process id to report
def pid=(pid)
  pid_resource = File.join(@uri,'pid')
  RestClientWrapper.put(pid_resource, {:pid => pid})
end

#progress(pct) ⇒ Object

Updates the percentageCompleted value (it can only be increased). The task has to be running.

Parameters:

  • pct (Numeric)

    value between 0 and 100



274
275
276
277
278
279
280
281
# File 'lib/task.rb', line 274

# Reports a new percentageCompleted value via a PUT to the task's 'Running'
# resource, then reloads the local metadata so the next comparison uses the
# updated value. Values that do not exceed the locally known percentage (by
# more than 0.0001) are ignored.
#
# @param pct [Numeric] value between 0 and 100
# @raise [RuntimeError] if +pct+ is not a Numeric within 0..100
def progress(pct)
  unless pct.is_a?(Numeric) && pct >= 0 && pct <= 100
    raise "no numeric >= 0 and <= 100 : '"+pct.to_s+"'"
  end
  return unless pct > @metadata[OT.percentageCompleted] + 0.0001
  RestClientWrapper.put(File.join(@uri,'Running'),{:percentageCompleted => pct})
  load_metadata
end

#queued?Boolean

Returns:

  • (Boolean)


172
173
174
# File 'lib/task.rb', line 172

# @return [Boolean] true when the locally held status is 'Queued'
def queued?
  current_status = @metadata[OT.hasStatus]
  current_status == 'Queued'
end

#result_uriObject



130
131
132
# File 'lib/task.rb', line 130

# @return [Object, nil] the OT.resultURI entry of the locally held metadata
def result_uri
  result_key = OT.resultURI
  @metadata[result_key]
end

#running?Boolean

Returns:

  • (Boolean)


168
169
170
# File 'lib/task.rb', line 168

# @return [Boolean] true when the locally held status is 'Running'
def running?
  current_status = @metadata[OT.hasStatus]
  current_status == 'Running'
end

#statusObject



126
127
128
# File 'lib/task.rb', line 126

# @return [Object, nil] the OT.hasStatus entry of the locally held metadata
def status
  status_key = OT.hasStatus
  @metadata[status_key]
end

#to_rdfxmlObject



118
119
120
121
122
123
124
# File 'lib/task.rb', line 118

# Serializes this task to RDF/XML through Serializer::Owl.
#
# When an error report has been attached, its temporary URI is written into
# @metadata under OT.errorReport and the report content is added as a resource.
#
# @return [Object] the result of Serializer::Owl#to_rdfxml
def to_rdfxml
  serializer = Serializer::Owl.new
  report_uri = @error_report ? @uri+"/ErrorReport/tmpId" : nil
  @metadata[OT.errorReport] = report_uri if report_uri
  serializer.add_task(@uri, @metadata)
  if report_uri
    serializer.add_resource(report_uri, OT.errorReport, @error_report.rdf_content)
  end
  serializer.to_rdfxml
end

#wait_for_completion(waiting_task = nil) ⇒ Object

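Waits for the task to finish: polls until the task is no longer running or queued, and raises an error if the maximum wait time (DEFAULT_TASK_MAX_DURATION) is exceeded.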

Parameters:

  • waiting_task (optional, OpenTox::Task) (defaults to: nil)

    (can be a OpenTox::Subtask as well), progress is updated accordingly

  • dur (optional, Numeric)

    seconds pausing before checking again for completion (computed inside the method; not a parameter of the current signature)



245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
# File 'lib/task.rb', line 245

# Polls the task until it is neither running nor queued.
#
# The metadata is reloaded before the first check and on every loop pass, so
# the status predicates see the current state. The pause between polls is
# 1/20 of the elapsed waiting time, bounded to 0.3..300 seconds (the very
# first pause is 0).
#
# @param waiting_task [#waiting_for, #progress, nil] optional task that waits
#   for this one; it is told which URI it waits for, receives this task's
#   percentageCompleted on every poll, and is reset with waiting_for(nil) at the end
# @raise [RuntimeError] if DEFAULT_TASK_MAX_DURATION seconds pass while still waiting
def wait_for_completion( waiting_task=nil)
  waiting_task.waiting_for(uri) if waiting_task
  due_to_time = Time.new + DEFAULT_TASK_MAX_DURATION
  start_time = Time.new
  dur = 0
  LOGGER.debug "start waiting for task "+@uri.to_s+" at: "+Time.new.to_s+", waiting at least until "+due_to_time.to_s

  load_metadata # for extremely fast tasks
  check_state
  while running? || queued?
    sleep dur
    dur = [[(Time.new - start_time)/20.0,0.3].max,300.0].min
    #LOGGER.debug "task-object-id: #{self.object_id} - wait: #{"%.2f"%(Time.new - start_time)} - dur: #{"%.2f"%dur}"
    # refresh the local state; without this the loop could only end by timeout
    load_metadata
    # if another (sub)task is waiting for self, set progress accordingly
    waiting_task.progress(@metadata[OT.percentageCompleted].to_f) if waiting_task
    check_state
    if Time.new > due_to_time
      raise "max wait time exceeded ("+DEFAULT_TASK_MAX_DURATION.to_s+"sec), task: '"+@uri.to_s+"'"
    end
  end
  waiting_task.waiting_for(nil) if waiting_task
  LOGGER.debug "Task '"+@metadata[OT.hasStatus].to_s+"': "+@uri.to_s+", Result: "+@metadata[OT.resultURI].to_s
end

#waiting_for(task_uri) ⇒ Object



283
284
285
# File 'lib/task.rb', line 283

# Sends +task_uri+ as :waiting_for in a PUT to the task's 'Running' resource.
#
# @param task_uri [String, nil] URI of the task being waited for (nil is passed on as-is)
def waiting_for(task_uri)
  running_resource = File.join(@uri,'Running')
  RestClientWrapper.put(running_resource, {:waiting_for => task_uri})
end