Class: SimpleWS::Jobs::Scheduler::Job
- Inherits:
-
Object
- Object
- SimpleWS::Jobs::Scheduler::Job
- Defined in:
- lib/simplews/jobs.rb,
lib/simplews/rake.rb
Overview
{{{ Job
Constant Summary collapse
- @@config =
{}
Class Method Summary collapse
- .configure(name, value) ⇒ Object
- .job_info(name) ⇒ Object
- .path(file, name) ⇒ Object
- .results(name) ⇒ Object
- .save(name, state) ⇒ Object
- .taken(name = "") ⇒ Object
- .workdir=(workdir) ⇒ Object
Instance Method Summary collapse
- #abort ⇒ Object
-
#add_description(reg_exp, step, message) ⇒ Object
Add step information to rule tasks, as the ‘desc’ method cannot be used to describe them for the time being.
- #config ⇒ Object
- #error(message = nil) ⇒ Object
- #info(info = {}) ⇒ Object
- #job_name ⇒ Object
- #message(message) ⇒ Object
- #path(file) ⇒ Object
-
#rake(rakefile = "Rakefile", target = nil) ⇒ Object
Instruct rake to load the rakefile, named Rakefile by default, and use it to produce the target, which defaults to the first result file of the web service task.
- #result_filenames ⇒ Object
- #results(results) ⇒ Object
- #run(task, name, results, *args) ⇒ Object
- #save ⇒ Object
- #step(status, message = nil) ⇒ Object
- #workdir ⇒ Object
- #write(file, contents) ⇒ Object
Class Method Details
.configure(name, value) ⇒ Object
253 254 255 |
# File 'lib/simplews/jobs.rb', line 253
# Records a configuration entry shared by every job.
#
# name  - key under which the value is kept in the class-wide @@config hash.
# value - object to associate with that key.
#
# Returns the stored value.
def self.configure(name, value)
  @@config.store(name, value)
end
.job_info(name) ⇒ Object
219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 |
# File 'lib/simplews/jobs.rb', line 219
# Loads the saved state of the job called +name+ from its '.marshal' file
# in the save directory.
#
# Reading is attempted up to three times (one try plus two retries),
# sleeping SimpleWS::Jobs::SLEEP_TIMES[:job_info] between attempts. A state
# that is not a Hash carrying an :info entry counts as a failed read.
#
# name - job name; the state file is "<name>.marshal" inside @@savedir.
#
# Returns the state Hash.
# Raises JobNotFound when no valid state could be read.
# Raises Aborted when the state is flagged :queued but no entry of @@queue
# carries that name; the state file is deleted before raising.
def self.job_info(name)
  state_file = File.join(@@savedir, name + '.marshal')
  info = nil
  retries = 2

  begin
    # Block form closes the handle even when Marshal.load raises.
    # NOTE(review): Marshal.load is only safe on files written by Job.save;
    # never point the save directory at untrusted data.
    info = File.open(state_file) { |fin| Marshal.load(fin) }
    raise TypeError, "Invalid job state in #{ state_file }" unless info.is_a?(Hash) && info[:info]
  rescue StandardError
    # StandardError (not Exception) so that signals and SystemExit are not
    # swallowed and retried.
    if retries > 0
      retries -= 1
      sleep SimpleWS::Jobs::SLEEP_TIMES[:job_info]
      retry
    end
    info = nil
  end

  raise JobNotFound, "Job with name '#{ name }' was not found" if info.nil?

  if info[:queued] && @@queue.none? { |queued| queued[:name] == name }
    FileUtils.rm(state_file)
    raise Aborted, "Job #{ name } has been removed from the queue"
  end

  info
end
.path(file, name) ⇒ Object
205 206 207 208 209 210 211 |
# File 'lib/simplews/jobs.rb', line 205
# Resolves +file+ to the path used by the job called +name+.
#
# Every literal "{JOB}" in +file+ is replaced by +name+. When +file+ has a
# line starting with "/" or contains the work directory anywhere, the
# substituted string is returned as is; otherwise it is joined onto the
# work directory.
#
# file - file name or path, possibly containing "{JOB}" placeholders.
# name - job name substituted for each placeholder.
#
# Returns the resulting path String.
def self.path(file, name)
  expanded = file.gsub(/\{JOB\}/, name)

  # The work directory is quoted so that regexp metacharacters in it
  # (for instance ".") are matched literally.
  if file =~ /^\/|#{Regexp.quote(@@workdir)}/
    expanded
  else
    File.join(@@workdir, expanded)
  end
end
.results(name) ⇒ Object
245 246 247 248 249 250 |
# File 'lib/simplews/jobs.rb', line 245
# Lists the result files of the job called +name+, each paired with a code
# obtained from Scheduler.random_name("res-").
#
# name - job name, looked up through job_info (which may raise).
#
# Returns an Array of [code, file] pairs, one per entry of the saved
# :results list.
def self.results(name)
  files = job_info(name)[:results]
  files.map { |file| [Scheduler.random_name("res-"), file] }
end
.save(name, state) ⇒ Object
213 214 215 216 217 |
# File 'lib/simplews/jobs.rb', line 213
# Persists +state+ for the job called +name+ as a Marshal dump in
# "<name>.marshal" inside the save directory, overwriting any previous dump.
#
# name  - job name used to build the file name.
# state - object to serialize (the job state Hash when called from #save).
#
# Returns nil.
def self.save(name, state)
  # Block form guarantees the handle is closed even if the write fails.
  File.open(File.join(@@savedir, name + '.marshal'), 'w') do |fout|
    fout.write Marshal::dump(state)
  end
  nil
end
.taken(name = "") ⇒ Object
201 202 203 204 |
# File 'lib/simplews/jobs.rb', line 201
# Lists the job names already present in the save directory that are either
# exactly +name+ or +name+ followed by "-<digits>".
#
# name - base job name (default: "").
#
# Returns an Array of matching job names, without the ".marshal" suffix.
def self.taken(name = "")
  # The dot is escaped and the suffix anchored so that only real
  # "<name>[-N].marshal" files are reported.
  saved_name = /\/(#{ Regexp.quote name }(?:-\d+)?)\.marshal\z/

  Dir.glob(@@savedir + "/#{ name }*.marshal").
    map { |saved| saved[saved_name, 1] }.
    compact
end
.workdir=(workdir) ⇒ Object
194 195 196 197 198 199 |
# File 'lib/simplews/jobs.rb', line 194
# Sets the directory jobs work in and derives the save directory
# ('.save' inside it), creating either directory when it does not exist.
#
# workdir - path of the work directory.
def self.workdir=(workdir)
  @@workdir = workdir
  @@savedir = File.join(workdir, '.save')

  FileUtils.mkdir_p(@@workdir) unless File.exist?(@@workdir)
  FileUtils.mkdir_p(@@savedir) unless File.exist?(@@savedir)
end
Instance Method Details
#abort ⇒ Object
312 313 314 315 |
# File 'lib/simplews/jobs.rb', line 312
# Aborts the job by raising SimpleWS::Jobs::Aborted.
#
# The original body had a `save` call after the raise; it could never run
# and has been dropped. Inside #run the exception is rescued and the
# :aborted status is recorded (and saved) there.
#
# Raises SimpleWS::Jobs::Aborted always.
def abort
  raise SimpleWS::Jobs::Aborted
end
#add_description(reg_exp, step, message) ⇒ Object
Add step information to rule tasks, as the ‘desc’ method cannot be used to describe them for the time being.
11 12 13 14 |
# File 'lib/simplews/rake.rb', line 11
# Registers a step description for the tasks whose name matches +reg_exp+.
#
# reg_exp - String or Regexp; compiled with Regexp.new and used as the key.
# step    - step name, placed before the colon in the stored description.
# message - text placed after the colon.
#
# Returns the stored "<step>: <message>" String.
def add_description(reg_exp, step, message)
  @step_descriptions ||= {}
  @step_descriptions[Regexp.new(reg_exp)] = "#{ step }: #{ message }"
end
#config ⇒ Object
261 262 263 |
# File 'lib/simplews/jobs.rb', line 261
# Gives access to the class-wide configuration.
#
# Returns the shared @@config hash itself (not a copy).
def config
  @@config
end
#error(message = nil) ⇒ Object
292 293 294 295 |
# File 'lib/simplews/jobs.rb', line 292
# Marks the job as failed.
#
# message - optional text describing the failure; passed on to #step.
#
# Calls step(:error, message) and then save; returns whatever save returns.
def error(message = nil)
  step(:error, message)
  save
end
#info(info = {}) ⇒ Object
297 298 299 300 301 |
# File 'lib/simplews/jobs.rb', line 297
# Merges +info+ into the job's :info hash in place, saves the state, and
# hands back the merged hash. With no argument it still saves and returns
# the current info.
#
# info - Hash of entries to add or overwrite (default: {}).
#
# Returns the job's :info Hash.
def info(info = {})
  current = @state[:info]
  current.update(info)
  save
  @state[:info]
end
#job_name ⇒ Object
317 318 319 |
# File 'lib/simplews/jobs.rb', line 317
# Name of this job.
#
# Returns the value of @name (assigned in #run).
def job_name
  @name
end
#message(message) ⇒ Object
282 283 284 285 |
# File 'lib/simplews/jobs.rb', line 282
# Appends +message+ to the job's message list and saves the state.
#
# message - object to append to @state[:messages].
#
# Returns whatever save returns.
def message(message)
  @state[:messages] << message
  save
end
#path(file) ⇒ Object
265 266 267 |
# File 'lib/simplews/jobs.rb', line 265
# Resolves +file+ for this job by delegating to Job.path with the job's
# own name.
#
# file - file name or path, as accepted by Job.path.
#
# Returns the value produced by Job.path.
def path(file)
  Job.path(file, @name)
end
#rake(rakefile = "Rakefile", target = nil) ⇒ Object
Instruct rake to load the rakefile, named Rakefile by default, and use it to produce the target, which defaults to the first result file of the web service task. The ‘execute’ method of the Rake::Task class is monkey-patched to log the steps. Since this is executed in a new process, there should be no side effects from the patching.
21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 |
# File 'lib/simplews/rake.rb', line 21
# Loads +rakefile+ and invokes the rake task named +target+, reporting each
# executed task as a job step.
#
# Rake::Task#execute is wrapped so that, before running a task, it calls
# $_current_job.step. The step text is the first registered description
# whose regexp matches the task name, else the task comment, else
# "Invoking <name>". A text of the form "<word>: <rest>" is split into step
# name and message; otherwise the task name is used as the step name.
#
# rakefile - path of the rakefile to load (default: "Rakefile").
# target   - task to invoke (default: the first entry of result_filenames).
#
# Returns the result of Rake::Task[target].invoke.
def rake(rakefile = "Rakefile", target = nil)
  # Patch only once per process: aliasing a second time would point
  # old_execute at the wrapper itself and recurse without end.
  unless Rake::Task.method_defined?(:old_execute) || Rake::Task.private_method_defined?(:old_execute)
    Rake::Task.class_eval <<-'EOC'
      alias_method :old_execute, :execute
      def execute(*args)
        action = name
        message = $_step_descriptions.collect{|rexp, msg|
          if name.match(rexp)
            msg
          else
            nil
          end
        }.compact.first

        message ||= comment
        message ||= "Invoking #{name}"

        if message.match(/^(\w+): (.*)/)
          $_current_job.step($1, $2)
        else
          $_current_job.step(action, message)
        end

        old_execute(*args)
      end
    EOC
  end

  load rakefile

  # One directory per step under the work directory.
  # NOTE(review): @@steps is not set anywhere in this view; presumably the
  # loaded rakefile / pipeline support populates it -- confirm.
  @@steps.each do |step|
    step_dirname = File.join(workdir, step.to_s)
    FileUtils.mkdir_p step_dirname unless File.exist? step_dirname
  end

  # Import the descriptions declared through Rake::Pipeline, when present.
  if defined? Rake::Pipeline
    Rake::Pipeline::step_descriptions.each do |re, description|
      add_description(re, $1, $2) if description.match(/(.*): (.*)/)
    end
  end

  target ||= result_filenames.first

  # Globals read by the patched Rake::Task#execute.
  $_current_job = self
  $_step_descriptions = @step_descriptions || {}

  Rake::Task[target].invoke
end
#result_filenames ⇒ Object
308 309 310 |
# File 'lib/simplews/jobs.rb', line 308
# Result files registered for this job.
#
# Returns the :results entry of the job state.
def result_filenames
  @state[:results]
end
#results(results) ⇒ Object
303 304 305 306 |
# File 'lib/simplews/jobs.rb', line 303
# Replaces the job's list of result files and saves the state.
#
# results - collection of file names; each one is resolved through #path
#           before being stored.
#
# Returns whatever save returns.
def results(results)
  @state[:results] = results.map { |file| path(file) }
  save
end
#run(task, name, results, *args) ⇒ Object
321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 |
# File 'lib/simplews/jobs.rb', line 321
# Initializes the job state, saves it, and runs +task+ in a forked process.
#
# In the child, an INT signal raises SimpleWS::Jobs::Aborted. The child
# finishes as follows:
#   * task completed          -> step :done, exit status 0
#   * Aborted raised          -> step :aborted ("Job Aborted"), exit(-1)
#   * task called exit itself -> same exit status is propagated
#   * any other exception     -> error(message), details printed, exit(-1)
#
# task    - name of the method to call on this object (via send).
# name    - job name; stored in @name and in the state.
# results - result file names; each is resolved through #path.
# args    - arguments passed on to the task method.
#
# Returns the pid of the forked process.
def run(task, name, results, *args)
  @name = name
  @state = {
    :name => @name,
    :status => :prepared,
    :messages => [],
    :info => {},
    :results => results.collect{|file| path(file)},
  }
  save

  @pid = Process.fork do
    begin
      puts "Job #{@name} starting with pid #{Process.pid}"

      trap(:INT) { raise SimpleWS::Jobs::Aborted }
      self.send task, *args
      step :done
      exit(0)
    rescue SimpleWS::Jobs::Aborted
      step(:aborted, "Job Aborted")
      exit(-1)
    rescue SystemExit => finished
      # Covers the exit(0) above as well as exits issued by the task.
      exit(finished.status)
    rescue Exception => failure
      # Deliberately broad: this is the top of the child process and every
      # failure must be recorded in the job state before exiting.
      error(failure.message)
      puts "Error in job #{ @name }"
      puts failure.message
      puts failure.backtrace
      exit(-1)
    end
  end

  @pid
end
#save ⇒ Object
269 270 271 |
# File 'lib/simplews/jobs.rb', line 269
# Persists the current state of this job under its name by delegating to
# Job.save.
#
# Returns the value produced by Job.save.
def save
  Job.save(@name, @state)
end
#step(status, message = nil) ⇒ Object
286 287 288 289 290 |
# File 'lib/simplews/jobs.rb', line 286
# Moves the job to +status+, optionally logging a message, and saves the
# state.
#
# status  - new value for @state[:status].
# message - text appended to @state[:messages]; skipped when nil or ""
#           (default: nil).
#
# Returns whatever save returns.
def step(status, message = nil)
  @state[:status] = status
  @state[:messages] << message if message && message != ""
  save
end
#workdir ⇒ Object
257 258 259 |
# File 'lib/simplews/jobs.rb', line 257
# Directory jobs work in.
#
# Returns the class-wide @@workdir value set through Job.workdir=.
def workdir
  @@workdir
end
#write(file, contents) ⇒ Object
273 274 275 276 277 278 279 280 |
# File 'lib/simplews/jobs.rb', line 273
# Writes +contents+ to +file+, resolved for this job through Job.path,
# creating the containing directory first when it is missing. An existing
# file is overwritten.
#
# file     - file name or path, as accepted by Job.path.
# contents - data to write.
#
# Returns the value of the File.open block (the result of fout.write).
def write(file, contents)
  path = Job.path(file, @name)

  # NOTE(review): the method name after "File." was lost in this listing;
  # expand_path is the reconstruction -- confirm against the repository.
  directory = File.dirname(File.expand_path(path))
  FileUtils.mkdir_p directory unless File.exist? directory

  File.open(path, 'w') do |fout|
    fout.write contents
  end
end