Class: MiGA::Daemon
Overview
MiGA Daemons handling job submissions.
Constant Summary
Constants included from MiGA
CITATION, VERSION, VERSION_DATE, VERSION_NAME
Instance Attribute Summary collapse
-
#jobs_running ⇒ Object
readonly
Array of jobs currently running.
-
#jobs_to_run ⇒ Object
readonly
Array of jobs next to be executed.
-
#loop_i ⇒ Object
readonly
Integer indicating the current iteration.
-
#options ⇒ Object
readonly
Options used to setup the daemon.
-
#project ⇒ Object
readonly
MiGA::Project in which the daemon is running.
Class Method Summary collapse
-
.last_alive(project) ⇒ Object
When was the last time a daemon for the MiGA::Project
projectwas seen active? Returns DateTime.
Instance Method Summary collapse
-
#check_datasets ⇒ Object
Traverse datasets.
-
#check_project ⇒ Object
Check if all reference datasets are pre-processed.
-
#daemon(task, opts = []) ⇒ Object
Launches the
taskwith optionsopts(as command-line arguments). -
#declare_alive ⇒ Object
Tell the world that you’re alive.
-
#default_options ⇒ Object
Returns Hash containing the default options for the daemon.
-
#flush! ⇒ Object
Remove finished jobs from the internal queue and launch as many as possible respecting #maxjobs.
-
#get_job(job, ds = nil) ⇒ Object
Get the taks with key symbol
jobin datasetds. -
#in_loop ⇒ Object
Run one loop step.
-
#initialize(project) ⇒ Daemon
constructor
Initialize an unactive daemon for the MiGA::Project
project. -
#last_alive ⇒ Object
When was the last time a daemon for the current project was seen active? Returns DateTime.
-
#latency ⇒ Object
Returns Integer indicating the number of seconds to sleep between checks.
-
#maxjobs ⇒ Object
Returns Integer indicating the maximum number of concurrent jobs to run.
-
#ppn ⇒ Object
Returns Integer indicating the number of CPUs per job.
-
#purge! ⇒ Object
Remove dead jobs.
-
#queue_job(job, ds = nil) ⇒ Object
Add the task to the internal queue with symbol key
job. -
#restart(opts = []) ⇒ Object
Restarts the daemon with
opts. -
#runopts(k, v = nil, force = false) ⇒ Object
Set/get #options, where
kis the Symbol of the option andvis the value (or nil to use as getter). -
#say(*opts) ⇒ Object
Send a datestamped message to the log.
-
#shutdown_when_done? ⇒ Boolean
Returns Boolean indicating if the daemon should shutdown when processing is complete.
-
#start(opts = []) ⇒ Object
Initializes the daemon with
opts. -
#status(opts = []) ⇒ Object
Returns the status of the daemon with
opts. -
#stop(opts = []) ⇒ Object
Stops the daemon with
opts.
Methods inherited from MiGA
CITATION, DEBUG, DEBUG_OFF, DEBUG_ON, DEBUG_TRACE_OFF, DEBUG_TRACE_ON, FULL_VERSION, LONG_VERSION, VERSION, VERSION_DATE, clean_fasta_file, initialized?, #result_files_exist?, root_path, tabulate
Constructor Details
#initialize(project) ⇒ Daemon
Initialize an unactive daemon for the MiGA::Project project. See #daemon to wake the daemon.
35 36 37 38 39 40 41 42 43 |
# File 'lib/miga/daemon.rb', line 35 def initialize(project) @project = project @runopts = JSON.parse( File.read(File.("daemon/daemon.json", project.path)), {:symbolize_names=>true}) @jobs_to_run = [] @jobs_running = [] @loop_i = -1 end |
Instance Attribute Details
#jobs_running ⇒ Object (readonly)
Array of jobs currently running.
28 29 30 |
# File 'lib/miga/daemon.rb', line 28 def jobs_running @jobs_running end |
#jobs_to_run ⇒ Object (readonly)
Array of jobs next to be executed.
26 27 28 |
# File 'lib/miga/daemon.rb', line 26 def jobs_to_run @jobs_to_run end |
#loop_i ⇒ Object (readonly)
Integer indicating the current iteration.
30 31 32 |
# File 'lib/miga/daemon.rb', line 30 def loop_i @loop_i end |
#options ⇒ Object (readonly)
Options used to setup the daemon.
24 25 26 |
# File 'lib/miga/daemon.rb', line 24 def end |
#project ⇒ Object (readonly)
MiGA::Project in which the daemon is running.
22 23 24 |
# File 'lib/miga/daemon.rb', line 22 def project @project end |
Class Method Details
.last_alive(project) ⇒ Object
When was the last time a daemon for the MiGA::Project project was seen active? Returns DateTime.
15 16 17 18 19 |
# File 'lib/miga/daemon.rb', line 15 def self.last_alive(project) f = File.("daemon/alive", project.path) return nil unless File.exist? f DateTime.parse(File.read(f)) end |
Instance Method Details
#check_datasets ⇒ Object
Traverse datasets
129 130 131 132 133 134 135 136 137 138 139 |
# File 'lib/miga/daemon.rb', line 129 def check_datasets project.each_dataset do |n, ds| if ds.nil? say "Warning: Dataset #{n} listed but not loaded, reloading project." project.load else to_run = ds.next_preprocessing(true) queue_job(to_run, ds) unless to_run.nil? end end end |
#check_project ⇒ Object
Check if all reference datasets are pre-processed. If yes, check the project-level tasks
144 145 146 147 148 149 150 151 |
# File 'lib/miga/daemon.rb', line 144 def check_project return if project.dataset_names.empty? if project.done_preprocessing?(false) to_run = project.next_distances(true) to_run = project.next_inclade(true) if to_run.nil? queue_job(to_run) unless to_run.nil? end end |
#daemon(task, opts = []) ⇒ Object
Launches the task with options opts (as command-line arguments). Supported tasks include: start, stop, restart, status.
110 111 112 113 114 115 116 117 |
# File 'lib/miga/daemon.rb', line 110 def daemon(task, opts=[]) = opts.unshift(task) [:ARGV] = opts Daemons.run_proc("MiGA:#{project.name}", ) do loop { break unless in_loop } end end |
#declare_alive ⇒ Object
Tell the world that you’re alive
121 122 123 124 125 |
# File 'lib/miga/daemon.rb', line 121 def declare_alive f = File.open(File.("daemon/alive", project.path), "w") f.print Time.now.to_s f.close end |
#default_options ⇒ Object
Returns Hash containing the default options for the daemon.
54 55 56 57 |
# File 'lib/miga/daemon.rb', line 54 def { dir_mode: :normal, dir: File.("daemon", project.path), multiple: false, log_output: true } end |
#flush! ⇒ Object
Remove finished jobs from the internal queue and launch as many as possible respecting #maxjobs.
199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 |
# File 'lib/miga/daemon.rb', line 199 def flush! # Check for finished jobs @jobs_running.select! do |job| r = (job[:ds].nil? ? project : job[:ds]).add_result(job[:job], false) say "Completed pid:#{job[:pid]} for #{job[:task_name]}." unless r.nil? r.nil? end # Avoid single datasets hogging resources @jobs_to_run.rotate! rand(jobs_to_run.size) # Launch as many +jobs_to_run+ as possible while jobs_running.size < maxjobs break if jobs_to_run.empty? launch_job @jobs_to_run.shift end end |
#get_job(job, ds = nil) ⇒ Object
Get the taks with key symbol job in dataset ds. For project-wide tasks let ds be nil.
186 187 188 189 190 191 192 193 194 |
# File 'lib/miga/daemon.rb', line 186 def get_job(job, ds=nil) (jobs_to_run + jobs_running).find do |j| if ds==nil j[:ds].nil? and j[:job]==job else (! j[:ds].nil?) and j[:ds].name==ds.name and j[:job]==job end end end |
#in_loop ⇒ Object
Run one loop step. Returns a Boolean indicating if the loop should continue.
225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 |
# File 'lib/miga/daemon.rb', line 225 def in_loop if loop_i == -1 say "-----------------------------------" say "MiGA:#{project.name} launched." say "-----------------------------------" @loop_i = 0 end @loop_i += 1 declare_alive project.load check_datasets check_project flush! if loop_i==4 say "Housekeeping for sanity" @loop_i = 0 purge! end sleep(latency) if shutdown_when_done? and jobs_running.size+jobs_to_run.size == 0 return false end true end |
#last_alive ⇒ Object
When was the last time a daemon for the current project was seen active? Returns DateTime.
48 49 50 |
# File 'lib/miga/daemon.rb', line 48 def last_alive MiGA::Daemon.last_alive project end |
#latency ⇒ Object
Returns Integer indicating the number of seconds to sleep between checks.
76 |
# File 'lib/miga/daemon.rb', line 76 def latency() runopts(:latency) ; end |
#maxjobs ⇒ Object
Returns Integer indicating the maximum number of concurrent jobs to run.
80 |
# File 'lib/miga/daemon.rb', line 80 def maxjobs() runopts(:maxjobs) ; end |
#ppn ⇒ Object
Returns Integer indicating the number of CPUs per job.
84 |
# File 'lib/miga/daemon.rb', line 84 def ppn() runopts(:ppn) ; end |
#purge! ⇒ Object
Remove dead jobs.
217 218 219 220 221 |
# File 'lib/miga/daemon.rb', line 217 def purge! @jobs_running.select! do |job| `#{sprintf(runopts(:alive), job[:pid])}`.chomp.to_i == 1 end end |
#queue_job(job, ds = nil) ⇒ Object
Add the task to the internal queue with symbol key job. If the task is dataset-specific, ds specifies the dataset. To submit jobs to the scheduler (or to bash) see #flush!.
157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 |
# File 'lib/miga/daemon.rb', line 157 def queue_job(job, ds=nil) return nil unless get_job(job, ds).nil? ds_name = (ds.nil? ? "miga-project" : ds.name) say "Queueing ", ds_name, ":#{job}" vars = { "PROJECT"=>project.path, "RUNTYPE"=>runopts(:type), "CORES"=>ppn, "MIGA"=>MiGA::MiGA.root_path } vars["DATASET"] = ds.name unless ds.nil? log_dir = File.("daemon/#{job}", project.path) Dir.mkdir(log_dir) unless Dir.exist? log_dir task_name = "#{project.metadata[:name][0..9]}:#{job}:#{ds_name}" to_run = {ds: ds, job: job, task_name: task_name, cmd: sprintf(runopts(:cmd), # 1: script File.("scripts/#{job}.bash", vars["MIGA"]), # 2: vars vars.keys.map { |k| sprintf(runopts(:var), k, vars[k]) }.join(runopts(:varsep)), # 3: CPUs ppn, # 4: log file File.("#{ds_name}.log", log_dir), # 5: task name task_name)} @jobs_to_run << to_run end |
#restart(opts = []) ⇒ Object
Restarts the daemon with opts.
101 |
# File 'lib/miga/daemon.rb', line 101 def restart(opts=[]) daemon("restart", opts) ; end |
#runopts(k, v = nil, force = false) ⇒ Object
Set/get #options, where k is the Symbol of the option and v is the value (or nil to use as getter). Skips consistency tests if force. Returns new value.
63 64 65 66 67 68 69 70 71 72 |
# File 'lib/miga/daemon.rb', line 63 def runopts(k, v=nil, force=false) k = k.to_sym unless v.nil? v = [:latency, :maxjobs, :ppn].include?(k) ? v.to_i : [:shutdown_when_done].include?(k) ? !!v : v raise "Daemon's #{k} cannot be set to zero." if !force and v==0 @runopts[k] = v end @runopts[k] end |
#say(*opts) ⇒ Object
Send a datestamped message to the log.
252 253 254 |
# File 'lib/miga/daemon.rb', line 252 def say(*opts) print "[#{Time.new.inspect}] ", *opts, "\n" end |
#shutdown_when_done? ⇒ Boolean
Returns Boolean indicating if the daemon should shutdown when processing is complete.
89 |
# File 'lib/miga/daemon.rb', line 89 def shutdown_when_done?() !!runopts(:shutdown_when_done) ; end |
#start(opts = []) ⇒ Object
Initializes the daemon with opts.
93 |
# File 'lib/miga/daemon.rb', line 93 def start(opts=[]) daemon("start", opts) ; end |
#status(opts = []) ⇒ Object
Returns the status of the daemon with opts.
105 |
# File 'lib/miga/daemon.rb', line 105 def status(opts=[]) daemon("status", opts) ; end |
#stop(opts = []) ⇒ Object
Stops the daemon with opts.
97 |
# File 'lib/miga/daemon.rb', line 97 def stop(opts=[]) daemon("stop", opts) ; end |