Class: MiGA::Daemon

Inherits:
MiGA
  • Object
show all
Includes:
Base
Defined in:
lib/miga/daemon.rb,
lib/miga/daemon/base.rb

Overview

MiGA daemons handle job submissions for a project.

Defined Under Namespace

Modules: Base

Constant Summary

Constants included from MiGA

CITATION, VERSION, VERSION_DATE, VERSION_NAME

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Methods included from Base

#latency, #maxjobs, #nodelist, #ppn, #restart, #runopts, #shutdown_when_done?, #start, #status, #stop

Methods inherited from MiGA

CITATION, DEBUG, DEBUG_OFF, DEBUG_ON, DEBUG_TRACE_OFF, DEBUG_TRACE_ON, FULL_VERSION, LONG_VERSION, VERSION, VERSION_DATE, initialized?, #result_files_exist?

Methods included from Common::Path

#root_path, #script_path

Methods included from Common::Format

#clean_fasta_file, #seqs_length, #tabulate

Constructor Details

#initialize(project, json = nil) ⇒ Daemon

Initialize an inactive daemon for the MiGA::Project project. See #daemon to wake the daemon. If passed, json must be the path to a daemon definition in JSON format; otherwise, the project-stored daemon definition is used. In either case, missing variables are taken from .miga_daemon.json in $MIGA_HOME (typically ~/.miga_daemon.json).



42
43
44
45
46
47
48
49
50
51
52
53
54
# File 'lib/miga/daemon.rb', line 42

# Initialize an inactive daemon for +project+ (a MiGA::Project). If +json+ is
# given it must be the path to a daemon definition in JSON format; otherwise
# the definition stored in the project's daemon/daemon.json is used. Keys
# missing from that definition fall back to .miga_daemon.json in $MIGA_HOME.
# The new instance is also appended to the global $_MIGA_DAEMON_LAIR list.
#
# NOTE(review): if MIGA_HOME is unset, File.expand_path resolves the fallback
# relative to the current working directory rather than ~ — confirm that
# MIGA_HOME is always set before a daemon is created.
def initialize(project, json = nil)
  $_MIGA_DAEMON_LAIR << self
  @project = project
  @runopts = {}
  json = File.expand_path('daemon/daemon.json', project.path) unless json
  fallback = File.expand_path('.miga_daemon.json', ENV['MIGA_HOME'])
  definition = MiGA::Json.parse(json, default: fallback)
  definition.each do |key, value|
    runopts(key, value)
  end
  # Older definitions use printf-style templates; upgrade them in place.
  update_format_0
  @jobs_to_run = []
  @jobs_running = []
  @loop_i = -1
end

Instance Attribute Details

#jobs_running ⇒ Object (readonly)

Array of jobs currently running



32
33
34
# File 'lib/miga/daemon.rb', line 32

# Array of job Hashes currently running.
def jobs_running; @jobs_running; end

#jobs_to_run ⇒ Object (readonly)

Array of jobs next to be executed



30
31
32
# File 'lib/miga/daemon.rb', line 30

# Array of job Hashes queued to be executed next.
def jobs_to_run; @jobs_to_run; end

#loop_i ⇒ Object (readonly)

Integer indicating the current iteration



34
35
36
# File 'lib/miga/daemon.rb', line 34

# Integer counter of the current loop iteration (-1 before the first one).
def loop_i; @loop_i; end

#options ⇒ Object (readonly)

Options used to setup the daemon



28
29
30
# File 'lib/miga/daemon.rb', line 28

# Options used to set up the daemon.
# NOTE(review): @options is not assigned anywhere in the visible constructor,
# so this may always return nil — confirm against the rest of the class.
def options; @options; end

#project ⇒ Object (readonly)

MiGA::Project in which the daemon is running



26
27
28
# File 'lib/miga/daemon.rb', line 26

# The MiGA::Project this daemon works on.
def project; @project; end

Class Method Details

.last_alive(project) ⇒ Object

When was the last time a daemon for the MiGA::Project project was seen active? Returns a Time, or nil if the project has no daemon/alive file.



16
17
18
19
20
# File 'lib/miga/daemon.rb', line 16

# When was a daemon for +project+ (a MiGA::Project) last seen active?
# Reads the timestamp that #declare_alive writes to daemon/alive.
#
# Returns a Time, or nil if no daemon has declared itself alive: the file is
# missing, was removed while being read, or is momentarily empty because a
# running daemon is rewriting it.
def self.last_alive(project)
  f = File.expand_path('daemon/alive', project.path)
  return nil unless File.exist? f
  # The daemon truncates and rewrites this file on every loop iteration, so a
  # concurrent reader may see it empty; Time.parse('') would raise.
  stamp = File.read(f).strip
  return nil if stamp.empty?
  Time.parse(stamp)
rescue Errno::ENOENT
  # The file was unlinked (e.g., by #terminate) between the check and the read.
  nil
end

Instance Method Details

#check_datasets ⇒ Object

Traverse datasets



132
133
134
135
136
137
# File 'lib/miga/daemon.rb', line 132

# Walk every dataset in the project and queue a dataset-level job (:d) for
# each one whose next_preprocessing(false) reports a pending step.
def check_datasets
  project.each_dataset do |_name, dataset|
    next if dataset.next_preprocessing(false).nil?
    queue_job(:d, dataset)
  end
end

#check_project ⇒ Object

Check whether all reference datasets are preprocessed; if so, queue the pending project-level task (if any).



142
143
144
145
146
147
# File 'lib/miga/daemon.rb', line 142

# Queue the project-level job (:p), but only when the project has datasets,
# project.done_preprocessing?(false) is true (per the method docs: all
# reference datasets are preprocessed), and a project task is pending.
def check_project
  return if project.dataset_names.empty?
  return unless project.done_preprocessing?(false)
  return if project.next_task(nil, false).nil?
  queue_job(:p)
end

#daemon(task, opts = [], wait = true) ⇒ Object

Launches the task with options opts (as command-line arguments) and returns the process ID as an Integer. If wait is true, it waits for the process to complete; otherwise it returns immediately. Supported tasks: start, stop, restart, status.



75
76
77
78
79
80
81
82
83
84
85
86
87
# File 'lib/miga/daemon.rb', line 75

# Launch the daemon +task+ (start, stop, restart, or status) with +opts+
# passed as command-line arguments, and return the PID of the forked process
# as an Integer. If +wait+ is true, block until that process finishes;
# otherwise return immediately.
#
# +opts+ is left untouched: the task name is prepended to a new array, so
# callers can reuse the same options across several calls.
def daemon(task, opts = [], wait = true)
  MiGA.DEBUG "Daemon.daemon #{task} #{opts}"
  # Named daemon_opts so it does not shadow the #options reader.
  daemon_opts = default_options
  daemon_opts[:ARGV] = [task.to_s] + opts
  # This additional degree of separation below was introduced so the Daemons
  # package doesn't kill the parent process in workflows.
  pid = fork do
    Daemons.run_proc("MiGA:#{project.name}", daemon_opts) { while in_loop; end }
  end
  Process.wait(pid) if wait
  pid
end

#declare_alive ⇒ Object

Declare the daemon alive by writing the current time to the project's daemon/alive file.



91
92
93
94
95
# File 'lib/miga/daemon.rb', line 91

# Declare the daemon alive by writing the current time to the project's
# daemon/alive file (read back by MiGA::Daemon.last_alive). Returns nil.
def declare_alive
  # File.write opens, writes, and closes the file even if the write raises,
  # so no file handle is leaked.
  File.write(File.expand_path('daemon/alive', project.path), Time.now.to_s)
  nil
end

#default_options ⇒ Object

Returns Hash containing the default options for the daemon.



65
66
67
68
# File 'lib/miga/daemon.rb', line 65

# Default settings passed to Daemons.run_proc: its working directory is the
# project's daemon/ folder, multiple instances are disabled, and output
# logging is enabled.
def default_options
  daemon_dir = File.expand_path('daemon', project.path)
  {
    dir_mode: :normal,
    dir: daemon_dir,
    multiple: false,
    log_output: true
  }
end

#flush! ⇒ Object

Remove finished jobs from the internal queue and launch as many as possible respecting #maxjobs or #nodelist (if set).



196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
# File 'lib/miga/daemon.rb', line 196

# Remove finished jobs from the internal queue and launch as many queued jobs
# as possible while respecting #maxjobs or #nodelist (if set).
#
# A running job is considered ongoing while the work it was queued for is
# still pending:
# - dataset jobs (:d): the dataset still has a preprocessing step;
# - project jobs (:p): the project still has a task;
# - any other job: its result is not yet registered.
def flush!
  # Check for finished jobs
  @jobs_running.select! do |job|
    ongoing =
      case job[:job].to_s
      when 'd'
        !job[:ds].nil? && !job[:ds].next_preprocessing(false).nil?
      when 'p'
        !project.next_task(nil, false).nil?
      else
        (job[:ds].nil? ? project : job[:ds]).add_result(job[:job], false).nil?
      end
    say "Completed pid:#{job[:pid]} for #{job[:task_name]}" unless ongoing
    ongoing
  end
  # Avoid single datasets hogging resources. rand(0) would return a Float,
  # so only rotate when the queue is non-empty.
  @jobs_to_run.rotate!(rand(@jobs_to_run.size)) unless @jobs_to_run.empty?
  # Launch as many +jobs_to_run+ as possible. A failed launch re-appends the
  # job to @jobs_to_run without taking a slot, so the number of attempts is
  # bounded by the queue length to avoid spinning on a job that keeps failing.
  @jobs_to_run.size.times do
    break if @jobs_to_run.empty?
    # next_host is false/nil when no slot is free; an SSH index of 0 is truthy.
    hostk = next_host
    break unless hostk
    launch_job(@jobs_to_run.shift, hostk)
  end
end

#get_job(job, ds = nil) ⇒ Object

Get the queued or running task with key symbol job in dataset ds (nil if there is none). For project-wide tasks, let ds be nil.



183
184
185
186
187
188
189
190
191
# File 'lib/miga/daemon.rb', line 183

# Find the queued or running job with symbol key +job+ for dataset +ds+
# (datasets are matched by name). For project-wide jobs let +ds+ be nil.
# Returns the job Hash, or nil if no such job exists.
def get_job(job, ds = nil)
  candidates = jobs_to_run + jobs_running
  if ds.nil?
    candidates.find { |j| j[:ds].nil? && j[:job] == job }
  else
    candidates.find do |j|
      !j[:ds].nil? && j[:ds].name == ds.name && j[:job] == job
    end
  end
end

#in_loop ⇒ Object

Run one loop step. Returns a Boolean indicating if the loop should continue.



239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
# File 'lib/miga/daemon.rb', line 239

# Run one iteration of the daemon loop. Returns false (stop looping) only
# when #shutdown_when_done? is set and no job is queued or running;
# otherwise it sleeps for #latency seconds and returns true.
def in_loop
  declare_alive
  project.load
  # On the very first iteration, announce the launch and restore any
  # previously saved queues.
  first_iteration = loop_i == -1
  if first_iteration
    say '-----------------------------------'
    say 'MiGA:%s launched' % project.name
    say '-----------------------------------'
    load_status
    say 'Configuration options:'
    say @runopts.to_s
    @loop_i = 0
  end
  @loop_i += 1

  check_datasets
  check_project
  if shutdown_when_done? && jobs_running.empty? && jobs_to_run.empty?
    say 'Nothing else to do, shutting down.'
    return false
  end

  flush!
  # Every 12th iteration, reset the counter and drop dead jobs.
  if loop_i == 12
    say 'Probing running jobs'
    @loop_i = 0
    purge!
  end

  report_status
  sleep(latency)
  true
end

#last_alive ⇒ Object

When was the last time a daemon for the current project was seen active? Returns a Time, or nil if the project has no daemon/alive file.



59
60
61
# File 'lib/miga/daemon.rb', line 59

# When was a daemon for this daemon's project last seen active? Delegates to
# MiGA::Daemon.last_alive and returns its result (a Time, or nil).
def last_alive
  MiGA::Daemon.last_alive(project)
end

#launch_job(job, hostk = nil) ⇒ Object

Launch the job described by Hash job on the hostk-th host (hostk is only used by SSH daemons).



286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
# File 'lib/miga/daemon.rb', line 286

# Launch the job described by Hash +job+:
# - SSH daemons: on the +hostk+-th node of #nodelist;
# - bash daemons: as a local background process;
# - any other type: by submitting it to the cluster scheduler.
# A successful launch moves the job to @jobs_running. A failed one (no PID)
# is re-queued at the end of @jobs_to_run.
#
# For SSH daemons, job[:cmd] keeps its {{host}} template untouched. The host
# is substituted only into the command actually spawned, so a rescheduled
# job can later be sent to (and recorded under) a different node.
def launch_job(job, hostk = nil)
  # Execute job
  case runopts(:type)
  when 'ssh'
    # Remote job
    job[:hostk] = hostk
    job[:pid] = spawn job[:cmd].miga_variables(host: nodelist[hostk])
    Process.detach job[:pid] unless [nil, '', 0].include?(job[:pid])
  when 'bash'
    # Local job
    job[:pid] = spawn job[:cmd]
    Process.detach job[:pid] unless [nil, '', 0].include?(job[:pid])
  else
    # Schedule cluster job (qsub, msub, slurm); the scheduler prints the ID
    job[:pid] = `#{job[:cmd]}`.chomp
  end

  # Check if registered
  if [nil, '', 0].include? job[:pid]
    job[:pid] = nil
    @jobs_to_run << job
    say "Unsuccessful #{job[:task_name]}, rescheduling"
  else
    @jobs_running << job
    target = job[:hostk] ? " to #{job[:hostk]}:#{nodelist[job[:hostk]]}" : ''
    say "Spawned pid:#{job[:pid]}#{target} for #{job[:task_name]}"
  end
end

#load_status ⇒ Object

Load the status of a previous instance.



107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
# File 'lib/miga/daemon.rb', line 107

# Restore the job queues saved by a previous daemon instance in
# daemon/status.json, if that file exists and is non-empty. Each saved job is
# re-linked to its dataset object and its :job key converted back to a
# Symbol. Jobs no longer alive are then purged.
def load_status
  status_file = File.expand_path('daemon/status.json', project.path)
  return unless File.size?(status_file)
  say 'Loading previous status in daemon/status.json:'
  saved = MiGA::Json.parse(status_file)
  saved.each_value do |queue|
    queue.map! do |entry|
      # Project-wide jobs (and jobs without a dataset) keep their :ds as is.
      if !entry[:ds].nil? && entry[:ds_name] != 'miga-project'
        entry[:ds] = project.dataset(entry[:ds_name])
      end
      entry[:job] = entry[:job].to_sym unless entry[:job].nil?
      entry
    end
  end
  @jobs_running = saved[:jobs_running]
  @jobs_to_run = saved[:jobs_to_run]
  say "- jobs left running: #{@jobs_running.size}"
  purge!
  say "- jobs running: #{@jobs_running.size}"
  say "- jobs to run: #{@jobs_to_run.size}"
end

#next_host ⇒ Object

In SSH daemons, retrieve the host index of an available node, or nil if none is available. In any other daemon type, return true as long as #maxjobs has not been reached (false otherwise).



222
223
224
225
226
227
# File 'lib/miga/daemon.rb', line 222

# For SSH daemons, return the index in #nodelist of the first node with no
# running job, or nil if every node is busy. For any other daemon type,
# return true while fewer than #maxjobs jobs are running (false otherwise).
def next_host
  unless runopts(:type) == 'ssh'
    return jobs_running.size < maxjobs
  end
  busy = jobs_running.map { |job| job[:hostk] }
  (nodelist.each_index.to_a - busy).first
end

#purge! ⇒ Object

Remove dead jobs.



231
232
233
234
235
# File 'lib/miga/daemon.rb', line 231

# Remove dead jobs from @jobs_running. For each job, the :alive runopts
# template is rendered with the job's pid (presumably filling {{pid}}) and
# run through the shell. The job is kept only if the output parses as the
# integer 1.
def purge!
  @jobs_running.select! do |job|
    probe = runopts(:alive).miga_variables(pid: job[:pid])
    answer = `#{probe}`
    answer.chomp.to_i == 1
  end
end

#queue_job(job, ds = nil) ⇒ Object

Add the task to the internal queue with symbol key job. If the task is dataset-specific, ds specifies the dataset. To submit jobs to the scheduler (or to bash or ssh), see #flush!.



153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
# File 'lib/miga/daemon.rb', line 153

# Add the job with symbol key +job+ to the internal queue (@jobs_to_run).
# For dataset-specific jobs +ds+ is the dataset; leave it nil for
# project-wide jobs. Returns nil without queueing if the same job is already
# queued or running. Jobs are only submitted (to the scheduler, bash, or ssh)
# by #flush!.
def queue_job(job, ds = nil)
  duplicate = get_job(job, ds)
  return nil unless duplicate.nil?

  ds_name = ds.nil? ? 'miga-project' : ds.name
  say 'Queueing %s:%s' % [ds_name, job]

  # Variables rendered through the :var template and passed to the command.
  vars = {
    'PROJECT' => project.path,
    'RUNTYPE' => runopts(:type),
    'CORES'   => ppn,
    'MIGA'    => MiGA::MiGA.root_path
  }
  vars['DATASET'] = ds.name unless ds.nil?

  # One log directory per job key, e.g. daemon/d/ for dataset jobs.
  log_dir = File.expand_path("daemon/#{job}", project.path)
  Dir.mkdir(log_dir) unless Dir.exist?(log_dir)

  task_name = "#{project.metadata[:name][0..9]}:#{job}:#{ds_name}"
  entry = { ds: ds, ds_name: ds_name, job: job, task_name: task_name }
  rendered_vars = vars.map { |key, value|
    runopts(:var).miga_variables(key: key, value: value)
  }.join(runopts(:varsep))
  entry[:cmd] = runopts(:cmd).miga_variables(
    script: MiGA::MiGA.script_path(job, miga: vars['MIGA'], project: project),
    vars: rendered_vars,
    cpus: ppn,
    log: File.expand_path("#{ds_name}.log", log_dir),
    task_name: task_name,
    miga: File.expand_path('bin/miga', MiGA::MiGA.root_path).shellescape
  )
  @jobs_to_run << entry
end

#report_status ⇒ Object

Report status in a JSON file.



99
100
101
102
103
# File 'lib/miga/daemon.rb', line 99

# Save the current job queues to the project's daemon/status.json, the file
# that #load_status reads back on startup.
def report_status
  status_file = File.expand_path('daemon/status.json', project.path)
  snapshot = { jobs_running: @jobs_running, jobs_to_run: @jobs_to_run }
  MiGA::Json.generate(snapshot, status_file)
end

#say(*opts) ⇒ Object

Send a datestamped message to the log.



271
272
273
# File 'lib/miga/daemon.rb', line 271

# Print +opts+ to standard output, prefixed by the current time in brackets
# and followed by a newline.
def say(*opts)
  stamp = "[#{Time.new.inspect}] "
  print(stamp, *opts, "\n")
end

#terminate ⇒ Object

Terminates a daemon.



277
278
279
280
281
282
# File 'lib/miga/daemon.rb', line 277

# Shut the daemon down cleanly: log it, save the job queues with
# #report_status, and remove the daemon/alive marker if present.
def terminate
  say 'Terminating daemon...'
  report_status
  alive_file = File.expand_path('daemon/alive', project.path)
  return unless File.exist?(alive_file)
  File.unlink(alive_file)
end

#update_format_0 ⇒ Object

Update from daemon JSON format 0 to the latest version



319
320
321
322
323
324
325
326
327
328
329
330
331
# File 'lib/miga/daemon.rb', line 319

# Upgrade runopts templates from daemon format 0 to the current format.
# Format 0 uses printf-style placeholders (%s, %d, or positional %1$s). They
# are replaced, in order, by the named {{placeholders}} listed below.
# Finally the options are marked as format_version 1.
def update_format_0
  placeholders = {
    cmd: %w[script vars cpus log task_name],
    var: %w[key value],
    alive: %w[pid],
    kill: %w[pid]
  }
  placeholders.each do |key, names|
    current = runopts(key)
    next if current.nil? || current !~ /%(\d+\$)?[ds]/
    # Turn every %d (or %N$d) into %s (or %N$s) so names can be formatted in.
    template = current.gsub(/%(\d+\$)?d/, '%\\1s')
    runopts(key, template % names.map { |name| "{{#{name}}}" })
  end
  runopts(:format_version, 1)
end