Module: SimpleWS::Jobs::Scheduler

Includes:
Process
Defined in:
lib/simplews/jobs.rb

Overview

{{{ Scheduler

Defined Under Namespace

Classes: Job

Constant Summary collapse

@@task_results =
{}
@@names =
[]
@@pids =
{}
@@queue =
[]
@@max_jobs =
3

Class Method Summary collapse

Class Method Details

.abort(name) ⇒ Object



162
163
164
# File 'lib/simplews/jobs.rb', line 162

def self.abort(name)
  Process.kill("INT", @@pids[name]) if @@pids[name]
end

.abort_jobsObject



166
167
168
169
170
# File 'lib/simplews/jobs.rb', line 166

def self.abort_jobs
  @@pids.values{|pid|
    Process.kill "INT", pid
  }
end

.clean_job(pid) ⇒ Object



120
121
122
123
124
125
126
127
128
129
130
131
132
# File 'lib/simplews/jobs.rb', line 120

def self.clean_job(pid)
  name = @@pids.select{|name, p| p == pid}.first
  return if name.nil?
  name = name.first
  puts "Process #{ name } with pid #{ pid } finished with exitstatus #{$?.exitstatus}"
  state = Job.job_info(name)
  if ![:error, :done, :aborted].include?(state[:status])
    state[:status] = :error
    state[:messages] << "Job finished for unknown reasons"
    Job.save(name, state)
  end
  @@pids.delete(name)
end

.configure(name, value) ⇒ Object



73
74
75
# File 'lib/simplews/jobs.rb', line 73

def self.configure(name, value)
  Job::configure(name, value)
end

.dequeueObject



86
87
88
89
90
91
92
93
94
95
96
97
# File 'lib/simplews/jobs.rb', line 86

def self.dequeue
  if @@pids.length <= @@max_jobs && @@queue.any?
    job_info = @@queue.pop

    pid = Job.new.run(job_info[:task], job_info[:name], @@task_results[job_info[:task]], *job_info[:args])
    
    @@pids[job_info[:name]] = pid
    pid
  else
    nil
  end
end

.helper(name, block) ⇒ Object



77
78
79
# File 'lib/simplews/jobs.rb', line 77

def self.helper(name, block)
  Job.send :define_method, name, block
end

.job_info(name) ⇒ Object



172
173
174
# File 'lib/simplews/jobs.rb', line 172

def self.job_info(name)
  Job.job_info(name)
end

.job_monitorObject



134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
# File 'lib/simplews/jobs.rb', line 134

def self.job_monitor
  Thread.new{
    while true
      begin
        pid = dequeue
        if pid.nil?
          if @@pids.any?
            pid_exit = Process.wait(-1, Process::WNOHANG)
            if pid_exit
              clean_job(pid_exit)
            else
              sleep SimpleWS::Jobs::SLEEP_TIMES[:monitor]
            end
          else
            sleep SimpleWS::Jobs::SLEEP_TIMES[:monitor]
          end
        else
          sleep SimpleWS::Jobs::SLEEP_TIMES[:monitor]
        end
      rescue
        puts $!.message
        puts $!.backtrace.join("\n")
        sleep 2
      end
    end
  }
end

.job_results(name) ⇒ Object



180
181
182
# File 'lib/simplews/jobs.rb', line 180

def self.job_results(name)
  Job.results(name)   
end

.make_name(name = "") ⇒ Object



49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
# File 'lib/simplews/jobs.rb', line 49

def self.make_name(name = "")
  name = Scheduler::random_name("job-") unless name =~ /\w/

  taken = @@names.select{|n| n =~ /^#{ Regexp.quote name }(?:-\d+)?$/}
  taken += Job.taken(name)
  taken = taken.sort.uniq
  if taken.any?
    if taken.length == 1
      return name + '-2'
    else
      last = taken.collect{|s| 
        if s.match(/-(\d+)$/)
          $1.to_i 
        else 
          1 
        end
      }.sort.last
      return name + '-' + (last + 1).to_s
    end
  else
    return name
  end
end

.queueObject



99
100
101
# File 'lib/simplews/jobs.rb', line 99

def self.queue
  @@queue
end

.random_name(s = "", num = 20) ⇒ Object



35
36
37
38
39
40
41
42
43
44
45
46
47
# File 'lib/simplews/jobs.rb', line 35

def self.random_name(s="", num=20)
  num.times{
    r = rand
    if r < 0.3
      s << (rand * 10).to_i.to_s
    elsif r < 0.6
      s << (rand * (?z - ?a) + ?a).to_i.chr
    else 
      s << (rand * (?Z - ?A) + ?A).to_i.chr
    end
  }
  s.to_s
end

.run(task, *args) ⇒ Object



103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
# File 'lib/simplews/jobs.rb', line 103

def self.run(task, *args)
  suggested_name = *args.pop
  name = make_name(suggested_name)
  @@names << name

  @@queue.push( {:name => name, :task => task, :args => args})
  state = {
      :name => name, 
      :status => :queued, 
      :messages => [], 
      :info => {}, 
  }
  Job.save(name,state)
 
  name
end

.task(name, results, block) ⇒ Object



81
82
83
84
# File 'lib/simplews/jobs.rb', line 81

def self.task(name, results, block)
  @@task_results[name] = results
  Job.send :define_method, name, block
end

.workdir=(workdir) ⇒ Object



176
177
178
# File 'lib/simplews/jobs.rb', line 176

def self.workdir=(workdir)
  Job.workdir = workdir
end