Module: SimpleWS::Jobs::Scheduler
- Includes:
- Process
- Defined in:
- lib/simplews/jobs.rb
Overview
{{{ Scheduler
Defined Under Namespace
Classes: Job
Constant Summary collapse
- @@task_results =
{}
- @@names =
[]
- @@pids =
{}
- @@queue =
[]
- @@max_jobs =
3
Class Method Summary collapse
- .abort(name) ⇒ Object
- .abort_jobs ⇒ Object
- .clean_job(pid) ⇒ Object
- .configure(name, value) ⇒ Object
- .dequeue ⇒ Object
- .helper(name, block) ⇒ Object
- .job_info(name) ⇒ Object
- .job_monitor ⇒ Object
- .job_results(name) ⇒ Object
- .make_name(name = "") ⇒ Object
- .queue ⇒ Object
- .queue_size(size) ⇒ Object
- .random_name(s = "", num = 20) ⇒ Object
- .run(task, *args) ⇒ Object
- .task(name, results, block) ⇒ Object
- .workdir=(workdir) ⇒ Object
Class Method Details
.abort(name) ⇒ Object
169 170 171 |
# File 'lib/simplews/jobs.rb', line 169 def self.abort(name) Process.kill("INT", @@pids[name]) if @@pids[name] end |
.abort_jobs ⇒ Object
173 174 175 176 177 |
# File 'lib/simplews/jobs.rb', line 173 def self.abort_jobs @@pids.values{|pid| Process.kill "INT", pid } end |
.clean_job(pid) ⇒ Object
127 128 129 130 131 132 133 134 135 136 137 138 139 |
# File 'lib/simplews/jobs.rb', line 127 def self.clean_job(pid) name = @@pids.select{|name, p| p == pid}.first return if name.nil? name = name.first puts "Job #{ name } with pid #{ pid } finished with exitstatus #{$?.exitstatus}" state = Job.job_info(name) if ![:error, :done, :aborted].include?(state[:status]) state[:status] = :error state[:messages] << "Job finished for unknown reasons" Job.save(name, state) end @@pids.delete(name) end |
.configure(name, value) ⇒ Object
80 81 82 |
# File 'lib/simplews/jobs.rb', line 80 def self.configure(name, value) Job::configure(name, value) end |
.dequeue ⇒ Object
93 94 95 96 97 98 99 100 101 102 103 104 |
# File 'lib/simplews/jobs.rb', line 93 def self.dequeue if @@pids.length < @@max_jobs && @@queue.any? job_info = @@queue.pop pid = Job.new.run(job_info[:task], job_info[:name], @@task_results[job_info[:task]], *job_info[:args]) @@pids[job_info[:name]] = pid pid else nil end end |
.helper(name, block) ⇒ Object
84 85 86 |
# File 'lib/simplews/jobs.rb', line 84 def self.helper(name, block) Job.send :define_method, name, block end |
.job_info(name) ⇒ Object
179 180 181 |
# File 'lib/simplews/jobs.rb', line 179 def self.job_info(name) Job.job_info(name) end |
.job_monitor ⇒ Object
141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 |
# File 'lib/simplews/jobs.rb', line 141 def self.job_monitor Thread.new{ while true begin pid = dequeue if pid.nil? if @@pids.any? pid_exit = Process.wait(-1, Process::WNOHANG) if pid_exit clean_job(pid_exit) else sleep SimpleWS::Jobs::SLEEP_TIMES[:monitor] end else sleep SimpleWS::Jobs::SLEEP_TIMES[:monitor] end else sleep SimpleWS::Jobs::SLEEP_TIMES[:monitor] end rescue puts $!. puts $!.backtrace.join("\n") sleep 2 end end } end |
.job_results(name) ⇒ Object
187 188 189 |
# File 'lib/simplews/jobs.rb', line 187 def self.job_results(name) Job.results(name) end |
.make_name(name = "") ⇒ Object
56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 |
# File 'lib/simplews/jobs.rb', line 56 def self.make_name(name = "") name = Scheduler::random_name("job-") unless name =~ /\w/ taken = @@names.select{|n| n =~ /^#{ Regexp.quote name }(?:-\d+)?$/} taken += Job.taken(name) taken = taken.sort.uniq if taken.any? if taken.length == 1 return name + '-2' else last = taken.collect{|s| if s.match(/-(\d+)$/) $1.to_i else 1 end }.sort.last return name + '-' + (last + 1).to_s end else return name end end |
.queue ⇒ Object
106 107 108 |
# File 'lib/simplews/jobs.rb', line 106 def self.queue @@queue end |
.queue_size(size) ⇒ Object
37 38 39 |
# File 'lib/simplews/jobs.rb', line 37 def self.queue_size(size) @@max_jobs = size end |
.random_name(s = "", num = 20) ⇒ Object
42 43 44 45 46 47 48 49 50 51 52 53 54 |
# File 'lib/simplews/jobs.rb', line 42 def self.random_name(s="", num=20) num.times{ r = rand if r < 0.3 s << (rand * 10).to_i.to_s elsif r < 0.6 s << (rand * (?z - ?a) + ?a).to_i.chr else s << (rand * (?Z - ?A) + ?A).to_i.chr end } s.to_s end |
.run(task, *args) ⇒ Object
110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 |
# File 'lib/simplews/jobs.rb', line 110 def self.run(task, *args) suggested_name = *args.pop name = make_name(suggested_name) @@names << name @@queue.push( {:name => name, :task => task, :args => args}) state = { :name => name, :status => :queued, :messages => [], :info => {}, } Job.save(name,state) name end |
.task(name, results, block) ⇒ Object
88 89 90 91 |
# File 'lib/simplews/jobs.rb', line 88 def self.task(name, results, block) @@task_results[name] = results Job.send :define_method, name, block end |