Class: MiGA::Daemon

Inherits:
MiGA
  • Object
show all
Includes:
Base
Defined in:
lib/miga/daemon.rb,
lib/miga/daemon/base.rb

Overview

MiGA Daemons handling job submissions.

Defined Under Namespace

Modules: Base

Constant Summary

Constants included from MiGA

CITATION, VERSION, VERSION_DATE, VERSION_NAME

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Methods included from Base

#latency, #maxjobs, #ppn, #restart, #runopts, #shutdown_when_done?, #start, #status, #stop

Methods inherited from MiGA

CITATION, DEBUG, DEBUG_OFF, DEBUG_ON, DEBUG_TRACE_OFF, DEBUG_TRACE_ON, FULL_VERSION, LONG_VERSION, VERSION, VERSION_DATE, initialized?, #result_files_exist?

Methods included from Common::Path

#root_path, #script_path

Methods included from Common::Format

#clean_fasta_file, #seqs_length, #tabulate

Constructor Details

#initialize(project) ⇒ Daemon

Initialize an inactive daemon for the MiGA::Project project. See #daemon to wake the daemon.



39
40
41
42
43
44
45
46
47
48
# File 'lib/miga/daemon.rb', line 39

# Set up an inactive daemon bound to +project+.
#
# The new instance is appended to the global $_MIGA_DAEMON_LAIR collection,
# the run options are read from +daemon/daemon.json+ under the project path
# (JSON object keys are symbolized), both job queues start empty, and the
# loop counter starts at -1.
#
# project:: object responding to +path+ (a MiGA::Project).
def initialize(project)
  $_MIGA_DAEMON_LAIR << self
  @project = project
  settings_file = File.expand_path('daemon/daemon.json', project.path)
  @runopts = JSON.parse(File.read(settings_file), symbolize_names: true)
  @jobs_to_run = []
  @jobs_running = []
  @loop_i = -1
end

Instance Attribute Details

#jobs_running ⇒ Object (readonly)

Array of jobs currently running.



32
33
34
# File 'lib/miga/daemon.rb', line 32

# Read-only accessor: returns the @jobs_running instance variable
# (the Array of jobs currently running).
def jobs_running
  @jobs_running
end

#jobs_to_run ⇒ Object (readonly)

Array of jobs next to be executed.



30
31
32
# File 'lib/miga/daemon.rb', line 30

# Read-only accessor: returns the @jobs_to_run instance variable
# (the Array of jobs waiting to be executed).
def jobs_to_run
  @jobs_to_run
end

#loop_i ⇒ Object (readonly)

Integer indicating the current iteration.



34
35
36
# File 'lib/miga/daemon.rb', line 34

# Read-only accessor: returns the @loop_i instance variable
# (the Integer counter of the current loop iteration).
def loop_i
  @loop_i
end

#options ⇒ Object (readonly)

Options used to setup the daemon.



28
29
30
# File 'lib/miga/daemon.rb', line 28

# Read-only accessor: returns the @options instance variable
# (the options used to set up the daemon).
# NOTE(review): the constructor shown for this class assigns @runopts, not
# @options; confirm where @options is populated, otherwise this returns nil.
def options
  @options
end

#project ⇒ Object (readonly)

MiGA::Project in which the daemon is running.



26
27
28
# File 'lib/miga/daemon.rb', line 26

# Read-only accessor: returns the @project instance variable
# (the MiGA::Project in which the daemon is running).
def project
  @project
end

Class Method Details

.last_alive(project) ⇒ Object

When was the last time a daemon for the MiGA::Project project was seen active? Returns DateTime.



16
17
18
19
20
# File 'lib/miga/daemon.rb', line 16

# When was a daemon for +project+ last seen active?
#
# Reads the timestamp stored in +daemon/alive+ under the project path.
#
# project:: object responding to +path+ (a MiGA::Project).
#
# Returns a DateTime, or +nil+ when the file does not exist or is empty.
# Raises whatever DateTime.parse raises if the file holds unparseable text.
def self.last_alive(project)
  f = File.expand_path('daemon/alive', project.path)
  # Read directly and rescue ENOENT rather than testing File.exist? first:
  # the file can be removed between the check and the read.
  stamp = File.read(f).strip
  # The file is truncated on open when it is rewritten, so a concurrent
  # reader may briefly see it empty; treat that as "not seen" instead of
  # letting DateTime.parse raise on an empty string.
  stamp.empty? ? nil : DateTime.parse(stamp)
rescue Errno::ENOENT
  nil
end

Instance Method Details

#check_datasets ⇒ Object

Traverse datasets



111
112
113
114
115
116
117
118
119
120
121
# File 'lib/miga/daemon.rb', line 111

# Walk every dataset of the project and queue its next pending task.
#
# For each (name, dataset) pair yielded by project.each_dataset:
# * a nil dataset triggers a warning and a project reload;
# * otherwise the value of dataset.next_preprocessing(true) is queued via
#   #queue_job, unless that value is nil.
def check_datasets
  project.each_dataset do |name, dataset|
    if dataset.nil?
      say "Warning: Dataset #{name} listed but not loaded, reloading project."
      project.load
    else
      pending = dataset.next_preprocessing(true)
      queue_job(pending, dataset) unless pending.nil?
    end
  end
end

#check_project ⇒ Object

Check if all reference datasets are pre-processed. If yes, check the project-level tasks



126
127
128
129
130
131
132
# File 'lib/miga/daemon.rb', line 126

# Queue the next project-level task, if any.
#
# Does nothing when the project has no datasets or when
# project.done_preprocessing?(false) is falsy. Otherwise takes the value of
# project.next_distances(true), falling back to project.next_inclade(true)
# when that is nil, and hands a non-nil result to #queue_job.
def check_project
  return if project.dataset_names.empty? || !project.done_preprocessing?(false)
  pending = project.next_distances(true)
  pending = project.next_inclade(true) if pending.nil?
  queue_job(pending) unless pending.nil?
end

#daemon(task, opts = []) ⇒ Object

Launches the task with options opts (as command-line arguments). Supported tasks include: start, stop, restart, status.



67
68
69
70
71
72
73
74
# File 'lib/miga/daemon.rb', line 67

# Launch +task+ through Daemons.run_proc.
#
# task:: task name passed as the first command-line argument (the page
#        lists start, stop, restart and status as supported).
# opts:: Array of additional command-line arguments. It is not modified.
#
# The options come from #default_options plus an :ARGV entry made of +task+
# followed by +opts+. The daemonized block calls #in_loop repeatedly until
# it returns a falsy value. Returns whatever Daemons.run_proc returns.
def daemon(task, opts=[])
  options = default_options
  # Build a fresh array instead of opts.unshift(task): unshift altered the
  # caller's array in place and raised on a frozen one.
  options[:ARGV] = [task, *opts]
  Daemons.run_proc("MiGA:#{project.name}", options) do
    loop { break unless in_loop }
  end
end

#declare_alive ⇒ Object

Tell the world that you’re alive.



78
79
80
81
82
# File 'lib/miga/daemon.rb', line 78

# Record that the daemon is alive by writing the current time
# (Time.now.to_s) to +daemon/alive+ under the project path, replacing any
# previous content. Returns nil.
def declare_alive
  # Block form closes the handle even if the write raises; the previous
  # open/print/close sequence leaked it on error.
  File.open(File.expand_path('daemon/alive', project.path), 'w') do |f|
    f.print Time.now.to_s
  end
end

#default_options ⇒ Object

Returns Hash containing the default options for the daemon.



59
60
61
62
# File 'lib/miga/daemon.rb', line 59

# Default options Hash for the daemon: normal directory mode rooted at the
# +daemon+ folder under the project path, a single instance only, and
# output logging enabled.
def default_options
  daemon_dir = File.expand_path('daemon', project.path)
  {
    dir_mode: :normal,
    dir: daemon_dir,
    multiple: false,
    log_output: true
  }
end

#flush! ⇒ Object

Remove finished jobs from the internal queue and launch as many as possible respecting #maxjobs.



184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
# File 'lib/miga/daemon.rb', line 184

# Drop finished jobs from the running list, then launch queued jobs while
# fewer than #maxjobs are running.
#
# A running job counts as finished when add_result(job[:job], false),
# called on the job's dataset (or on the project when the job has no
# dataset), returns a non-nil value; completion is logged with #say.
# The queue is rotated by a random offset before launching so that the
# same entries are not always first in line.
def flush!
  # Discard every job that already has a result
  @jobs_running.reject! do |job|
    owner = job[:ds].nil? ? project : job[:ds]
    result = owner.add_result(job[:job], false)
    say "Completed pid:#{job[:pid]} for #{job[:task_name]}." unless result.nil?
    !result.nil?
  end
  # Avoid single datasets hogging resources
  @jobs_to_run.rotate! rand(jobs_to_run.size)
  # Fill the free slots from the head of the queue
  launch_job @jobs_to_run.shift while jobs_running.size < maxjobs && !jobs_to_run.empty?
end

#get_job(job, ds = nil) ⇒ Object

Get the task with key symbol job in dataset ds. For project-wide tasks let ds be nil.



171
172
173
174
175
176
177
178
179
# File 'lib/miga/daemon.rb', line 171

# Look up a queued or running task.
#
# job:: key of the task to find (compared with == against each entry's :job).
# ds::  dataset the task belongs to, or nil for a project-wide task.
#
# Searches #jobs_to_run first, then #jobs_running. With a nil +ds+ only
# entries without a dataset match; otherwise entries match when their
# dataset has the same name as +ds+. Returns the first matching entry or nil.
def get_job(job, ds=nil)
  candidates = jobs_to_run + jobs_running
  if ds.nil?
    candidates.find { |entry| entry[:ds].nil? && entry[:job] == job }
  else
    candidates.find do |entry|
      !entry[:ds].nil? && entry[:ds].name == ds.name && entry[:job] == job
    end
  end
end

#in_loop ⇒ Object

Run one loop step. Returns a Boolean indicating if the loop should continue.



210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
# File 'lib/miga/daemon.rb', line 210

# Run a single iteration of the daemon loop.
#
# Each step declares the daemon alive, reloads the project, checks datasets
# and project for pending tasks, flushes the queues, writes the status
# report and sleeps for #latency. On the very first step (loop counter at
# -1) a launch banner is logged and the previous status is loaded. Every
# time the counter reaches 4 it is reset to 0 and dead jobs are purged.
#
# Returns false when #shutdown_when_done? is truthy and both job lists are
# empty (the caller should stop looping); true otherwise.
def in_loop
  declare_alive
  project.load
  if loop_i == -1
    rule = '-----------------------------------'
    say rule
    say 'MiGA:%s launched.' % project.name
    say rule
    load_status
    @loop_i = 0
  end
  @loop_i += 1
  check_datasets
  check_project
  flush!
  if loop_i == 4
    say 'Housekeeping for sanity'
    @loop_i = 0
    purge!
  end
  report_status
  sleep(latency)
  # Keep looping unless asked to stop when idle and nothing is left
  return true unless shutdown_when_done? && (jobs_running.size + jobs_to_run.size).zero?
  say 'Nothing else to do, shutting down.'
  false
end

#last_alive ⇒ Object

When was the last time a daemon for the current project was seen active? Returns DateTime.



53
54
55
# File 'lib/miga/daemon.rb', line 53

# When was a daemon for this daemon's own project last seen active?
# Delegates to MiGA::Daemon.last_alive with #project and returns its result.
def last_alive
  MiGA::Daemon.last_alive(project)
end

#load_status ⇒ Object

Load the status of a previous instance.



95
96
97
98
99
100
101
102
103
104
105
106
107
# File 'lib/miga/daemon.rb', line 95

# Restore the job queues saved by a previous instance.
#
# Reads +daemon/status.json+ under the project path; returns nil without
# touching any state when the file is missing or empty. Every saved entry
# with a non-nil :ds gets its dataset re-resolved through
# project.dataset(entry[:ds_name]). Afterwards dead jobs are removed with
# #purge!, whose result is returned.
def load_status
  f_path = File.expand_path('daemon/status.json', project.path)
  return unless File.size? f_path
  status = JSON.parse(File.read(f_path), symbolize_names: true)
  status.each_value do |entries|
    entries.each do |entry|
      entry[:ds] = project.dataset(entry[:ds_name]) unless entry[:ds].nil?
      # JSON has no Symbol type, so job keys come back as Strings. The
      # queueing code looks jobs up by Symbol, so restore the Symbol here;
      # otherwise restored jobs are never matched and get queued again.
      entry[:job] = entry[:job].to_sym unless entry[:job].nil?
    end
  end
  # Fall back to an empty queue if the file lacks one of the two lists
  @jobs_to_run = status[:jobs_to_run] || []
  @jobs_running = status[:jobs_running] || []
  purge!
end

#purge! ⇒ Object

Remove dead jobs.



202
203
204
205
206
# File 'lib/miga/daemon.rb', line 202

# Remove dead jobs from the running list.
#
# For each running job the :alive run option is formatted with the job's
# pid and executed as a shell command; the job is kept only when the
# command's output, converted to an Integer, equals 1.
def purge!
  @jobs_running.select! do |job|
    probe = format(runopts(:alive), job[:pid])
    answer = `#{probe}`.chomp
    answer.to_i == 1
  end
end

#queue_job(job, ds = nil) ⇒ Object

Add the task to the internal queue with symbol key job. If the task is dataset-specific, ds specifies the dataset. To submit jobs to the scheduler (or to bash) see #flush!.



138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
# File 'lib/miga/daemon.rb', line 138

# Add a task to the internal queue.
#
# job:: key of the task to queue.
# ds::  dataset the task applies to, or nil for a project-wide task.
#
# Returns nil without queueing when #get_job already knows the same task.
# Otherwise logs the request, makes sure the log folder +daemon/<job>+
# exists under the project path, builds the command line from the :cmd run
# option and appends the new entry to the queue (returning the queue).
# Nothing is launched here; see #flush!.
def queue_job(job, ds=nil)
  return nil unless get_job(job, ds).nil?
  ds_name = (ds.nil? ? 'miga-project' : ds.name)
  say 'Queueing %s:%s' % [ds_name, job]
  vars = {
    'PROJECT' => project.path,
    'RUNTYPE' => runopts(:type),
    'CORES'   => ppn,
    'MIGA'    => MiGA::MiGA.root_path
  }
  vars['DATASET'] = ds.name unless ds.nil?
  log_dir = File.expand_path("daemon/#{job}", project.path)
  Dir.mkdir(log_dir) unless Dir.exist? log_dir
  # Task names carry at most the first 10 characters of the project name.
  # NOTE(review): the previous expression here was not valid Ruby (a token
  # was missing between "project." and "[:name]"); project.name is the
  # accessor the rest of this class uses. Confirm it matches the intent.
  task_name = "#{project.name[0..9]}:#{job}:#{ds_name}"
  to_run = {ds: ds, ds_name: ds_name, job: job, task_name: task_name,
    cmd: sprintf(runopts(:cmd),
      # 1: script
      MiGA::MiGA.script_path(job, miga:vars['MIGA'], project:project),
      # 2: vars
      vars.map { |k, v| sprintf(runopts(:var), k, v) }.
        join(runopts(:varsep)),
      # 3: CPUs
      ppn,
      # 4: log file
      File.expand_path("#{ds_name}.log", log_dir),
      # 5: task name
      task_name)}
  @jobs_to_run << to_run
end

#report_status ⇒ Object

Report status in a JSON file.



86
87
88
89
90
91
# File 'lib/miga/daemon.rb', line 86

# Save both job lists as pretty-printed JSON in +daemon/status.json+ under
# the project path, replacing any previous content. Returns nil.
def report_status
  # Serialize before opening the file: opening with 'w' truncates it, so a
  # failure while generating the JSON used to leave an empty status file.
  payload = JSON.pretty_generate(
    { jobs_running: @jobs_running, jobs_to_run: @jobs_to_run })
  # Block form closes the handle even if the write raises.
  File.open(File.expand_path('daemon/status.json', project.path), 'w') do |f|
    f.print payload
  end
end

#say(*opts) ⇒ Object

Send a datestamped message to the log.



240
241
242
# File 'lib/miga/daemon.rb', line 240

# Print a timestamped message: the current time (Time#inspect) in square
# brackets, a space, every element of +opts+ in order, and a newline.
def say(*opts)
  stamp = "[#{Time.new.inspect}] "
  print(stamp, *opts, "\n")
end

#terminate ⇒ Object

Terminates a daemon.



246
247
248
249
250
251
252
253
254
255
256
# File 'lib/miga/daemon.rb', line 246

# Shut the daemon down.
#
# Logs the request, saves the current status, runs the :kill run option
# (formatted with each running job's pid) as a shell command, printing one
# line per job, and finally deletes +daemon/alive+ under the project path
# when it exists.
def terminate
  say 'Terminating daemon...'
  report_status
  kill_template = runopts(:kill)
  @jobs_running.each do |job|
    `#{kill_template % job[:pid]}`
    puts "Terminating pid:#{job[:pid]} for #{job[:task_name]}"
  end
  alive_file = File.expand_path('daemon/alive', project.path)
  File.unlink(alive_file) if File.exist?(alive_file)
end