Module: SimpleWS::Jobs::Scheduler

Includes:
Process
Defined in:
lib/simplews/jobs.rb

Overview

{{{ Scheduler

Defined Under Namespace

Classes: Job

Constant Summary collapse

@@task_results =
{}
@@names =
[]
@@pids =
{}
@@queue =
[]
@@max_jobs =
3

Class Method Summary collapse

Class Method Details

.abort(name) ⇒ Object



140
141
142
# File 'lib/simplews/jobs.rb', line 140

# Sends SIGINT to the process registered for the job called +name+.
#
# @param name [String] job name used as the key in @@pids
# @return [Integer, nil] the result of Process.kill, or nil when no pid is
#   registered under +name+
def self.abort(name)
  pid = @@pids[name]
  return nil unless pid

  Process.kill("INT", pid)
end

.abort_jobs ⇒ Object



144
145
146
147
148
# File 'lib/simplews/jobs.rb', line 144

# Sends SIGINT to every process currently registered in @@pids.
#
# Hash#values does not take a block, so the pids must be iterated with
# +each+; passing the block straight to +values+ silently signals nothing.
#
# @return [Array] the pids that were iterated
def self.abort_jobs
  @@pids.values.each do |pid|
    begin
      Process.kill "INT", pid
    rescue Errno::ESRCH
      # The process already exited; keep signalling the remaining jobs.
    end
  end
end

.clean_job(pid) ⇒ Object



109
110
111
112
113
114
115
116
117
118
119
# File 'lib/simplews/jobs.rb', line 109

# Records the end of the job whose process id is +pid+ and forgets the pid.
#
# Prints the exit status, marks the job as :error when its saved status is
# not one of :error, :done or :aborted, and removes the job from @@pids.
#
# @param pid [Integer] process id of the finished job
# @return [Object] the value removed from @@pids
# @raise [NoMethodError] when +pid+ is not registered in @@pids
def self.clean_job(pid)
  name = @@pids.find { |_job, job_pid| job_pid == pid }[0]
  puts "Process #{ name } with pid #{ pid } finished with exitstatus #{$?.exitstatus}"

  state = Job.job_info(name)
  unless [:error, :done, :aborted].include?(state[:status])
    # The process ended without recording a final status of its own.
    state[:status] = :error
    state[:messages] << "Job finished for unknown reasons"
    Job.save(name, state)
  end

  @@pids.delete(name)
end

.configure(name, value) ⇒ Object



67
68
69
# File 'lib/simplews/jobs.rb', line 67

# Passes a configuration setting through to Job.configure.
#
# @param name [Object] setting name
# @param value [Object] setting value
# @return [Object] whatever Job.configure returns
def self.configure(name, value)
  Job.configure(name, value)
end

.dequeue ⇒ Object



80
81
82
83
84
85
86
87
88
89
90
# File 'lib/simplews/jobs.rb', line 80

# Starts the oldest queued job if fewer than @@max_jobs are running.
#
# Jobs are appended to @@queue with +push+, so they are taken from the front
# with +shift+ to run in submission order. Taking them with +pop+ would run
# the newest job first and could leave older jobs waiting indefinitely.
#
# @return [Object, nil] the value returned by Job#run (stored in @@pids as
#   the job's pid), or nil when nothing was started
def self.dequeue
  return nil if @@pids.length >= @@max_jobs || @@queue.empty?

  job_info = @@queue.shift
  name = job_info[:name]
  task = job_info[:task]

  pid = Job.new.run(task, name, @@task_results[task], *job_info[:args])
  @@pids[name] = pid
  pid
end

.helper(name, block) ⇒ Object



71
72
73
# File 'lib/simplews/jobs.rb', line 71

# Defines an instance method on Job so that tasks can call it.
#
# @param name [Symbol, String] name of the method to define
# @param block [Proc] body of the method
# @return [Object] whatever define_method returns
def self.helper(name, block)
  Job.class_eval { define_method(name, block) }
end

.job_info(name) ⇒ Object



150
151
152
# File 'lib/simplews/jobs.rb', line 150

# Returns the stored state of the job called +name+ by delegating to
# Job.job_info.
#
# @param name [String] job name
# @return [Object] whatever Job.job_info returns
def self.job_info(name)
  Job.job_info(name)
end

.job_monitor ⇒ Object



121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
# File 'lib/simplews/jobs.rb', line 121

# Starts the scheduler loop in a background thread and returns that thread.
#
# The loop calls dequeue every two seconds. Each time dequeue starts a job
# (returns a pid), a second thread is spawned that waits for a child process
# to finish and hands the reaped pid to clean_job.
#
# @return [Thread] the monitoring thread
def self.job_monitor
  Thread.new{
    while true
      pid = dequeue
      if pid
        # The started pid is passed in as the block argument of the new thread.
        Thread.new(pid){|pid|
          begin
            # A pid of -1 waits for any child process, so the pid returned
            # here may belong to a different job than the one just started.
            # WUNTRACED additionally reports children that have stopped.
            pid = Process.wait(-1,Process::WUNTRACED) if @@pids.any?
            # Only pids still registered in @@pids are cleaned up.
            clean_job(pid) if @@pids.values.include? pid
          rescue Exception
            # Failures while reaping are printed and otherwise ignored.
            puts $!.message
          end
        }
      end
      sleep 2
    end
  }
end

.job_results(name) ⇒ Object



158
159
160
# File 'lib/simplews/jobs.rb', line 158

# Returns the results of the job called +name+ by delegating to Job.results.
#
# @param name [String] job name
# @return [Object] whatever Job.results returns
def self.job_results(name)
  Job.results(name)
end

.make_name(name = "") ⇒ Object



44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
# File 'lib/simplews/jobs.rb', line 44

# Returns a job name based on +name+ that does not clash with names already
# handed out (@@names) or reported as taken by Job.taken.
#
# A name without any word character is replaced by a random "job-..." name.
# When the name is free it is returned unchanged; otherwise the highest
# numeric suffix in use (a name with no suffix counts as 1) is incremented
# and appended as "-N".
#
# The suffix is always derived from the highest one in use. Returning "-2"
# whenever exactly one name is taken would collide if that one name already
# ends in "-2".
#
# @param name [String, nil] suggested job name
# @return [String] a name that is not among the taken names
def self.make_name(name = "")
  name = Scheduler::random_name("job-") unless name =~ /\w/

  taken = @@names.select { |n| n =~ /^#{ Regexp.quote name }(-\d+)?$/ }
  taken += Job.taken(name)
  return name if taken.empty?

  highest = taken.map { |s| (s[/-(\d+)$/, 1] || 1).to_i }.max
  "#{name}-#{highest + 1}"
end

.random_name(s = "", num = 20) ⇒ Object



30
31
32
33
34
35
36
37
38
39
40
41
42
# File 'lib/simplews/jobs.rb', line 30

# Builds a random alphanumeric name.
#
# Appends +num+ random characters to a copy of +s+. Each character is a digit
# (when the draw is below 0.3), a lowercase letter (below 0.6) or an
# uppercase letter (otherwise).
#
# Characters are picked from explicit ranges rather than by arithmetic on
# character literals: since Ruby 1.9 +?a+ is a String, so +?z - ?a+ raises
# NoMethodError. The ranges also include 'z' and 'Z', which the arithmetic
# form could never produce. The prefix is copied so that frozen strings can
# be passed in and the caller's string is left untouched.
#
# @param s [String] prefix of the generated name
# @param num [Integer] number of random characters to append
# @return [String] the prefix followed by +num+ random characters
def self.random_name(s = "", num = 20)
  digits = ('0'..'9').to_a
  lower = ('a'..'z').to_a
  upper = ('A'..'Z').to_a

  name = s.to_s.dup
  num.times do
    r = rand
    pool = if r < 0.3
             digits
           elsif r < 0.6
             lower
           else
             upper
           end
    name << pool[rand(pool.length)]
  end
  name
end

.run(task, *args) ⇒ Object



92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
# File 'lib/simplews/jobs.rb', line 92

# Queues +task+ for execution and returns the name assigned to the job.
#
# The last element of +args+ is taken as the suggested job name; the
# remaining elements are kept as the task's arguments. The job is saved with
# status :queued.
#
# The suggested name is unwrapped explicitly. +x = *value+ produces an Array
# on Ruby 1.9 and later, so the suggestion would never reach make_name as a
# String.
#
# @param task [Object] task identifier, stored under :task in the queue entry
# @param args [Array] task arguments followed by the suggested job name
# @return [String] the name returned by make_name
def self.run(task, *args)
  suggested_name = args.pop
  suggested_name = suggested_name.first if suggested_name.is_a?(Array)

  name = make_name(suggested_name)
  @@names << name

  @@queue.push({ :name => name, :task => task, :args => args })

  state = {
    :name => name,
    :status => :queued,
    :messages => [],
    :info => {}
  }
  Job.save(name, state)

  name
end

.task(name, results, block) ⇒ Object



75
76
77
78
# File 'lib/simplews/jobs.rb', line 75

# Registers a task: stores +results+ in @@task_results under +name+ and
# defines an instance method called +name+ on Job whose body is +block+.
#
# @param name [Symbol, String] task name
# @param results [Object] value later passed to Job#run by dequeue
# @param block [Proc] body of the task
# @return [Object] whatever define_method returns
def self.task(name, results, block)
  @@task_results.store(name, results)
  Job.send(:define_method, name, block)
end

.workdir=(workdir) ⇒ Object



154
155
156
# File 'lib/simplews/jobs.rb', line 154

# Forwards the working directory to Job.workdir=.
#
# @param workdir [Object] value assigned to Job.workdir
def self.workdir=(workdir)
  Job.workdir = workdir
end