Module: SimpleWS::Jobs::Scheduler
- Includes:
- Process
- Defined in:
- lib/simplews/jobs.rb
Overview
{{{ Scheduler
Defined Under Namespace
Classes: Job
Constant Summary collapse
- @@task_results =
{}
- @@names =
[]
- @@pids =
{}
- @@queue =
[]
- @@max_jobs =
3
Class Method Summary collapse
- .abort(name) ⇒ Object
- .abort_jobs ⇒ Object
- .clean_job(pid) ⇒ Object
- .configure(name, value) ⇒ Object
- .dequeue ⇒ Object
- .helper(name, block) ⇒ Object
- .job_info(name) ⇒ Object
- .job_monitor ⇒ Object
- .job_results(name) ⇒ Object
- .make_name(name = "") ⇒ Object
- .queue ⇒ Object
- .random_name(s = "", num = 20) ⇒ Object
- .run(task, *args) ⇒ Object
- .task(name, results, block) ⇒ Object
- .workdir=(workdir) ⇒ Object
Class Method Details
.abort(name) ⇒ Object
162 163 164 |
# File 'lib/simplews/jobs.rb', line 162
#
# Sends SIGINT to the process recorded in @@pids under the job +name+.
#
# Returns the result of Process.kill, or nil when no pid is recorded
# for that name.
def self.abort(name)
  pid = @@pids[name]
  return unless pid

  Process.kill("INT", pid)
end
.abort_jobs ⇒ Object
166 167 168 169 170 |
# File 'lib/simplews/jobs.rb', line 166
#
# Sends SIGINT to every process currently recorded in @@pids.
#
# Hash#values does not take a block, so the pids must be iterated
# explicitly with #each for the signal to actually be delivered.
#
# Returns the array of pids that were signalled.
def self.abort_jobs
  @@pids.values.each do |pid|
    Process.kill "INT", pid
  end
end
.clean_job(pid) ⇒ Object
120 121 122 123 124 125 126 127 128 129 130 131 132 |
# File 'lib/simplews/jobs.rb', line 120
#
# Book-keeping for a child process that has exited.
#
# Looks up the job name registered for +pid+ in @@pids; returns nil
# immediately when the pid is not one of ours. Otherwise it reports the
# exit status (read from $?, so this is expected to be called right after
# the process was waited on), marks the job as :error when its recorded
# status is not one of :error, :done or :aborted, and removes the job
# from @@pids.
def self.clean_job(pid)
  entry = @@pids.find { |_job_name, job_pid| job_pid == pid }
  return if entry.nil?

  name = entry.first
  puts "Process #{ name } with pid #{ pid } finished with exitstatus #{$?.exitstatus}"

  state = Job.job_info(name)
  unless [:error, :done, :aborted].include?(state[:status])
    # The process ended without recording a final status of its own.
    state[:status] = :error
    state[:messages] << "Job finished for unknown reasons"
    Job.save(name, state)
  end

  @@pids.delete(name)
end
.configure(name, value) ⇒ Object
73 74 75 |
# File 'lib/simplews/jobs.rb', line 73
#
# Forwards a configuration setting (+name+, +value+) to Job.configure
# and returns whatever that call returns.
def self.configure(name, value)
  Job.configure(name, value)
end
.dequeue ⇒ Object
86 87 88 89 90 91 92 93 94 95 96 97 |
# File 'lib/simplews/jobs.rb', line 86
#
# Starts the next pending job, if there is one and a slot is free.
#
# A slot is free only while fewer than @@max_jobs processes are recorded
# in @@pids (strictly fewer: with `<=` one job too many would be started).
#
# Returns the pid reported by Job#run for the started job, or nil when
# nothing was started.
def self.dequeue
  return nil unless @@pids.length < @@max_jobs && @@queue.any?

  # NOTE(review): jobs are added with Array#push and taken with Array#pop,
  # so the most recently submitted job runs first (LIFO) -- confirm that
  # this ordering is intended.
  job_info = @@queue.pop

  pid = Job.new.run(job_info[:task],
                    job_info[:name],
                    @@task_results[job_info[:task]],
                    *job_info[:args])
  @@pids[job_info[:name]] = pid
  pid
end
.helper(name, block) ⇒ Object
77 78 79 |
# File 'lib/simplews/jobs.rb', line 77
#
# Defines an instance method called +name+ on Job whose body is +block+.
# define_method is invoked through #send, as in the original code.
def self.helper(name, block)
  Job.send(:define_method, name, block)
end
.job_info(name) ⇒ Object
172 173 174 |
# File 'lib/simplews/jobs.rb', line 172
#
# Returns whatever Job.job_info reports for the job called +name+.
def self.job_info(name)
  Job.job_info(name)
end
.job_monitor ⇒ Object
134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 |
# File 'lib/simplews/jobs.rb', line 134
#
# Spawns and returns the monitor thread.
#
# Each iteration the thread tries to start a queued job (dequeue). When
# nothing was started and there are recorded pids, it polls for an exited
# child without blocking and hands it to clean_job. In every other case
# it sleeps for SimpleWS::Jobs::SLEEP_TIMES[:monitor] before the next
# iteration.
#
# Any StandardError raised during an iteration is printed (message and
# backtrace) and the loop continues after a 2 second pause, so a single
# failure does not stop the monitor.
def self.job_monitor
  Thread.new do
    loop do
      begin
        started = dequeue

        finished = nil
        if started.nil? && @@pids.any?
          # WNOHANG: returns nil instead of blocking when no child has exited.
          finished = Process.wait(-1, Process::WNOHANG)
        end

        if finished
          clean_job(finished)
        else
          sleep SimpleWS::Jobs::SLEEP_TIMES[:monitor]
        end
      rescue
        # `puts $!.` followed by a newline parsed as `$!.puts(...)`, which
        # raised inside the rescue and killed the thread; print the message.
        puts $!.message
        puts $!.backtrace.join("\n")
        sleep 2
      end
    end
  end
end
.job_results(name) ⇒ Object
180 181 182 |
# File 'lib/simplews/jobs.rb', line 180
#
# Returns whatever Job.results reports for the job called +name+.
def self.job_results(name)
  Job.results(name)
end
.make_name(name = "") ⇒ Object
49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 |
# File 'lib/simplews/jobs.rb', line 49
#
# Produces a job name that does not clash with names already in use.
#
# name - suggested name; when it contains no word character a random
#        "job-..." name is generated instead.
#
# Names in use are those in @@names that equal +name+ or have the form
# "<name>-<number>", plus whatever Job.taken(name) returns. When none are
# in use, +name+ is returned unchanged. Otherwise the result is
# "<name>-<k + 1>", where k is the highest number already used (a bare
# +name+ counts as 1).
#
# The number is read relative to +name+ itself, so a suggested name that
# happens to end in "-<digits>" is not mistaken for a numbered variant,
# and a lone taken "<name>-2" yields "<name>-3" rather than colliding.
def self.make_name(name = "")
  name = Scheduler::random_name("job-") unless name =~ /\w/

  variant = /\A#{ Regexp.quote name }(?:-(\d+))?\z/

  taken = @@names.select { |n| n =~ variant }
  taken += Job.taken(name)
  taken = taken.uniq
  return name if taken.empty?

  last = taken.collect { |used|
    match = variant.match(used)
    match && match[1] ? match[1].to_i : 1
  }.max

  "#{ name }-#{ last + 1 }"
end
.queue ⇒ Object
99 100 101 |
# File 'lib/simplews/jobs.rb', line 99
#
# Returns the @@queue array itself (not a copy), i.e. the job
# descriptions that have been submitted but not yet started.
def self.queue
  @@queue
end
.random_name(s = "", num = 20) ⇒ Object
35 36 37 38 39 40 41 42 43 44 45 46 47 |
# File 'lib/simplews/jobs.rb', line 35
#
# Appends +num+ random alphanumeric characters to +s+ and returns it.
#
# s   - prefix string; it is appended to in place.
# num - number of random characters to add (default 20).
#
# Each character is a digit with probability 0.3, a lowercase letter with
# probability 0.3 and an uppercase letter otherwise.
#
# The letters are drawn from explicit ranges instead of arithmetic on
# character literals: `?z - ?a` only works where `?a` is an Integer
# (Ruby 1.8) and raises NoMethodError on later versions, and the old
# arithmetic could never produce "z" or "Z".
def self.random_name(s="", num=20)
  lower = ("a".."z").to_a
  upper = ("A".."Z").to_a

  num.times do
    r = rand
    s << if r < 0.3
           rand(10).to_s
         elsif r < 0.6
           lower[rand(lower.length)]
         else
           upper[rand(upper.length)]
         end
  end

  s.to_s
end
.run(task, *args) ⇒ Object
103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 |
# File 'lib/simplews/jobs.rb', line 103
#
# Queues +task+ for execution and returns the name assigned to the job.
#
# task - identifier of the task to run.
# args - arguments for the task; the LAST element is removed and used as
#        the suggested job name, the rest are stored for the task.
#
# The final name is chosen by make_name, recorded in @@names, and an
# initial state with status :queued is handed to Job.save.
def self.run(task, *args)
  # Plain `args.pop`, not `*args.pop`: on Ruby >= 1.9 a splat on the
  # right-hand side wraps the value in an Array, so the suggested name
  # would never be recognised by make_name.
  suggested_name = args.pop
  name = make_name(suggested_name)
  @@names << name

  @@queue.push({ :name => name, :task => task, :args => args })

  state = {
    :name     => name,
    :status   => :queued,
    :messages => [],
    :info     => {},
  }
  Job.save(name, state)

  name
end
.task(name, results, block) ⇒ Object
81 82 83 84 |
# File 'lib/simplews/jobs.rb', line 81
#
# Registers a task: stores +results+ in @@task_results under +name+ and
# defines an instance method called +name+ on Job whose body is +block+.
def self.task(name, results, block)
  @@task_results[name] = results
  Job.send(:define_method, name, block)
end