Class: MiGA::Daemon

Inherits:
MiGA
  • Object
show all
Defined in:
lib/miga/daemon.rb

Overview

MiGA Daemons handling job submissions.

Constant Summary

Constants included from MiGA

CITATION, VERSION, VERSION_DATE, VERSION_NAME

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Methods inherited from MiGA

CITATION, DEBUG, DEBUG_OFF, DEBUG_ON, DEBUG_TRACE_OFF, DEBUG_TRACE_ON, FULL_VERSION, LONG_VERSION, VERSION, VERSION_DATE, clean_fasta_file, initialized?, #result_files_exist?, root_path, tabulate

Constructor Details

#initialize(project) ⇒ Daemon

Initialize an inactive daemon for the MiGA::Project project. See #daemon to wake the daemon.



35
36
37
38
39
40
41
42
43
# File 'lib/miga/daemon.rb', line 35

# Initialize an inactive daemon for the MiGA::Project +project+. The run
# options are read from the JSON file "daemon/daemon.json" located under the
# project path, with keys converted to Symbols. Both job queues start empty
# and the loop counter starts at -1. See #daemon to wake the daemon.
def initialize(project)
  @project = project
  config_file = File.expand_path("daemon/daemon.json", project.path)
  @runopts = JSON.parse(File.read(config_file), symbolize_names: true)
  @jobs_to_run = []
  @jobs_running = []
  @loop_i = -1
end

Instance Attribute Details

#jobs_runningObject (readonly)

Array of jobs currently running.



28
29
30
# File 'lib/miga/daemon.rb', line 28

# Array of jobs currently running (read-only view of +@jobs_running+).
def jobs_running() @jobs_running ; end

#jobs_to_runObject (readonly)

Array of jobs next to be executed.



26
27
28
# File 'lib/miga/daemon.rb', line 26

# Array of jobs next to be executed (read-only view of +@jobs_to_run+).
def jobs_to_run() @jobs_to_run ; end

#loop_iObject (readonly)

Integer indicating the current iteration.



30
31
32
# File 'lib/miga/daemon.rb', line 30

# Integer indicating the current iteration (read-only view of +@loop_i+).
def loop_i() @loop_i ; end

#optionsObject (readonly)

Options used to setup the daemon.



24
25
26
# File 'lib/miga/daemon.rb', line 24

# Options used to setup the daemon (read-only view of +@options+).
# NOTE(review): none of the code visible here assigns +@options+; confirm
# where it is populated, otherwise this returns nil.
def options() @options ; end

#projectObject (readonly)

MiGA::Project in which the daemon is running.



22
23
24
# File 'lib/miga/daemon.rb', line 22

# MiGA::Project in which the daemon is running (read-only view of +@project+).
def project() @project ; end

Class Method Details

.last_alive(project) ⇒ Object

When was the last time a daemon for the MiGA::Project project was seen active? Returns DateTime.



15
16
17
18
19
# File 'lib/miga/daemon.rb', line 15

# When was the last time a daemon for the MiGA::Project +project+ was seen
# active? Parses the timestamp stored in "daemon/alive" under the project
# path. Returns a DateTime, or nil if that file does not exist.
def self.last_alive(project)
  alive_file = File.expand_path("daemon/alive", project.path)
  File.exist?(alive_file) ? DateTime.parse(File.read(alive_file)) : nil
end

Instance Method Details

#check_datasetsObject

Traverse datasets



129
130
131
132
133
134
135
136
137
138
139
# File 'lib/miga/daemon.rb', line 129

# Traverse the datasets of the project. For each dataset that is loaded, ask
# for its next preprocessing task and queue it if there is one. A dataset
# that is listed but yields nil triggers a warning and a project reload.
def check_datasets
  project.each_dataset do |name, dataset|
    if dataset.nil?
      say "Warning: Dataset #{name} listed but not loaded, reloading project."
      project.load
      next
    end
    pending = dataset.next_preprocessing(true)
    next if pending.nil?
    queue_job(pending, dataset)
  end
end

#check_projectObject

Check if all reference datasets are pre-processed. If yes, check the project-level tasks



144
145
146
147
148
149
150
151
# File 'lib/miga/daemon.rb', line 144

# Check if all reference datasets are pre-processed. If yes, check the
# project-level tasks: the next distances task is preferred, falling back to
# the next in-clade task, and whichever is found gets queued. Does nothing
# when the project has no datasets.
def check_project
  return if project.dataset_names.empty?
  return unless project.done_preprocessing?(false)
  task = project.next_distances(true)
  task = project.next_inclade(true) if task.nil?
  queue_job(task) unless task.nil?
end

#daemon(task, opts = []) ⇒ Object

Launches the task with options opts (as command-line arguments). Supported tasks include: start, stop, restart, status.



110
111
112
113
114
115
116
117
# File 'lib/miga/daemon.rb', line 110

# Launches the +task+ with options +opts+ (as command-line arguments).
# Supported tasks include: start, stop, restart, status.
#
# The argument vector handed to Daemons is a new Array with +task+ first, so
# +opts+ itself is never modified: callers may reuse the same Array across
# calls (or pass a frozen one) without task names piling up in it.
def daemon(task, opts=[])
  daemon_opts = default_options
  daemon_opts[:ARGV] = [task, *opts]
  Daemons.run_proc("MiGA:#{project.name}", daemon_opts) do
    # Keep iterating until #in_loop reports that the daemon should stop.
    loop { break unless in_loop }
  end
end

#declare_aliveObject

Tell the world that you’re alive



121
122
123
124
125
# File 'lib/miga/daemon.rb', line 121

# Tell the world that you're alive: overwrites "daemon/alive" under the
# project path with the current time as a String. Returns nil.
def declare_alive
  alive_file = File.expand_path("daemon/alive", project.path)
  # Block form guarantees the handle is closed even if the write raises.
  File.open(alive_file, "w") { |fh| fh.print Time.now.to_s }
end

#default_optionsObject

Returns Hash containing the default options for the daemon.



54
55
56
57
# File 'lib/miga/daemon.rb', line 54

# Returns Hash containing the default options for the daemon: normal
# directory mode rooted at the "daemon" folder of the project, a single
# instance only, and log output enabled.
def default_options
  daemon_dir = File.expand_path("daemon", project.path)
  {
    dir_mode: :normal,
    dir: daemon_dir,
    multiple: false,
    log_output: true
  }
end

#flush!Object

Remove finished jobs from the internal queue and launch as many as possible respecting #maxjobs.



199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
# File 'lib/miga/daemon.rb', line 199

# Remove finished jobs from the internal queue and launch as many as possible
# respecting #maxjobs.
#
# A running job counts as finished when +add_result+ on its owner (the
# job's dataset, or the project for project-wide jobs) returns non-nil.
# Launching is delegated to +launch_job+, which is defined outside this
# excerpt.
def flush!
  # Keep only the jobs whose result is still missing
  @jobs_running.keep_if do |job|
    owner = job[:ds].nil? ? project : job[:ds]
    result = owner.add_result(job[:job], false)
    pending = result.nil?
    say "Completed pid:#{job[:pid]} for #{job[:task_name]}." unless pending
    pending
  end
  # Random starting point, so no single dataset hogs the resources
  offset = rand(jobs_to_run.size)
  @jobs_to_run.rotate! offset
  # Fill the free slots with queued jobs
  while jobs_running.size < maxjobs && !jobs_to_run.empty?
    launch_job @jobs_to_run.shift
  end
end

#get_job(job, ds = nil) ⇒ Object

Get the task with key symbol job in dataset ds. For project-wide tasks let ds be nil.



186
187
188
189
190
191
192
193
194
# File 'lib/miga/daemon.rb', line 186

# Get the task with key symbol +job+ in dataset +ds+, searching first the
# jobs to run and then the running jobs. For project-wide tasks let +ds+ be
# nil. Datasets are matched by name. Returns the job Hash, or nil if there
# is no match.
def get_job(job, ds=nil)
  project_wide = ds.nil?
  (jobs_to_run + jobs_running).find do |queued|
    owner = queued[:ds]
    if project_wide
      owner.nil? && queued[:job] == job
    else
      !owner.nil? && owner.name == ds.name && queued[:job] == job
    end
  end
end

#in_loopObject

Run one loop step. Returns a Boolean indicating if the loop should continue.



225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
# File 'lib/miga/daemon.rb', line 225

# Run one loop step. Returns a Boolean indicating if the loop should continue.
#
# On the very first step (counter at -1) a launch banner is logged. Every
# step declares the daemon alive, reloads the project, checks datasets and
# project, and flushes the queues. When the counter reaches 4 it is reset
# and dead jobs are purged. The step then sleeps for #latency seconds.
# The loop stops only if #shutdown_when_done? and both job queues are empty.
def in_loop
  if loop_i == -1
    rule = "-----------------------------------"
    [rule, "MiGA:#{project.name} launched.", rule].each { |line| say line }
    @loop_i = 0
  end
  @loop_i += 1
  declare_alive
  project.load
  check_datasets
  check_project
  flush!
  if loop_i == 4
    say "Housekeeping for sanity"
    @loop_i = 0
    purge!
  end
  sleep(latency)
  !(shutdown_when_done? && (jobs_running.size + jobs_to_run.size) == 0)
end

#last_aliveObject

When was the last time a daemon for the current project was seen active? Returns DateTime.



48
49
50
# File 'lib/miga/daemon.rb', line 48

# When was the last time a daemon for the current project was seen active?
# Delegates to MiGA::Daemon.last_alive with this daemon's #project and
# returns whatever it returns.
def last_alive
  MiGA::Daemon.last_alive(project)
end

#latencyObject

Returns Integer indicating the number of seconds to sleep between checks.



76
# File 'lib/miga/daemon.rb', line 76

# Returns Integer indicating the number of seconds to sleep between checks,
# as stored under the +:latency+ run option.
def latency
  runopts(:latency)
end

#maxjobsObject

Returns Integer indicating the maximum number of concurrent jobs to run.



80
# File 'lib/miga/daemon.rb', line 80

# Returns Integer indicating the maximum number of concurrent jobs to run,
# as stored under the +:maxjobs+ run option.
def maxjobs
  runopts(:maxjobs)
end

#ppnObject

Returns Integer indicating the number of CPUs per job.



84
# File 'lib/miga/daemon.rb', line 84

# Returns Integer indicating the number of CPUs per job, as stored under the
# +:ppn+ run option.
def ppn
  runopts(:ppn)
end

#purge!Object

Remove dead jobs.



217
218
219
220
221
# File 'lib/miga/daemon.rb', line 217

# Remove dead jobs. For every running job, the +:alive+ run option is used as
# a format string, filled in with the job's pid, and executed as a shell
# command; the job is kept only if the command's output reads as 1.
def purge!
  @jobs_running.reject! do |job|
    probe = sprintf(runopts(:alive), job[:pid])
    `#{probe}`.chomp.to_i != 1
  end
end

#queue_job(job, ds = nil) ⇒ Object

Add the task to the internal queue with symbol key job. If the task is dataset-specific, ds specifies the dataset. To submit jobs to the scheduler (or to bash) see #flush!.



157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
# File 'lib/miga/daemon.rb', line 157

# Add the task to the internal queue with symbol key +job+. If the task is
# dataset-specific, +ds+ specifies the dataset. To submit jobs to the
# scheduler (or to bash) see #flush!.
#
# Returns nil without queueing if #get_job already finds the same task;
# otherwise returns the updated queue. The command line is assembled from
# the +:cmd+ run option, which receives in order: script path, variable
# list, CPUs, log file, and task name. The per-job log folder under
# "daemon/" is created if missing.
def queue_job(job, ds=nil)
  return nil unless get_job(job, ds).nil?
  ds_name = ds.nil? ? "miga-project" : ds.name
  say "Queueing ", ds_name, ":#{job}"

  # Variables exported to the job script
  vars = {
    "PROJECT" => project.path,
    "RUNTYPE" => runopts(:type),
    "CORES" => ppn,
    "MIGA" => MiGA::MiGA.root_path
  }
  vars["DATASET"] = ds.name unless ds.nil?

  log_dir = File.expand_path("daemon/#{job}", project.path)
  Dir.mkdir(log_dir) unless Dir.exist? log_dir
  task_name = "#{project.metadata[:name][0..9]}:#{job}:#{ds_name}"

  script = File.expand_path("scripts/#{job}.bash", vars["MIGA"])
  var_list = vars.map { |key, value| sprintf(runopts(:var), key, value) }
  log_file = File.expand_path("#{ds_name}.log", log_dir)
  cmd = sprintf(runopts(:cmd), script, var_list.join(runopts(:varsep)),
    ppn, log_file, task_name)

  @jobs_to_run << { ds: ds, job: job, task_name: task_name, cmd: cmd }
end

#restart(opts = []) ⇒ Object

Restarts the daemon with opts.



101
# File 'lib/miga/daemon.rb', line 101

# Restarts the daemon with +opts+ by running the "restart" task through
# #daemon.
def restart(opts=[])
  daemon("restart", opts)
end

#runopts(k, v = nil, force = false) ⇒ Object

Set/get #options, where k is the Symbol of the option and v is the value (or nil to use as getter). Skips consistency tests if force. Returns new value.



63
64
65
66
67
68
69
70
71
72
# File 'lib/miga/daemon.rb', line 63

# Set/get #options, where +k+ is the Symbol of the option and +v+ is the
# value (or nil to use as getter). Skips consistency tests if +force+.
# Returns new value.
#
# +:latency+, +:maxjobs+ and +:ppn+ are coerced with +to_i+, and
# +:shutdown_when_done+ is coerced to a Boolean; other options are stored
# as given. Raises a RuntimeError when the coerced value equals zero,
# unless +force+ is set.
def runopts(k, v=nil, force=false)
  key = k.to_sym
  return @runopts[key] if v.nil?
  value =
    case key
    when :latency, :maxjobs, :ppn then v.to_i
    when :shutdown_when_done then !!v
    else v
    end
  raise "Daemon's #{key} cannot be set to zero." if !force && value == 0
  @runopts[key] = value
  @runopts[key]
end

#say(*opts) ⇒ Object

Send a datestamped message to the log.



252
253
254
# File 'lib/miga/daemon.rb', line 252

# Send a datestamped message to the log: prints the current time (in
# +inspect+ form, between square brackets), followed by every element of
# +opts+ and a newline.
def say(*opts)
  stamp = "[#{Time.new.inspect}] "
  print(stamp, *opts, "\n")
end

#shutdown_when_done?Boolean

Returns Boolean indicating if the daemon should shutdown when processing is complete.

Returns:

  • (Boolean)


89
# File 'lib/miga/daemon.rb', line 89

# Returns Boolean indicating if the daemon should shutdown when processing is
# complete, based on the +:shutdown_when_done+ run option.
def shutdown_when_done?
  !!runopts(:shutdown_when_done)
end

#start(opts = []) ⇒ Object

Initializes the daemon with opts.



93
# File 'lib/miga/daemon.rb', line 93

# Initializes the daemon with +opts+ by running the "start" task through
# #daemon.
def start(opts=[])
  daemon("start", opts)
end

#status(opts = []) ⇒ Object

Returns the status of the daemon with opts.



105
# File 'lib/miga/daemon.rb', line 105

# Returns the status of the daemon with +opts+ by running the "status" task
# through #daemon.
def status(opts=[])
  daemon("status", opts)
end

#stop(opts = []) ⇒ Object

Stops the daemon with opts.



97
# File 'lib/miga/daemon.rb', line 97

# Stops the daemon with +opts+ by running the "stop" task through #daemon.
def stop(opts=[])
  daemon("stop", opts)
end