Class: MiGA::Daemon
- Extended by:
- Common::WithDaemonClass
- Includes:
- Common::WithDaemon, Base
- Defined in:
- lib/miga/daemon.rb,
lib/miga/daemon/base.rb
Overview
MiGA Daemons handling job submissions.
Defined Under Namespace
Modules: Base
Constant Summary
Constants included from MiGA
CITATION, VERSION, VERSION_DATE, VERSION_NAME
Instance Attribute Summary
-
#jobs_running ⇒ Object
readonly
Array of jobs currently running.
-
#jobs_to_run ⇒ Object
readonly
Array of jobs next to be executed.
-
#options ⇒ Object
readonly
Options used to set up the daemon.
-
#project ⇒ Object
readonly
MiGA::Project in which the daemon is running.
Attributes included from Common::WithDaemon
Class Method Summary
-
.daemon_home(project) ⇒ Object
Daemon’s home inside the MiGA::Project project or a String with the full path to the project’s ‘daemon’ folder.
Instance Method Summary
-
#check_datasets ⇒ Object
Traverse datasets and return a Boolean indicating whether any reference datasets are incomplete.
-
#check_project ⇒ Object
Check if all reference datasets are pre-processed.
-
#daemon_first_loop ⇒ Object
Run only in the first loop.
-
#daemon_home ⇒ Object
Path to the daemon home.
-
#daemon_loop ⇒ Object
Run one loop step.
-
#daemon_name ⇒ Object
Name of the daemon.
-
#exit_cleanup ⇒ Object
Remove temporary files on completion.
-
#flush! ⇒ Object
Remove finished jobs from the internal queue and launch as many as possible respecting #maxjobs or #nodelist (if set).
-
#get_job(job, ds = nil) ⇒ Object
Get the task with key symbol job in dataset ds.
-
#initialize(project, json = nil) ⇒ Daemon
constructor
Initialize an inactive daemon for the MiGA::Project project.
-
#job_cmd(to_run) ⇒ Object
Construct the command for the given job definition with current daemon settings.
-
#l_say(level, *msg) ⇒ Object
Send msg to say as long as level is at most verbosity.
-
#launch_job(job, hostk = nil) ⇒ Object
Launch the job described by Hash job on the hostk-th host.
-
#load_status ⇒ Object
Load the status of a previous instance.
-
#miga_say ⇒ Object
-
#next_host ⇒ Object
In SSH daemons, retrieve the host index of an available node, nil if none.
-
#path ⇒ Object
Alias to project.path for compatibility with lairs.
-
#purge! ⇒ Object
Remove dead jobs.
-
#queue_job(job, ds = nil) ⇒ Object
Add the task to the internal queue with symbol key job.
-
#queue_maintenance(force = false) ⇒ Object
Queue maintenance tasks as an analysis job.
-
#reload_project ⇒ Object
Reload the project’s metadata.
-
#save_status ⇒ Object
Report status in a JSON file.
-
#say(*msg) ⇒ Object
Same as l_say with level = 1.
-
#update_format_0 ⇒ Object
Update from daemon JSON format 0 to the latest version.
Methods included from Common::WithDaemonClass
alive_file, last_alive, terminated_file
Methods included from Common::WithDaemon
#active?, #alive_file, #daemon, #declare_alive, #declare_alive_loop, #default_options, #in_loop, #last_alive, #launch_daemon_proc, #output_file, #pid_file, #process_alive?, #run, #start, #status, #stop, #terminate, #terminate_file, #terminated_file, #termination_file?, #write_alive_file
Methods included from Base
#bypass_maintenance?, #latency, #logfh, #maxjobs, #nodelist, #ppn, #runopts, #runopts_for, #show_log!, #show_log?, #show_summary!, #shutdown_when_done?, #skip_maintenance, #verbosity
Methods inherited from MiGA
CITATION, CITATION_ARRAY, DEBUG, DEBUG_OFF, DEBUG_ON, DEBUG_TRACE_OFF, DEBUG_TRACE_ON, FULL_VERSION, LONG_VERSION, VERSION, VERSION_DATE, #advance, debug?, debug_trace?, initialized?, #like_io?, #num_suffix, rc_path, #result_files_exist?
Methods included from Common::Path
Methods included from Common::Format
#clean_fasta_file, #seqs_length, #tabulate
Methods included from Common::Net
#download_file_ftp, #known_hosts, #remote_connection
Methods included from Common::SystemCall
Constructor Details
#initialize(project, json = nil) ⇒ Daemon
Initialize an inactive daemon for the MiGA::Project project. See #daemon to wake the daemon. If passed, json must be the path to a daemon definition in JSON format. Otherwise, the project-stored daemon definition is used. In either case, missing variables take the values defined in ~/.miga_daemon.json.
# File 'lib/miga/daemon.rb', line 44
def initialize(project, json = nil)
  @project = project
  @runopts = {}
  json ||= File.join(project.path, 'daemon/daemon.json')
  default_json = File.expand_path('.miga_daemon.json', ENV['MIGA_HOME'])
  MiGA::Json.parse(
    json, default: File.exist?(default_json) ? default_json : nil
  ).each { |k, v| runopts(k, v) }
  update_format_0
  @jobs_to_run = []
  @jobs_running = []
end
Instance Attribute Details
#jobs_running ⇒ Object (readonly)
Array of jobs currently running
# File 'lib/miga/daemon.rb', line 36
def jobs_running
  @jobs_running
end
#jobs_to_run ⇒ Object (readonly)
Array of jobs next to be executed
# File 'lib/miga/daemon.rb', line 33
def jobs_to_run
  @jobs_to_run
end
#options ⇒ Object (readonly)
Options used to set up the daemon
# File 'lib/miga/daemon.rb', line 30
def options
  @options
end
#project ⇒ Object (readonly)
MiGA::Project in which the daemon is running
# File 'lib/miga/daemon.rb', line 27
def project
  @project
end
Class Method Details
Instance Method Details
#check_datasets ⇒ Object
Traverse datasets and return a Boolean indicating whether any reference datasets are incomplete
# File 'lib/miga/daemon.rb', line 185
def check_datasets
  l_say(2, 'Checking datasets')
  o = false
  project.each_dataset do |ds|
    next unless ds.status == :incomplete
    next if ds.next_preprocessing(false).nil?

    o = true if ds.ref?
    queue_job(:d, ds)
  end
  unless show_log?
    n = project.dataset_names.count
    k = jobs_to_run.size + jobs_running.size
    k -= 1 unless get_job(:maintenance).nil?
    advance('Datasets:', n - k, n, false)
    miga_say if k == 0
  end
  o
end
#check_project ⇒ Object
Check if all reference datasets are pre-processed. If yes, check the project-level tasks
# File 'lib/miga/daemon.rb', line 208
def check_project
  l_say(2, 'Checking project')

  # Ignore task if the project has no datasets
  return if project.dataset_names.empty?

  # Double-check if all datasets are ready
  return unless project.done_preprocessing?

  # Queue project-level job
  to_run = project.next_task(nil, false)
  queue_job(:p) unless to_run.nil?
end
#daemon_first_loop ⇒ Object
Run only in the first loop
# File 'lib/miga/daemon.rb', line 77
def daemon_first_loop
  say '-----------------------------------'
  say 'MiGA:%s launched' % project.name
  say '-----------------------------------'
  miga_say "Saving log to: #{output_file}" unless show_log?
  say 'Configuration options:'
  say @runopts.to_s
  load_status
  queue_maintenance(true)
end
#daemon_home ⇒ Object
Path to the daemon home
# File 'lib/miga/daemon.rb', line 59
def daemon_home
  self.class.daemon_home(project)
end
#daemon_loop ⇒ Object
Run one loop step. Returns a Boolean indicating if the loop should continue
# File 'lib/miga/daemon.rb', line 90
def daemon_loop
  l_say(3, 'Daemon loop start')
  reload_project
  check_datasets or check_project
  if shutdown_when_done? && (jobs_running.size + jobs_to_run.size).zero?
    say 'Nothing else to do, shutting down'
    exit_cleanup
    return false
  end
  flush!
  if (loop_i % 12).zero?
    purge!
    queue_maintenance if (loop_i % (12 * (skip_maintenance + 1))).zero?
  end
  save_status
  sleep(latency)
  l_say(3, 'Daemon loop end')
  true
end
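The periodic work in #daemon_loop is scheduled with modulo arithmetic on the loop counter. The following is a minimal sketch of that schedule only, assuming loop_i starts at 0 and grows by one per loop (the listing above does not show where loop_i is maintained). The helper name actions_for is hypothetical and not part of MiGA.

```ruby
# Hypothetical helper: lists the periodic actions that fire at loop +loop_i+,
# using the same conditions as #daemon_loop
def actions_for(loop_i, skip_maintenance)
  actions = [:flush] # flush! runs on every loop
  if (loop_i % 12).zero?
    actions << :purge
    actions << :maintenance if (loop_i % (12 * (skip_maintenance + 1))).zero?
  end
  actions
end

# With skip_maintenance = 2, maintenance is considered every 36 loops
maintenance_loops = (0..36).select { |i| actions_for(i, 2).include?(:maintenance) }
puts maintenance_loops.inspect
```

Note that #queue_maintenance can still return early (for example when maintenance is bypassed), so this only shows when it is attempted.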
#daemon_name ⇒ Object
Name of the daemon
# File 'lib/miga/daemon.rb', line 65
def daemon_name
  "MiGA:#{project.name}"
end
#exit_cleanup ⇒ Object
Remove temporary files on completion
# File 'lib/miga/daemon.rb', line 121
def exit_cleanup
  FileUtils.rm_f(File.join(daemon_home, 'status.json'))
end
#flush! ⇒ Object
Remove finished jobs from the internal queue and launch as many as possible respecting #maxjobs or #nodelist (if set).
# File 'lib/miga/daemon.rb', line 282
def flush!
  # Check for finished jobs
  l_say(2, 'Checking for finished jobs')
  @jobs_running.select! do |job|
    ongoing =
      case job[:job].to_s
      when 'd'
        !job[:ds].nil? && !job[:ds].next_preprocessing(false).nil?
      when 'p'
        !project.next_task(nil, false).nil?
      else
        (job[:ds].nil? ? project : job[:ds]).add_result(job[:job], false).nil?
      end
    say "Completed pid:#{job[:pid]} for #{job[:task_name]}" unless ongoing
    ongoing
  end

  # Avoid single datasets hogging resources
  @jobs_to_run.rotate! rand(jobs_to_run.size)

  # Prioritize: Project-wide > MiGA Online queries > Other datasets
  @jobs_to_run.sort_by! do |job|
    job[:ds].nil? ? 1 : job[:ds_name] =~ /^qG_/ ? 2 : 3
  end

  # Launch as many +jobs_to_run+ as possible
  while (hostk = next_host)
    break if jobs_to_run.empty?

    launch_job(@jobs_to_run.shift, hostk)
  end
end
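The prioritization step in #flush! can be tried in isolation. This sketch uses toy job hashes shaped like those built by #queue_job. The dataset names and the helper name priority are made up for illustration, and start_with? is used in place of the /^qG_/ match, which should be equivalent for single-line names.

```ruby
# Toy job hashes (illustrative only); :ds is nil for project-wide jobs
jobs = [
  { ds: :some_dataset, ds_name: 'genome_A', job: :d },
  { ds: :some_dataset, ds_name: 'qG_query1', job: :d },
  { ds: nil, ds_name: 'miga-project', job: :p }
]

# Same ranking as in #flush!: 1 = project-wide, 2 = qG_ query, 3 = other
def priority(job)
  return 1 if job[:ds].nil?

  job[:ds_name].start_with?('qG_') ? 2 : 3
end

order = jobs.sort_by { |job| priority(job) }.map { |job| job[:ds_name] }
puts order.inspect
```

Because the queue is rotated by a random offset before sorting, and sort_by is not guaranteed to be stable, jobs that share a priority level are not served in a fixed order.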
#get_job(job, ds = nil) ⇒ Object
Get the task with key symbol job in dataset ds. For project-wide tasks let ds be nil.
# File 'lib/miga/daemon.rb', line 269
def get_job(job, ds = nil)
  (jobs_to_run + jobs_running).find do |j|
    if ds.nil?
      j[:ds].nil? && j[:job] == job
    else
      !j[:ds].nil? && j[:ds].name == ds.name && j[:job] == job
    end
  end
end
#job_cmd(to_run) ⇒ Object
Construct the command for the given job definition with current daemon settings
# File 'lib/miga/daemon.rb', line 239
def job_cmd(to_run)
  what = to_run[:ds].nil? ? :project : :dataset
  vars = {
    'PROJECT' => project.path,
    'RUNTYPE' => runopts_for(:type, what),
    'CORES' => ppn(what),
    'MIGA' => MiGA::MiGA.root_path
  }
  vars['DATASET'] = to_run[:ds].name unless to_run[:ds].nil?
  log_dir = File.expand_path("daemon/#{to_run[:job]}", project.path)
  FileUtils.mkdir_p(log_dir)
  var_hsh = {
    script: MiGA::MiGA.script_path(
      to_run[:job], miga: vars['MIGA'], project: project
    ),
    vars: vars.map do |k, v|
      runopts(:var).miga_variables(key: k, value: v)
    end.join(runopts_for(:varsep, what)),
    cpus: ppn(what),
    log: File.join(log_dir, "#{to_run[:ds_name]}.log"),
    task_name: to_run[:task_name],
    task_name_simple: to_run[:task_name].gsub(/[^A-Za-z0-9_]/, '-'),
    miga: File.join(MiGA::MiGA.root_path, 'bin/miga').shellescape
  }
  runopts_for(:cmd, what).miga_variables(var_hsh)
end
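#job_cmd assembles the command in two passes: each variable is rendered with the :var template and joined with :varsep, and the result is then placed into the :cmd template. The sketch below imitates that flow under loud assumptions: fill_template is a hypothetical stand-in for MiGA's String#miga_variables (assumed here to replace {{key}} placeholders), and both templates and all values are made up rather than taken from a real daemon definition.

```ruby
# Hypothetical stand-in for String#miga_variables (not MiGA's implementation):
# replaces each {{key}} placeholder with the corresponding value
def fill_template(template, vars)
  vars.reduce(template) { |str, (k, v)| str.gsub("{{#{k}}}", v.to_s) }
end

var_tpl = '{{key}}={{value}}'                  # made-up :var template
cmd_tpl = '{{vars}} {{script}} > {{log}} 2>&1' # made-up :cmd template
varsep = ' '                                   # made-up :varsep

vars = { 'PROJECT' => '/data/proj', 'CORES' => 4 }

# First pass: render every variable and join with the separator
var_str = vars.map { |k, v| fill_template(var_tpl, key: k, value: v) }.join(varsep)

# Second pass: place the rendered variables and paths into the command
cmd = fill_template(cmd_tpl, vars: var_str, script: 'run.bash', log: 'run.log')
puts cmd
```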
#l_say(level, *msg) ⇒ Object
Send msg to say as long as level is at most verbosity
# File 'lib/miga/daemon.rb', line 127
def l_say(level, *msg)
  say(*msg) if verbosity >= level
end
#launch_job(job, hostk = nil) ⇒ Object
Launch the job described by Hash job on the hostk-th host
# File 'lib/miga/daemon.rb', line 339
def launch_job(job, hostk = nil)
  # Execute job
  job[:cmd] = job_cmd(job)
  MiGA::MiGA.DEBUG "CMD: #{job[:cmd]}"
  case runopts(:type)
  when 'ssh'
    # Remote job
    job[:hostk] = hostk
    job[:cmd] = job[:cmd].miga_variables(host: nodelist[hostk])
    job[:pid] = spawn job[:cmd]
    MiGA::MiGA.DEBUG "Detaching PID: #{job[:pid]}"
    Process.detach(job[:pid]) unless [nil, '', 0].include?(job[:pid])
  when 'bash'
    # Local job
    job[:pid] = spawn job[:cmd]
    MiGA::MiGA.DEBUG "Detaching PID: #{job[:pid]}"
    Process.detach(job[:pid]) unless [nil, '', 0].include?(job[:pid])
  else
    # Schedule cluster job (qsub, msub, slurm)
    job[:pid] = MiGA::MiGA.run_cmd(job[:cmd], return: :output).chomp
  end

  # Check if registered
  if [nil, '', 0].include? job[:pid]
    job[:pid] = nil
    @jobs_to_run << job
    say "Unsuccessful #{job[:task_name]}, rescheduling"
  else
    @jobs_running << job
    job_host = " to #{job[:hostk]}:#{nodelist[job[:hostk]]}" if job[:hostk]
    say "Spawned pid:#{job[:pid]}#{job_host} for #{job[:task_name]}"
  end
end
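For the 'ssh' and 'bash' types, #launch_job relies on Ruby's spawn followed by Process.detach, so the daemon never blocks on a child process and finished children do not linger as zombies. A small standalone illustration of that pattern, using a trivial child command chosen only for this example:

```ruby
require 'rbconfig'

# Start a child process without waiting for it (here: a Ruby one-liner)
pid = spawn(RbConfig.ruby, '-e', 'exit 0')

# Process.detach returns a thread that reaps the child when it exits
watcher = Process.detach(pid)

# The daemon does not wait on this thread; it is joined here only to
# inspect the exit status in the example
status = watcher.value
puts "pid #{pid} exited successfully: #{status.success?}"
```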
#load_status ⇒ Object
Load the status of a previous instance.
# File 'lib/miga/daemon.rb', line 158
def load_status
  f_path = File.join(daemon_home, 'status.json')
  return unless File.size? f_path

  say 'Loading previous status in daemon/status.json:'
  status = MiGA::Json.parse(f_path)
  status.each_key do |i|
    status[i].map! do |j|
      j.tap do |k|
        unless k[:ds].nil? || k[:ds_name] == 'miga-project'
          k[:ds] = project.dataset(k[:ds_name])
        end
        k[:job] = k[:job].to_sym unless k[:job].nil?
      end
    end
  end
  @jobs_running = status[:jobs_running]
  @jobs_to_run = status[:jobs_to_run]
  say "- jobs left running: #{@jobs_running.size}"
  purge!
  say "- jobs running: #{@jobs_running.size}"
  say "- jobs to run: #{@jobs_to_run.size}"
end
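The to_sym conversion in #load_status is needed because JSON has no symbol type: a job key such as :d is written out as the string "d". The round trip can be reproduced with Ruby's standard json library. This is only a sketch; it assumes that MiGA::Json behaves like the stdlib parser with symbolized keys, which the listing suggests but does not confirm, and the job entries are made up.

```ruby
require 'json'

# Made-up status shaped like the Hash written by #save_status
status = {
  jobs_running: [{ ds_name: 'genome_A', job: :d, pid: 123 }],
  jobs_to_run: [{ ds_name: 'miga-project', job: :p }]
}

# Symbol values are serialized as strings, so :d is read back as "d"
loaded = JSON.parse(JSON.generate(status), symbolize_names: true)
job_before = loaded[:jobs_running].first[:job]

# Restore the symbol job keys, as #load_status does
loaded.each_value do |jobs|
  jobs.each { |j| j[:job] = j[:job].to_sym unless j[:job].nil? }
end
puts loaded.inspect
```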
#miga_say ⇒ Object
# File 'lib/miga/daemon.rb', line 131
alias miga_say say
#next_host ⇒ Object
In SSH daemons, retrieve the host index of an available node, or nil if none is available. In any other daemon, return true as long as #maxjobs has not been reached
# File 'lib/miga/daemon.rb', line 318
def next_host
  return jobs_running.size < maxjobs if runopts(:type) != 'ssh'

  allk = (0..nodelist.size - 1).to_a
  busyk = jobs_running.map { |k| k[:hostk] }
  (allk - busyk).first
end
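The SSH branch of #next_host is a set difference between all node indexes and those held by running jobs. A standalone sketch of that computation, where free_host_index and the node names are hypothetical:

```ruby
# Hypothetical standalone version of the SSH branch of #next_host
def free_host_index(nodelist, jobs_running)
  all_k = (0...nodelist.size).to_a
  busy_k = jobs_running.map { |job| job[:hostk] }
  (all_k - busy_k).first # nil when every node is busy
end

nodes = %w[node1 node2 node3]
one_free = free_host_index(nodes, [{ hostk: 0 }, { hostk: 2 }])
none_free = free_host_index(nodes, [{ hostk: 0 }, { hostk: 1 }, { hostk: 2 }])
puts one_free.inspect, none_free.inspect
```

Since 0 is truthy in Ruby, a returned index of 0 still satisfies the `while (hostk = next_host)` condition used in #flush!.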
#path ⇒ Object
Alias to project.path for compatibility with lairs
# File 'lib/miga/daemon.rb', line 71
def path
  project.path
end
#purge! ⇒ Object
Remove dead jobs.
# File 'lib/miga/daemon.rb', line 328
def purge!
  say 'Probing running jobs'
  @jobs_running.select! do |job|
    MiGA::MiGA.run_cmd(
      runopts(:alive).miga_variables(pid: job[:pid]), return: :output
    ).chomp.to_i == 1
  end
end
#queue_job(job, ds = nil) ⇒ Object
Add the task to the internal queue with symbol key job. If the task is dataset-specific, ds specifies the dataset. To submit jobs to the scheduler (or to bash or ssh) see #flush!
# File 'lib/miga/daemon.rb', line 226
def queue_job(job, ds = nil)
  return nil unless get_job(job, ds).nil?

  ds_name = (ds.nil? ? 'miga-project' : ds.name)
  task_name = "#{project.metadata[:name][0..9]}:#{job}:#{ds_name}"
  to_run = { ds: ds, ds_name: ds_name, job: job, task_name: task_name }
  say 'Queueing %s:%s' % [to_run[:ds_name], to_run[:job]]
  @jobs_to_run << to_run
end
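The task name built in #queue_job combines the first ten characters of the project name, the job key, and the dataset name, with 'miga-project' standing in for project-wide jobs. A minimal sketch of that naming rule; the helper task_name_for and the sample names are hypothetical:

```ruby
# Hypothetical helper mirroring the task name built in #queue_job
def task_name_for(project_name, job, ds_name = nil)
  ds_name ||= 'miga-project' # placeholder used for project-wide jobs
  "#{project_name[0..9]}:#{job}:#{ds_name}"
end

dataset_task = task_name_for('MyLongProjectName', :d, 'genome_A')
project_task = task_name_for('Short', :p)
puts dataset_task, project_task
```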
#queue_maintenance(force = false) ⇒ Object
Queue maintenance tasks as an analysis job
# File 'lib/miga/daemon.rb', line 112
def queue_maintenance(force = false)
  return if bypass_maintenance? || (!force && shutdown_when_done?)

  say 'Queueing maintenance tasks'
  queue_job(:maintenance)
end
#reload_project ⇒ Object
Reload the project’s metadata
# File 'lib/miga/daemon.rb', line 141
def reload_project
  l_say(2, 'Reloading project')
  project.load
end
#save_status ⇒ Object
Report status in a JSON file.
# File 'lib/miga/daemon.rb', line 148
def save_status
  l_say(2, 'Saving current status')
  MiGA::Json.generate(
    { jobs_running: @jobs_running, jobs_to_run: @jobs_to_run },
    File.join(daemon_home, 'status.json')
  )
end
#say(*msg) ⇒ Object
Same as l_say with level = 1
# File 'lib/miga/daemon.rb', line 135
def say(*msg)
  super(logfh, *msg) if verbosity >= 1
end
#update_format_0 ⇒ Object
Update from daemon JSON format 0 to the latest version
# File 'lib/miga/daemon.rb', line 375
def update_format_0
  {
    cmd: %w[script vars cpus log task_name],
    var: %w[key value],
    alive: %w[pid],
    kill: %w[pid]
  }.each do |k, v|
    if !runopts(k).nil? && runopts(k) =~ /%(\d+\$)?[ds]/
      runopts(
        k, runopts(k).gsub(/%(\d+\$)?d/, '%\\1s') % v.map { |i| "{{#{i}}}" }
      )
    end
  end
  runopts(:format_version, 1)
end
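The conversion in #update_format_0 rewrites printf-style placeholders (format 0) as named {{...}} placeholders: every %d becomes %s with any positional prefix kept, and the string is then formatted with the placeholder names themselves. A standalone sketch of the same transformation; to_format_1 is a hypothetical helper and the templates are made up rather than taken from a real daemon definition:

```ruby
# Hypothetical standalone version of the conversion applied to one option
def to_format_1(template, names)
  # Leave templates without printf-style placeholders untouched
  return template unless template =~ /%(\d+\$)?[ds]/

  # %d -> %s (keeping a positional prefix such as 1$), then fill each
  # slot with its {{name}} placeholder
  template.gsub(/%(\d+\$)?d/, '%\\1s') % names.map { |i| "{{#{i}}}" }
end

positional = to_format_1('ps -p %1$d | wc -l', %w[pid])
sequential = to_format_1('%s=%s', %w[key value])
unchanged = to_format_1('kill -9 {{pid}}', %w[pid])
puts positional, sequential, unchanged
```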