Class: MiGA::Daemon
- Includes: Base
- Defined in: lib/miga/daemon.rb, lib/miga/daemon/base.rb
Overview
MiGA Daemons handling job submissions.
Defined Under Namespace
Modules: Base
Constant Summary
Constants included from MiGA
CITATION, VERSION, VERSION_DATE, VERSION_NAME
Instance Attribute Summary
-
#jobs_running ⇒ Object
readonly
Array of jobs currently running.
-
#jobs_to_run ⇒ Object
readonly
Array of jobs next to be executed.
-
#loop_i ⇒ Object
readonly
Integer indicating the current iteration.
-
#options ⇒ Object
readonly
Options used to set up the daemon.
-
#project ⇒ Object
readonly
MiGA::Project in which the daemon is running.
Class Method Summary
-
.last_alive(project) ⇒ Object
When was the last time a daemon for the MiGA::Project project was seen active? Returns Time.
Instance Method Summary
-
#check_datasets ⇒ Object
Traverse datasets.
-
#check_project ⇒ Object
Check if all reference datasets are pre-processed.
-
#daemon(task, opts = [], wait = true) ⇒ Object
Launches the task with options opts (as command-line arguments) and returns the process ID as an Integer.
-
#declare_alive ⇒ Object
Tell the world that you’re alive.
-
#default_options ⇒ Object
Returns Hash containing the default options for the daemon.
-
#flush! ⇒ Object
Remove finished jobs from the internal queue and launch as many as possible respecting #maxjobs or #nodelist (if set).
-
#get_job(job, ds = nil) ⇒ Object
Get the task with key symbol job in dataset ds.
-
#in_loop ⇒ Object
Run one loop step.
-
#initialize(project, json = nil) ⇒ Daemon
constructor
Initialize an inactive daemon for the MiGA::Project project.
-
#last_alive ⇒ Object
When was the last time a daemon for the current project was seen active? Returns Time.
-
#launch_job(job, hostk = nil) ⇒ Object
Launch the job described by Hash job on the hostk-th host.
-
#load_status ⇒ Object
Load the status of a previous instance.
-
#next_host ⇒ Object
In SSH daemons, retrieve the host index of an available node, nil if none.
-
#purge! ⇒ Object
Remove dead jobs.
-
#queue_job(job, ds = nil) ⇒ Object
Add the task to the internal queue with symbol key job.
-
#report_status ⇒ Object
Report status in a JSON file.
-
#say(*opts) ⇒ Object
Send a datestamped message to the log.
-
#terminate ⇒ Object
Terminates a daemon.
-
#update_format_0 ⇒ Object
Update from daemon JSON format 0 to the latest version.
Methods included from Base
#latency, #maxjobs, #nodelist, #ppn, #restart, #runopts, #shutdown_when_done?, #start, #status, #stop
Methods inherited from MiGA
CITATION, DEBUG, DEBUG_OFF, DEBUG_ON, DEBUG_TRACE_OFF, DEBUG_TRACE_ON, FULL_VERSION, LONG_VERSION, VERSION, VERSION_DATE, initialized?, #result_files_exist?
Methods included from Common::Path
Methods included from Common::Format
#clean_fasta_file, #seqs_length, #tabulate
Constructor Details
#initialize(project, json = nil) ⇒ Daemon
Initialize an inactive daemon for the MiGA::Project project. See #daemon to wake the daemon. If passed, json must be the path to a daemon definition in JSON format. Otherwise, the project-stored daemon definition is used. In either case, missing variables take the values defined in ~/.miga_daemon.json.
# File 'lib/miga/daemon.rb', line 42
def initialize(project, json = nil)
  $_MIGA_DAEMON_LAIR << self
  @project = project
  @runopts = {}
  json ||= File.expand_path('daemon/daemon.json', project.path)
  MiGA::Json.parse(
    json, default: File.expand_path('.miga_daemon.json', ENV['MIGA_HOME'])
  ).each { |k,v| runopts(k, v) }
  update_format_0
  @jobs_to_run = []
  @jobs_running = []
  @loop_i = -1
end
Instance Attribute Details
#jobs_running ⇒ Object (readonly)
Array of jobs currently running
# File 'lib/miga/daemon.rb', line 32
def jobs_running
  @jobs_running
end
#jobs_to_run ⇒ Object (readonly)
Array of jobs next to be executed
# File 'lib/miga/daemon.rb', line 30
def jobs_to_run
  @jobs_to_run
end
#loop_i ⇒ Object (readonly)
Integer indicating the current iteration
# File 'lib/miga/daemon.rb', line 34
def loop_i
  @loop_i
end
#options ⇒ Object (readonly)
Options used to set up the daemon
# File 'lib/miga/daemon.rb', line 28
def options
  @options
end
#project ⇒ Object (readonly)
MiGA::Project in which the daemon is running
# File 'lib/miga/daemon.rb', line 26
def project
  @project
end
Class Method Details
.last_alive(project) ⇒ Object
When was the last time a daemon for the MiGA::Project project was seen active? Returns Time, or nil if the project has no daemon/alive file.
# File 'lib/miga/daemon.rb', line 16
def self.last_alive(project)
  f = File.expand_path('daemon/alive', project.path)
  return nil unless File.exist? f
  Time.parse(File.read(f))
end
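The alive-file pattern can be tried outside MiGA: one side writes a timestamp, the other parses it back. The following is a minimal sketch, not MiGA code; `read_last_alive` and the temporary directory are hypothetical stand-ins for the class method and the project's daemon/alive file.

```ruby
require 'time'
require 'tmpdir'

# Hypothetical helper mirroring the pattern above: returns a Time parsed
# from the file, or nil when the file does not exist.
def read_last_alive(path)
  return nil unless File.exist?(path)
  Time.parse(File.read(path))
end

# Hypothetical helper: records the current time, as a running daemon would.
def write_alive(path)
  File.write(path, Time.now.to_s)
end

alive_file = File.join(Dir.mktmpdir, 'alive')
p read_last_alive(alive_file)  # nil, since nothing has been written yet
write_alive(alive_file)
p read_last_alive(alive_file)  # a Time object, truncated to whole seconds
```

Because `Time#to_s` drops sub-second precision, the value read back can be up to one second older than the moment it was written.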
Instance Method Details
#check_datasets ⇒ Object
Traverse datasets, queueing a job for each dataset with pending preprocessing.
# File 'lib/miga/daemon.rb', line 132
def check_datasets
  project.each_dataset do |n, ds|
    to_run = ds.next_preprocessing(false)
    queue_job(:d, ds) unless to_run.nil?
  end
end
#check_project ⇒ Object
Check if all reference datasets are pre-processed. If yes, check the project-level tasks
# File 'lib/miga/daemon.rb', line 142
def check_project
  return if project.dataset_names.empty?
  return unless project.done_preprocessing?(false)
  to_run = project.next_task(nil, false)
  queue_job(:p) unless to_run.nil?
end
#daemon(task, opts = [], wait = true) ⇒ Object
Launches the task with options opts (as command-line arguments) and returns the process ID as an Integer. If wait is true, it waits for the process to complete; otherwise it returns immediately. Supported tasks: start, stop, restart, status.
# File 'lib/miga/daemon.rb', line 75
def daemon(task, opts = [], wait = true)
  MiGA.DEBUG "Daemon.daemon #{task} #{opts}"
  options = default_options
  opts.unshift(task.to_s)
  options[:ARGV] = opts
  # This additional degree of separation below was introduced so the Daemons
  # package doesn't kill the parent process in workflows.
  pid = fork do
    Daemons.run_proc("MiGA:#{project.name}", options) { while in_loop; end }
  end
  Process.wait(pid) if wait
  pid
end
#declare_alive ⇒ Object
Tell the world that you’re alive.
# File 'lib/miga/daemon.rb', line 91
def declare_alive
  f = File.open(File.expand_path('daemon/alive', project.path), 'w')
  f.print Time.now.to_s
  f.close
end
#default_options ⇒ Object
Returns Hash containing the default options for the daemon.
# File 'lib/miga/daemon.rb', line 65
def default_options
  { dir_mode: :normal, dir: File.expand_path('daemon', project.path),
    multiple: false, log_output: true }
end
#flush! ⇒ Object
Remove finished jobs from the internal queue and launch as many as possible respecting #maxjobs or #nodelist (if set).
# File 'lib/miga/daemon.rb', line 196
def flush!
  # Check for finished jobs
  @jobs_running.select! do |job|
    ongoing = case job[:job].to_s
    when 'd'
      !job[:ds].nil? && !job[:ds].next_preprocessing(false).nil?
    when 'p'
      !project.next_task(nil, false).nil?
    else
      (job[:ds].nil? ? project : job[:ds]).add_result(job[:job], false).nil?
    end
    say "Completed pid:#{job[:pid]} for #{job[:task_name]}" unless ongoing
    ongoing
  end
  # Avoid single datasets hogging resources
  @jobs_to_run.rotate! rand(jobs_to_run.size)
  # Launch as many +jobs_to_run+ as possible
  while hostk = next_host
    break if jobs_to_run.empty?
    launch_job(@jobs_to_run.shift, hostk)
  end
end
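The three phases of this method (drop finished jobs, rotate the waiting list, launch while capacity remains) can be sketched in isolation. This is an illustrative model only: `TinyQueue`, its `:done` flag, and the fixed `maxjobs` limit are assumptions standing in for the daemon's real completion checks and host selection.

```ruby
# Hypothetical stand-in for the daemon's queues; jobs are plain Hashes and
# a job counts as finished when its :done flag is set.
class TinyQueue
  attr_reader :running, :waiting

  def initialize(maxjobs, waiting)
    @maxjobs = maxjobs
    @running = []
    @waiting = waiting
  end

  def flush!
    # Keep only the jobs that are still ongoing
    @running.select! { |job| !job[:done] }
    # Start from a random position so no single dataset hogs resources
    @waiting.rotate!(rand(@waiting.size)) unless @waiting.empty?
    # Launch while there is spare capacity and something left to launch
    while @running.size < @maxjobs && !@waiting.empty?
      @running << @waiting.shift
    end
  end
end

queue = TinyQueue.new(2, [{ name: 'a' }, { name: 'b' }, { name: 'c' }])
queue.flush!
puts queue.running.size  # 2 jobs launched
puts queue.waiting.size  # 1 job still waiting
```

The random rotation means the order in which waiting jobs start is not deterministic, only the number of jobs launched per call.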
#get_job(job, ds = nil) ⇒ Object
Get the task with key symbol job in dataset ds. For project-wide tasks let ds be nil.
# File 'lib/miga/daemon.rb', line 183
def get_job(job, ds = nil)
  (jobs_to_run + jobs_running).find do |j|
    if ds.nil?
      j[:ds].nil? and j[:job] == job
    else
      (! j[:ds].nil?) and j[:ds].name == ds.name and j[:job] == job
    end
  end
end
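The lookup rule (match on the job key, and on the dataset name when a dataset is given) can be illustrated with plain Hashes. This is a sketch under assumptions: `FakeDataset`, `find_job`, and `SAMPLE_QUEUE` are hypothetical names, and only the `name` attribute of a dataset is modeled.

```ruby
# Hypothetical dataset stand-in: only a name is needed for the lookup.
FakeDataset = Struct.new(:name)

# Find the first queued entry matching the job key and, if given, the dataset.
def find_job(queue, job, ds = nil)
  queue.find do |j|
    if ds.nil?
      j[:ds].nil? && j[:job] == job
    else
      !j[:ds].nil? && j[:ds].name == ds.name && j[:job] == job
    end
  end
end

SAMPLE_QUEUE = [
  { job: :p, ds: nil },
  { job: :d, ds: FakeDataset.new('sample_1') }
].freeze

p find_job(SAMPLE_QUEUE, :p)                               # project-wide entry
p find_job(SAMPLE_QUEUE, :d, FakeDataset.new('sample_1'))  # matched by name
p find_job(SAMPLE_QUEUE, :d, FakeDataset.new('other'))     # nil
```

Matching by name rather than by object identity lets a freshly loaded dataset object find an entry queued earlier with a different instance.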
#in_loop ⇒ Object
Run one loop step. Returns a Boolean indicating if the loop should continue.
# File 'lib/miga/daemon.rb', line 239
def in_loop
  declare_alive
  project.load
  if loop_i == -1
    say '-----------------------------------'
    say 'MiGA:%s launched' % project.name
    say '-----------------------------------'
    load_status
    say 'Configuration options:'
    say @runopts.to_s
    @loop_i = 0
  end
  @loop_i += 1
  check_datasets
  check_project
  if shutdown_when_done? and jobs_running.size + jobs_to_run.size == 0
    say 'Nothing else to do, shutting down.'
    return false
  end
  flush!
  if loop_i == 12
    say 'Probing running jobs'
    @loop_i = 0
    purge!
  end
  report_status
  sleep(latency)
  true
end
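The counter logic in this method implies that running jobs are probed once every 12 iterations. A minimal sketch of just that bookkeeping follows; `LoopCounter` and its `probes` tally are hypothetical and replace the real calls to purge! with a simple count.

```ruby
# Hypothetical counter mimicking @loop_i: starts at -1, is set to 0 on the
# first step, incremented on every step, and reset after reaching 12.
class LoopCounter
  attr_reader :loop_i, :probes

  def initialize
    @loop_i = -1
    @probes = 0
  end

  def step
    @loop_i = 0 if @loop_i == -1  # first-run setup
    @loop_i += 1
    return unless @loop_i == 12
    @probes += 1                  # stands in for purge!
    @loop_i = 0
  end
end

counter = LoopCounter.new
24.times { counter.step }
puts counter.probes  # 2
```

With the latency set to, say, 30 seconds, this would translate to one probe roughly every six minutes, plus the time each iteration takes.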
#last_alive ⇒ Object
When was the last time a daemon for the current project was seen active? Returns Time.
# File 'lib/miga/daemon.rb', line 59
def last_alive
  MiGA::Daemon.last_alive project
end
#launch_job(job, hostk = nil) ⇒ Object
Launch the job described by Hash job on the hostk-th host.
# File 'lib/miga/daemon.rb', line 286
def launch_job(job, hostk = nil)
  # Execute job
  case runopts(:type)
  when 'ssh'
    # Remote job
    job[:hostk] = hostk
    job[:cmd] = job[:cmd].miga_variables(host: nodelist[hostk])
    job[:pid] = spawn job[:cmd]
    Process.detach job[:pid] unless [nil, '', 0].include?(job[:pid])
  when 'bash'
    # Local job
    job[:pid] = spawn job[:cmd]
    Process.detach job[:pid] unless [nil, '', 0].include?(job[:pid])
  else
    # Schedule cluster job (qsub, msub, slurm)
    job[:pid] = `#{job[:cmd]}`.chomp
  end
  # Check if registered
  if [nil, '', 0].include? job[:pid]
    job[:pid] = nil
    @jobs_to_run << job
    say "Unsuccessful #{job[:task_name]}, rescheduling"
  else
    @jobs_running << job
    say "Spawned pid:#{job[:pid]}#{
      " to #{job[:hostk]}:#{nodelist[job[:hostk]]}" if job[:hostk]
    } for #{job[:task_name]}"
  end
end
#load_status ⇒ Object
Load the status of a previous instance.
# File 'lib/miga/daemon.rb', line 107
def load_status
  f_path = File.expand_path('daemon/status.json', project.path)
  return unless File.size? f_path
  say 'Loading previous status in daemon/status.json:'
  status = MiGA::Json.parse(f_path)
  status.keys.each do |i|
    status[i].map! do |j|
      j.tap do |k|
        unless k[:ds].nil? or k[:ds_name] == 'miga-project'
          k[:ds] = project.dataset(k[:ds_name])
        end
        k[:job] = k[:job].to_sym unless k[:job].nil?
      end
    end
  end
  @jobs_running = status[:jobs_running]
  @jobs_to_run = status[:jobs_to_run]
  say "- jobs left running: #{@jobs_running.size}"
  purge!
  say "- jobs running: #{@jobs_running.size}"
  say "- jobs to run: #{@jobs_to_run.size}"
end
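The conversion of `:job` back to a Symbol is needed because JSON has no symbol type, so values written by #report_status come back as Strings. The round trip can be sketched with Ruby's standard `json` library; this is an assumption-laden illustration, since `MiGA::Json` may behave differently, and `dump_status` and `restore_status` are hypothetical helpers.

```ruby
require 'json'

# Serialize the two queues; Symbol values such as :d become Strings in JSON.
def dump_status(jobs_running, jobs_to_run)
  JSON.generate({ jobs_running: jobs_running, jobs_to_run: jobs_to_run })
end

# Parse the status back and restore each :job value to a Symbol.
def restore_status(json)
  status = JSON.parse(json, symbolize_names: true)
  status.each_value do |jobs|
    jobs.each { |j| j[:job] = j[:job].to_sym unless j[:job].nil? }
  end
  status
end

json = dump_status([{ job: :d, ds_name: 'sample_1', pid: 123 }], [])
status = restore_status(json)
p status[:jobs_running].first[:job]  # :d
```

Without the `to_sym` step the restored value would be the String "d", and comparisons against Symbol keys such as `:d` would fail.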
#next_host ⇒ Object
In SSH daemons, retrieve the host index of an available node, nil if none. In any other daemon, returns true as long as #maxjobs has not been reached.
# File 'lib/miga/daemon.rb', line 222
def next_host
  return jobs_running.size < maxjobs if runopts(:type) != 'ssh'
  allk = (0 .. nodelist.size-1).to_a
  busyk = jobs_running.map { |k| k[:hostk] }
  (allk - busyk).first
end
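For the SSH case, the host selection is an array difference between all node indexes and the indexes already in use. A small sketch of that step follows; `first_free_host` and the node names are hypothetical.

```ruby
# Returns the index of the first node without a running job, or nil.
def first_free_host(nodelist, jobs_running)
  all_indexes = (0...nodelist.size).to_a
  busy_indexes = jobs_running.map { |job| job[:hostk] }
  (all_indexes - busy_indexes).first
end

nodes = %w[node-a node-b node-c]  # hypothetical host names
p first_free_host(nodes, [{ hostk: 0 }, { hostk: 2 }])                # 1
p first_free_host(nodes, [{ hostk: 0 }, { hostk: 1 }, { hostk: 2 }])  # nil
```

Note that index 0 is truthy in Ruby, so a loop of the form `while hostk = next_host` keeps going when the first node is free and stops only on nil or false.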
#purge! ⇒ Object
Remove dead jobs.
# File 'lib/miga/daemon.rb', line 231
def purge!
  @jobs_running.select! do |job|
    `#{runopts(:alive).miga_variables(pid: job[:pid])}`.chomp.to_i == 1
  end
end
#queue_job(job, ds = nil) ⇒ Object
Add the task to the internal queue with symbol key job. If the task is dataset-specific, ds specifies the dataset. To submit jobs to the scheduler (or to bash or ssh) see #flush!
# File 'lib/miga/daemon.rb', line 153
def queue_job(job, ds = nil)
  return nil unless get_job(job, ds).nil?
  ds_name = (ds.nil? ? 'miga-project' : ds.name)
  say 'Queueing %s:%s' % [ds_name, job]
  vars = {
    'PROJECT' => project.path,
    'RUNTYPE' => runopts(:type),
    'CORES' => ppn,
    'MIGA' => MiGA::MiGA.root_path
  }
  vars['DATASET'] = ds.name unless ds.nil?
  log_dir = File.expand_path("daemon/#{job}", project.path)
  Dir.mkdir(log_dir) unless Dir.exist? log_dir
  task_name = "#{project.metadata[:name][0..9]}:#{job}:#{ds_name}"
  to_run = { ds: ds, ds_name: ds_name, job: job, task_name: task_name }
  to_run[:cmd] = runopts(:cmd).miga_variables(
    script: MiGA::MiGA.script_path(job, miga:vars['MIGA'], project: project),
    vars: vars.map { |k, v|
      runopts(:var).miga_variables(key: k, value: v)
    }.join(runopts(:varsep)),
    cpus: ppn,
    log: File.expand_path("#{ds_name}.log", log_dir),
    task_name: task_name,
    miga: File.expand_path('bin/miga', MiGA::MiGA.root_path).shellescape
  )
  @jobs_to_run << to_run
end
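The command is assembled in two passes: each environment variable is rendered with the `var` template and joined with `varsep`, and the result is then substituted into the `cmd` template. The sketch below illustrates that flow under stated assumptions: `fill_template` is a hypothetical stand-in for `miga_variables` (whose actual implementation is not shown here), and the three template strings are invented examples rather than MiGA defaults.

```ruby
# Hypothetical stand-in for the placeholder substitution: replaces each
# {{key}} with the matching value, leaving unknown keys untouched.
def fill_template(template, values)
  template.gsub(/\{\{(\w+)\}\}/) do |match|
    key = Regexp.last_match(1).to_sym
    values.key?(key) ? values[key].to_s : match
  end
end

# Assumed templates, in the spirit of runopts(:var), runopts(:varsep)
# and runopts(:cmd); real daemon definitions may look different.
var_template = '{{key}}={{value}}'
var_sep = ' '
cmd_template = '{{vars}} bash {{script}} > {{log}} 2>&1'

vars = { 'PROJECT' => '/tmp/project', 'CORES' => 2 }
var_string = vars.map { |k, v| fill_template(var_template, key: k, value: v) }
                 .join(var_sep)
puts fill_template(cmd_template, vars: var_string, script: 'd.bash', log: 'd.log')
# PROJECT=/tmp/project CORES=2 bash d.bash > d.log 2>&1
```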
#report_status ⇒ Object
Report status in a JSON file.
# File 'lib/miga/daemon.rb', line 99
def report_status
  MiGA::Json.generate(
    {jobs_running: @jobs_running, jobs_to_run: @jobs_to_run},
    File.expand_path('daemon/status.json', project.path))
end
#say(*opts) ⇒ Object
Send a datestamped message to the log.
# File 'lib/miga/daemon.rb', line 271
def say(*opts)
  print "[#{Time.new.inspect}] ", *opts, "\n"
end
#terminate ⇒ Object
Terminates a daemon.
# File 'lib/miga/daemon.rb', line 277
def terminate
  say 'Terminating daemon...'
  report_status
  f = File.expand_path('daemon/alive', project.path)
  File.unlink(f) if File.exist? f
end
#update_format_0 ⇒ Object
Update from daemon JSON format 0 to the latest version
# File 'lib/miga/daemon.rb', line 319
def update_format_0
  {
    cmd: %w[script vars cpus log task_name],
    var: %w[key value],
    alive: %w[pid],
    kill: %w[pid]
  }.each do |k,v|
    runopts(
      k, runopts(k).gsub(/%(\d+\$)?d/, '%\\1s') % v.map{ |i| "{{#{i}}}" }
    ) if !runopts(k).nil? && runopts(k) =~ /%(\d+\$)?[ds]/
  end
  runopts(:format_version, 1)
end
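The upgrade works by first turning every `%d` directive into `%s` and then formatting the template with the placeholder names themselves, so each positional directive is replaced by the corresponding `{{name}}`. The following sketch isolates that transformation; `upgrade_template` is a hypothetical helper and the template strings are invented examples.

```ruby
# Convert a printf-style template (format 0) into {{name}} placeholders,
# given the placeholder names in positional order.
def upgrade_template(template, names)
  return template unless template =~ /%(\d+\$)?[ds]/
  template.gsub(/%(\d+\$)?d/, '%\\1s') % names.map { |n| "{{#{n}}}" }
end

puts upgrade_template('ps -p %d | wc -l', %w[pid])
# ps -p {{pid}} | wc -l
puts upgrade_template('%1$s=%2$s', %w[key value])
# {{key}}={{value}}
puts upgrade_template('{{key}}={{value}}', %w[key value])
# {{key}}={{value}} (already in the new format, left unchanged)
```

The `%d` to `%s` rewrite is what makes the second step possible: formatting the String "{{pid}}" with `%d` would raise an ArgumentError, while `%s` inserts it verbatim.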