Module: SimpleWS::Jobs::Scheduler
- Includes:
- Process
- Defined in:
- lib/simplews/jobs.rb
Overview
{{{ Scheduler
Defined Under Namespace
Classes: Job
Constant Summary collapse
- @@task_results =
{}
- @@names =
[]
- @@pids =
{}
- @@queue =
[]
- @@max_jobs =
3
Class Method Summary collapse
- .abort(name) ⇒ Object
- .abort_jobs ⇒ Object
- .clean_job(pid) ⇒ Object
- .configure(name, value) ⇒ Object
- .dequeue ⇒ Object
- .helper(name, block) ⇒ Object
- .job_info(name) ⇒ Object
- .job_monitor ⇒ Object
- .job_results(name) ⇒ Object
- .make_name(name = "") ⇒ Object
- .random_name(s = "", num = 20) ⇒ Object
- .run(task, *args) ⇒ Object
- .task(name, results, block) ⇒ Object
- .workdir=(workdir) ⇒ Object
Class Method Details
.abort(name) ⇒ Object
140 141 142 |
# File 'lib/simplews/jobs.rb', line 140
# Sends SIGINT to the process registered under the job +name+.
# Does nothing when no pid is recorded for that name.
def self.abort(name)
  pid = @@pids[name]
  Process.kill("INT", pid) if pid
end
.abort_jobs ⇒ Object
144 145 146 147 148 |
# File 'lib/simplews/jobs.rb', line 144
# Sends SIGINT to every process currently recorded in @@pids.
#
# Returns the array of pids that were iterated over.
def self.abort_jobs
  # Hash#values ignores a block, so the pids must be iterated explicitly.
  @@pids.values.each do |pid|
    begin
      Process.kill("INT", pid)
    rescue Errno::ESRCH
      # The process is already gone; keep signalling the remaining jobs.
    end
  end
end
.clean_job(pid) ⇒ Object
109 110 111 112 113 114 115 116 117 118 119 |
# File 'lib/simplews/jobs.rb', line 109
# Bookkeeping for a finished child process.
#
# Looks up the job name registered for +pid+, reports its exit status and,
# if the saved job state is not one of :error, :done or :aborted, marks the
# job as failed. Finally the pid is removed from @@pids.
#
# Reads <tt>$?</tt>, so it expects the child to have just been reaped in
# the calling thread.
#
# Returns the removed pid, or nil when +pid+ is not a registered job.
def self.clean_job(pid)
  name, = @@pids.find { |_job, job_pid| job_pid == pid }
  # Unknown pid: nothing to clean up.
  return nil if name.nil?

  puts "Process #{ name } with pid #{ pid } finished with exitstatus #{$?.exitstatus}"

  state = Job.job_info(name)
  unless [:error, :done, :aborted].include?(state[:status])
    # The process ended without recording a final status of its own.
    state[:status] = :error
    state[:messages] << "Job finished for unknown reasons"
    Job.save(name, state)
  end

  @@pids.delete(name)
end
.configure(name, value) ⇒ Object
67 68 69 |
# File 'lib/simplews/jobs.rb', line 67
# Forwards the setting +name+ with +value+ to Job.configure and returns
# its result.
def self.configure(name, value)
  Job.configure(name, value)
end
.dequeue ⇒ Object
80 81 82 83 84 85 86 87 88 89 90 |
# File 'lib/simplews/jobs.rb', line 80
# Starts the next queued job if fewer than @@max_jobs are running.
#
# The queue entry is handed to Job#run together with the results that were
# registered for its task, and the returned pid is recorded in @@pids under
# the job name.
#
# Returns the pid of the started job, or nil when nothing was started.
def self.dequeue
  return nil unless @@pids.length < @@max_jobs && @@queue.any?

  # Entries are appended with push, so take from the front: oldest job first.
  # Taking from the back (pop) would let newer jobs starve older ones.
  job = @@queue.shift
  task = job[:task]

  pid = Job.new.run(task, job[:name], @@task_results[task], *job[:args])
  @@pids[job[:name]] = pid
  pid
end
.helper(name, block) ⇒ Object
71 72 73 |
# File 'lib/simplews/jobs.rb', line 71
# Defines an instance method called +name+ on Job, using +block+ as the
# method body.
def self.helper(name, block)
  Job.send(:define_method, name, block)
end
.job_info(name) ⇒ Object
150 151 152 |
# File 'lib/simplews/jobs.rb', line 150
# Returns the stored state of the job +name+, as reported by Job.job_info.
def self.job_info(name)
  Job.job_info(name)
end
.job_monitor ⇒ Object
121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 |
# File 'lib/simplews/jobs.rb', line 121
# Starts the background thread that drives the scheduler.
#
# Every two seconds the thread tries to dequeue a job. Each time a job is
# started, a second thread waits for any child process to change state and
# passes its pid to clean_job when that pid is a registered job.
#
# Returns the monitoring Thread.
def self.job_monitor
  Thread.new do
    loop do
      if dequeue
        Thread.new do
          begin
            # Waits for any child (-1), not specifically the job just started.
            finished = Process.wait(-1, Process::WUNTRACED) if @@pids.any?
            clean_job(finished) if @@pids.values.include?(finished)
          rescue StandardError => e
            # Report the failure and let the waiter thread end quietly.
            puts e.message
          end
        end
      end
      sleep 2
    end
  end
end
.job_results(name) ⇒ Object
158 159 160 |
# File 'lib/simplews/jobs.rb', line 158
# Returns the results of the job +name+, as reported by Job.results.
def self.job_results(name)
  Job.results(name)
end
.make_name(name = "") ⇒ Object
44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 |
# File 'lib/simplews/jobs.rb', line 44
# Builds a job name that does not collide with names already in use.
#
# When +name+ contains no word character, a random "job-…" name is generated
# instead. Names in use are collected from @@names (the name itself or the
# name followed by "-<number>") and from Job.taken(name). If none are in
# use, the name is returned unchanged; otherwise the result is the name
# followed by "-" and one more than the highest counter in use. The bare
# name counts as 1.
def self.make_name(name = "")
  name = Scheduler.random_name("job-") unless name =~ /\w/

  pattern = /\A#{ Regexp.quote(name) }(-\d+)?\z/
  taken = @@names.select { |used| used =~ pattern }
  taken += Job.taken(name)
  taken = taken.uniq
  return name if taken.empty?

  # Always derive the suffix from the highest counter in use. A shortcut for
  # a single taken name would hand out "name-2" even when "name-2" is that
  # single taken name.
  highest = taken.map { |used|
    # The bare name is counter 1 even if it happens to end in "-<digits>".
    if used != name && used =~ /-(\d+)$/
      $1.to_i
    else
      1
    end
  }.max

  "#{ name }-#{ highest + 1 }"
end
.random_name(s = "", num = 20) ⇒ Object
30 31 32 33 34 35 36 37 38 39 40 41 42 |
# File 'lib/simplews/jobs.rb', line 30
# Appends +num+ random alphanumeric characters to +s+ and returns it.
#
# Each character is a digit with probability 0.3, a lowercase letter with
# probability 0.3 and an uppercase letter otherwise. +s+ is extended in
# place; a frozen +s+ is copied first so it can be used as a prefix too.
def self.random_name(s = "", num = 20)
  s = s.dup if s.frozen?

  # Character pools avoid arithmetic on ?a / ?z, which are Strings on
  # Ruby >= 1.9, and make the last letter of each range reachable.
  digits = ('0'..'9').to_a
  lower  = ('a'..'z').to_a
  upper  = ('A'..'Z').to_a

  num.times do
    r = rand
    pool = if r < 0.3
             digits
           elsif r < 0.6
             lower
           else
             upper
           end
    s << pool[rand(pool.length)]
  end

  s.to_s
end
.run(task, *args) ⇒ Object
92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 |
# File 'lib/simplews/jobs.rb', line 92
# Queues +task+ for execution and returns the name assigned to the job.
#
# The last element of +args+ is always consumed as the suggested job name;
# the remaining elements are kept as the task's arguments. The final name
# comes from make_name, is recorded in @@names, and an initial state with
# status :queued is saved through Job.save.
def self.run(task, *args)
  # A plain pop keeps the suggested name a scalar. `*args.pop` would wrap it
  # in an Array on Ruby >= 1.9, so the suggestion would never be used.
  suggested_name = args.pop
  name = make_name(suggested_name)

  @@names << name
  @@queue.push(:name => name, :task => task, :args => args)

  state = {
    :name     => name,
    :status   => :queued,
    :messages => [],
    :info     => {},
  }
  Job.save(name, state)

  name
end
.task(name, results, block) ⇒ Object
75 76 77 78 |
# File 'lib/simplews/jobs.rb', line 75
# Registers a task: remembers +results+ for the task +name+ in
# @@task_results and defines an instance method +name+ on Job whose body
# is +block+.
def self.task(name, results, block)
  @@task_results[name] = results
  Job.send(:define_method, name, block)
end
.workdir=(workdir) ⇒ Object
154 155 156 |
# File 'lib/simplews/jobs.rb', line 154
# Sets the working directory on Job by assigning +workdir+ to Job.workdir.
def self.workdir=(workdir)
  Job.workdir = workdir
end