Class: QueueManager
- Inherits:
-
Object
- Object
- QueueManager
- Defined in:
- lib/autoflow/queue_manager.rb
Direct Known Subclasses
Class Method Summary collapse
- .available? ⇒ Boolean
-
.descendants ⇒ Object
SELECT AND PREPARE MANAGER.
- .priority ⇒ Object
- .select_manager(options) ⇒ Object
- .select_queue_manager(exec_folder, options, jobs, persist_variables) ⇒ Object
- .system_call(cmd, path = nil, remote = FALSE, ssh = nil) ⇒ Object
Instance Method Summary collapse
- #asign_queue_id(ar_jobs, id) ⇒ Object
-
#close_file(file_name, permissions = nil) ⇒ Object
SSH.
- #create_file(file_name, path) ⇒ Object
- #create_folder(folder_name) ⇒ Object
-
#exec ⇒ Object
EXECUTING WORKFLOW WITH MANAGER.
- #get_all_deps(ar_dependencies) ⇒ Object
- #get_dependencies(job, id = nil) ⇒ Object
- #get_queue_system_dependencies(ar_dependencies) ⇒ Object
- #get_queue_system_id(shell_output) ⇒ Object
- #get_relations_and_folders ⇒ Object
-
#init_log ⇒ Object
TODO adapt to remote execution.
-
#initialize(exec_folder, options, commands, persist_variables) ⇒ QueueManager
constructor
A new instance of QueueManager.
- #launch2queue_system(job, id, buffered_jobs) ⇒ Object
- #launch_all_jobs ⇒ Object
- #launch_job_in_folder(job, id, buffered_jobs) ⇒ Object
- #make_environment_file ⇒ Object
- #read_file(file_path) ⇒ Object
- #rm_done_dependencies(job) ⇒ Object
-
#sort_jobs_by_dependencies ⇒ Object
We need the job ids assigned by the queue system, so jobs are handled one at a time and the queue system ids of earlier jobs are passed as dependencies when necessary.
- #submit_job(job, ar_dependencies) ⇒ Object
- #system_call(cmd, path = nil) ⇒ Object
- #write_file(file_name, content) ⇒ Object
-
#write_header(id, node, sh) ⇒ Object
QUEUE DEPENDENT METHODS.
- #write_job(job, sh_name) ⇒ Object
Constructor Details
#initialize(exec_folder, options, commands, persist_variables) ⇒ QueueManager
Returns a new instance of QueueManager.
5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 |
# File 'lib/autoflow/queue_manager.rb', line 5
# Builds a manager for one workflow execution.
#
# The +options+ identifier was missing from this listing (empty parameter
# slot and bare +[:key]+ lookups); it is restored from the documented
# signature.
#
# @param exec_folder folder that #exec creates and writes its files into
# @param options [Hash] settings read by key: :verbose, :show_submit_command,
#   :identifier, :remote, :ssh, :write_sh, :external_dependencies,
#   :extended_logging
# @param commands [Hash] jobs indexed by name (iterated as name/job pairs
#   and looked up by dependency name elsewhere in this class)
# @param persist_variables variable/value pairs written by #make_environment_file
def initialize(exec_folder, options, commands, persist_variables)
  @exec_folder = exec_folder
  @commands = commands
  @persist_variables = persist_variables
  @verbose = options[:verbose]
  @show_submit = options[:show_submit_command]
  @job_identifier = options[:identifier]
  @files = {} # file_name => [path, buffered content]; flushed by #close_file
  @remote = options[:remote]
  @ssh = options[:ssh]
  @write_sh = options[:write_sh]
  @external_dependencies = options[:external_dependencies]
  @active_jobs = []
  @extended_logging = options[:extended_logging]
end
Class Method Details
.available? ⇒ Boolean
322 323 324 |
# File 'lib/autoflow/queue_manager.rb', line 322
# Tells whether this manager can be used. This implementation always answers
# false (presumably concrete managers override it; confirm against the
# classes under queue_managers/).
#
# The legacy +FALSE+ constant is no longer defined on Ruby 3.x, so the
# literal +false+ is used. The optional argument is accepted and ignored
# because .select_manager calls +available?+ with the options hash.
#
# @param options ignored here
# @return [Boolean] always false
def self.available?(options = nil)
  false
end
.descendants ⇒ Object
SELECT AND PREPARE MANAGER
25 26 27 |
# File 'lib/autoflow/queue_manager.rb', line 25
# SELECT AND PREPARE MANAGER
#
# Walks every Class known to ObjectSpace and keeps those that are strict
# subclasses of the receiver.
#
# @return [Array<Class>] subclasses of the receiver
def self.descendants
  found = []
  ObjectSpace.each_object(Class) do |candidate|
    found << candidate if candidate < self
  end
  found
end
.priority ⇒ Object
326 327 328 |
# File 'lib/autoflow/queue_manager.rb', line 326
# Priority compared by .select_manager when choosing between available
# managers.
#
# @return [Integer] -1 for this class
def self.priority
  -1
end
.select_manager(options) ⇒ Object
42 43 44 45 46 47 48 49 50 51 52 |
# File 'lib/autoflow/queue_manager.rb', line 42
# Picks, among .descendants, the available manager with the highest
# priority. Only priorities >= 0 can win, and on a tie the descendant
# visited last is kept.
#
# The +options+ parameter and the argument passed to +available?+ were
# missing from this listing; both are restored from the documented signature
# +.select_manager(options)+.
#
# @param options forwarded to each descendant's +available?+
# @return [Class, nil] the chosen manager class, or nil when none qualifies
def self.select_manager(options)
  queue_manager = nil
  priority = 0
  descendants.each do |descendant|
    next unless descendant.available?(options) && priority <= descendant.priority

    queue_manager = descendant
    priority = descendant.priority
  end
  queue_manager
end
.select_queue_manager(exec_folder, options, jobs, persist_variables) ⇒ Object
29 30 31 32 33 34 35 36 37 38 39 40 |
# File 'lib/autoflow/queue_manager.rb', line 29
# Requires every file under the sibling +queue_managers+ directory, chooses
# a manager class and instantiates it.
#
# The +options+ identifier was missing from this listing (empty parameter
# and argument slots); it is restored from the documented signature.
#
# @param exec_folder forwarded to the manager constructor
# @param options [Hash] +options[:batch]+ forces BashManager; otherwise the
#   choice is delegated to .select_manager
# @param jobs forwarded to the manager constructor
# @param persist_variables forwarded to the manager constructor
# @return the new manager instance
def self.select_queue_manager(exec_folder, options, jobs, persist_variables)
  path_managers = File.join(File.dirname(__FILE__), 'queue_managers')
  # Sorted so the load order does not depend on the platform's directory
  # order (Dir.glob only sorts by default from Ruby 3.0 on).
  Dir.glob(File.join(path_managers, '*')).sort.each { |manager| require manager }
  queue_manager = options[:batch] ? BashManager : select_manager(options)
  queue_manager.new(exec_folder, options, jobs, persist_variables)
end
.system_call(cmd, path = nil, remote = FALSE, ssh = nil) ⇒ Object
259 260 261 262 263 264 265 266 267 |
# File 'lib/autoflow/queue_manager.rb', line 259
# Runs a shell command, locally or through an SSH session, and returns what
# the command printed.
#
# The default for +remote+ used the legacy +FALSE+ constant, which is no
# longer defined on Ruby 3.x; it is now the literal +false+.
#
# @param cmd [String] command line to run
# @param path directory to +cd+ into first; skipped when nil
# @param remote when truthy the command is sent with +ssh.exec!+
# @param ssh object responding to +exec!+ (only used when +remote+ is truthy)
# @return the +exec!+ result when remote, otherwise the command's stdout
def self.system_call(cmd, path = nil, remote = false, ssh = nil)
  cmd = "cd #{path}; " + cmd unless path.nil?
  if remote
    ssh.exec!(cmd)
  else
    `#{cmd}`
  end
end
Instance Method Details
#asign_queue_id(ar_jobs, id) ⇒ Object
286 287 288 289 290 |
# File 'lib/autoflow/queue_manager.rb', line 286
# Stamps the same queue id on every job of a list of [job_id, job] pairs.
#
# @param ar_jobs list of [job_id, job] pairs; each job must respond to +queue_id=+
# @param id value assigned to every job's +queue_id+
# @return +ar_jobs+ itself
def asign_queue_id(ar_jobs, id)
  ar_jobs.each { |_job_id, job| job.queue_id = id }
end
#close_file(file_name, permissions = nil) ⇒ Object
SSH
224 225 226 227 228 229 230 231 232 233 234 235 236 |
# File 'lib/autoflow/queue_manager.rb', line 224
# Flushes the buffered content of +file_name+ (registered with #create_file,
# filled with #write_file) to its destination and forgets the buffer.
#
# Changes from the listing:
# * the +permissions+ identifier was missing; restored from the documented signature;
# * remote chmod now renders an Integer mode in octal (0755 was sent as "493");
# * single quotes in the content are escaped so the remote +echo '...'+
#   keeps them (the generated scripts contain quoted text);
# * the local file is written with the block form of File.open, so the
#   handle is closed even if chmod or the write raises.
#
# @param file_name [String] name used when the file was registered
# @param permissions [Integer, String, nil] mode to apply; nil leaves it untouched
def close_file(file_name, permissions = nil) #SSH
  path, content = @files.delete(file_name)
  file_path = File.join(path, file_name)
  if @remote
    # Close the quote, emit an escaped quote, reopen: ' -> '\''
    quoted = content.gsub("'") { "'\\''" }
    @ssh.exec!("echo '#{quoted}' > #{file_path}")
    unless permissions.nil?
      mode = permissions.is_a?(Integer) ? permissions.to_s(8) : permissions
      @ssh.exec!("chmod #{mode} #{file_path}")
    end
  else
    File.open(file_path, 'w') do |local_file|
      local_file.chmod(permissions) unless permissions.nil?
      local_file.print content
    end
  end
end
#create_file(file_name, path) ⇒ Object
216 217 218 |
# File 'lib/autoflow/queue_manager.rb', line 216
# Registers an in-memory buffer for a file: nothing touches the disk until
# #close_file is called for the same name.
#
# @param file_name key of the buffer in @files
# @param path folder joined with +file_name+ by #close_file
# @return [Array] the stored [path, empty string] pair
def create_file(file_name, path)
  empty_buffer = +''
  @files.store(file_name, [path, empty_buffer])
end
#create_folder(folder_name) ⇒ Object
208 209 210 211 212 213 214 |
# File 'lib/autoflow/queue_manager.rb', line 208
# Creates +folder_name+ unless it already exists, either on the remote host
# (through @ssh) or locally.
#
# Uses File.exist?: the +File.exists?+ alias was removed in Ruby 3.2 and
# raised NoMethodError there.
#
# @param folder_name [String] folder to create
def create_folder(folder_name)
  if @remote
    @ssh.exec!("if ! [ -d #{folder_name} ]; then mkdir -p #{folder_name}; fi")
  else
    Dir.mkdir(folder_name) unless File.exist?(folder_name)
  end
end
#exec ⇒ Object
EXECUTING WORKFLOW WITH MANAGER
58 59 60 61 62 63 64 65 66 67 |
# File 'lib/autoflow/queue_manager.rb', line 58
# EXECUTING WORKFLOW WITH MANAGER
#
# Runs the whole workflow: creates the execution folder, writes the
# environment file when there are persisted variables, records the autoflow
# version, then launches every job while the 'index_execution' buffer is
# open and finally flushes it.
def exec
  create_folder(@exec_folder)
  make_environment_file unless @persist_variables.empty?

  version_file = 'versions'
  create_file(version_file, @exec_folder)
  write_file(version_file, "autoflow\t#{Autoflow::VERSION}")
  close_file(version_file)

  index_file = 'index_execution'
  create_file(index_file, @exec_folder)
  launch_all_jobs # appends one line per job to the index buffer
  close_file(index_file)
end
#get_all_deps(ar_dependencies) ⇒ Object
300 301 302 303 304 305 |
# File 'lib/autoflow/queue_manager.rb', line 300
# Builds the full dependency list for a submission: the queue ids of the
# given job dependencies followed by @external_dependencies.
#
# @param ar_dependencies [Array] job names this submission depends on
# @return [Array] a new array; neither input is modified
def get_all_deps(ar_dependencies)
  # The lookup is skipped entirely for an empty list, as before.
  queue_ids = ar_dependencies.empty? ? [] : get_queue_system_dependencies(ar_dependencies)
  [].concat(queue_ids).concat(@external_dependencies)
end
#get_dependencies(job, id = nil) ⇒ Object
279 280 281 282 283 284 |
# File 'lib/autoflow/queue_manager.rb', line 279
# Returns a copy of the job's dependency list, without +id+ when one is given.
#
# @param job object responding to +dependencies+
# @param id the job's own id; when not nil it is removed from the copy
#   (a job must not depend on itself)
# @return [Array] a new array; +job.dependencies+ is left untouched
def get_dependencies(job, id = nil)
  deps = job.dependencies.dup
  deps.delete(id) unless id.nil?
  deps
end
#get_queue_system_dependencies(ar_dependencies) ⇒ Object
292 293 294 295 296 297 298 |
# File 'lib/autoflow/queue_manager.rb', line 292
# Translates job names into the queue ids stored on the matching jobs in
# @commands.
#
# @param ar_dependencies [Array] job names, each a key of @commands
# @return [Array] the +queue_id+ of each named job, in the same order
def get_queue_system_dependencies(ar_dependencies)
  ar_dependencies.map { |dependency| @commands[dependency].queue_id }
end
#get_queue_system_id(shell_output) ⇒ Object
318 319 320 |
# File 'lib/autoflow/queue_manager.rb', line 318
# Hook with no behaviour in this class: it ignores its argument and
# returns nil.
# NOTE(review): presumably concrete managers override it to extract the job
# id from the submit command output; confirm.
#
# @param shell_output unused here
# @return [nil]
def get_queue_system_id(shell_output)
  nil
end
#get_relations_and_folders ⇒ Object
89 90 91 92 93 94 95 |
# File 'lib/autoflow/queue_manager.rb', line 89
# Summarises every job in @commands as name => [exec folder, dependencies].
#
# @return [Hash] one entry per job, in the iteration order of @commands
def get_relations_and_folders
  @commands.each_with_object({}) do |(name, job), relations|
    relations[name] = [job.attrib[:exec_folder], job.dependencies]
  end
end
#init_log ⇒ Object
TODO adapt to remote execution
69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 |
# File 'lib/autoflow/queue_manager.rb', line 69
# TODO adapt to remote execution
#
# Loads the workflow log, optionally dumps the job relations to 'wf.json',
# appends a 'set' timestamp for every active job and writes the log back.
def init_log
  # Built with '/' rather than File.join: the path is assumed to be for a
  # Linux system, so File.join cannot be used on Windows hosts.
  log_path = "#{@exec_folder}/.wf_log"
  log = parse_log(log_path) # TODO modify to folder
  relations = get_relations_and_folders

  if @write_sh
    json_name = 'wf.json'
    create_file(json_name, @exec_folder)
    write_file(json_name, relations.to_json)
    close_file(json_name)
  end

  @active_jobs.each do |task|
    if log[task].nil?
      log[task] = { 'set' => [Time.now.to_i] } # first time this task is seen
    else
      log[task]['set'] << Time.now.to_i
    end
  end

  write_log(log, log_path, relations)
end
#launch2queue_system(job, id, buffered_jobs) ⇒ Object
154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 |
# File 'lib/autoflow/queue_manager.rb', line 154
# Writes the job's shell script (only when @write_sh is set), including any
# buffered jobs, and submits it unless @verbose is set.
#
# @param job job to launch; reads +name+, +attrib[:exec_folder]+ and
#   (through #get_dependencies) its dependencies
# @param id the job's own id, removed from its dependency list
# @param buffered_jobs list of [id, job] pairs bundled into this script
def launch2queue_system(job, id, buffered_jobs)
  sh_name = job.name + '.sh'

  # Script prologue
  if @write_sh
    create_file(sh_name, job.attrib[:exec_folder])
    write_file(sh_name, '#!/usr/bin/env bash')
    write_file(sh_name, '##JOB_GROUP_ID=' + @job_identifier)
    write_header(id, job, sh_name)
  end

  # Dependencies of the job plus those of every buffered job
  ar_dependencies = get_dependencies(job, id)
  buffered_jobs.each do |buffered_id, buffered_job|
    ar_dependencies += get_dependencies(buffered_job, buffered_id)
    next unless @write_sh

    write_job(buffered_job, sh_name)
    # The buffered job is recorded as running in the launching job's folder.
    buffered_job.attrib[:exec_folder] = job.attrib[:exec_folder]
  end
  ar_dependencies.uniq!

  # Script body
  if @write_sh
    write_file(sh_name, 'hostname')
    log_file_path = [@exec_folder, '.wf_log', File.basename(job.attrib[:exec_folder])].join('/')
    write_file(sh_name, "flow_logger -e #{log_file_path} -s #{job.name}")
    write_file(sh_name, "source #{File.join(@exec_folder, 'env_file')}") unless @persist_variables.empty?
    write_job(job, sh_name)
    write_file(sh_name, "flow_logger -e #{log_file_path} -f #{job.name}")
    write_file(sh_name, "echo 'General time'")
    write_file(sh_name, "times")
    close_file(sh_name, 0755)
  end

  # Submission: skipped when @verbose is set
  unless @verbose
    queue_id = submit_job(job, ar_dependencies)
    job.queue_id = queue_id # id returned by #submit_job
    asign_queue_id(buffered_jobs, queue_id)
  end
end
#launch_all_jobs ⇒ Object
97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 |
# File 'lib/autoflow/queue_manager.rb', line 97
# Launches every job in dependency order. Jobs not flagged :done are first
# recorded in @active_jobs and the log is initialised. Each job then gets a
# line in the 'index_execution' buffer; jobs flagged :done are skipped, the
# others have their finished dependencies removed and are launched.
def launch_all_jobs
  ordered_jobs = sort_jobs_by_dependencies
  ordered_jobs.each do |_name, job|
    @active_jobs << job.name unless job.attrib[:done]
  end
  init_log

  pending_buffer = []
  ordered_jobs.each do |name, job|
    write_file('index_execution', "#{name}\t#{job.attrib[:exec_folder]}")
    next if job.attrib[:done]

    rm_done_dependencies(job)
    # The launcher hands back the buffer to carry over to the next job.
    pending_buffer = launch_job_in_folder(job, name, pending_buffer)
  end
end
#launch_job_in_folder(job, id, buffered_jobs) ⇒ Object
142 143 144 145 146 147 148 149 150 151 |
# File 'lib/autoflow/queue_manager.rb', line 142
# Creates the job's folder, then either buffers the job or launches it
# together with everything buffered so far.
#
# @param job job whose +attrib[:exec_folder]+ and +attrib[:buffer]+ are read
# @param id the job's id
# @param buffered_jobs [Array] [id, job] pairs waiting to be bundled
# @return [Array] the buffer to carry over: the same array with this job
#   appended when it is buffered, a new empty array after a launch
def launch_job_in_folder(job, id, buffered_jobs)
  create_folder(job.attrib[:exec_folder])

  if job.attrib[:buffer]
    # Held back until the next non-buffered job is launched.
    buffered_jobs << [id, job]
    return buffered_jobs
  end

  launch2queue_system(job, id, buffered_jobs)
  [] # buffer consumed by the launch
end
#make_environment_file ⇒ Object
200 201 202 203 204 205 206 |
# File 'lib/autoflow/queue_manager.rb', line 200
# Writes 'env_file' in the execution folder with one +export VAR=value+ line
# per persisted variable.
def make_environment_file
  env_name = 'env_file'
  create_file(env_name, @exec_folder)
  @persist_variables.each { |var, value| write_file(env_name, "export #{var}=#{value}") }
  close_file(env_name)
end
#read_file(file_path) ⇒ Object
238 239 240 241 242 243 244 245 246 247 |
# File 'lib/autoflow/queue_manager.rb', line 238
# Reads a file from the remote host (through @ssh) or from the local disk.
#
# Fixes:
# * the remote branch tested +content+ (still nil at that point) instead of
#   the command output, so every remote read raised NoMethodError;
# * +File.exists?+ was removed in Ruby 3.2; File.exist? is used;
# * the local read uses File.read, which closes the handle.
#
# @param file_path [String] path of the file to read
# @return [String, nil] the file content, or nil when the file is missing
def read_file(file_path)
  if @remote
    # The remote command prints a sentinel instead of failing when the file
    # does not exist.
    res = @ssh.exec!("[ ! -f #{file_path} ] && echo 'Autoflow:File Not Found' || cat #{file_path}")
    res unless res.nil? || res.include?('Autoflow:File Not Found')
  elsif File.exist?(file_path)
    File.read(file_path)
  end
end
#rm_done_dependencies(job) ⇒ Object
132 133 134 135 136 137 138 139 140 |
# File 'lib/autoflow/queue_manager.rb', line 132
# Drops from +job.dependencies+ (in place) every dependency whose job in
# @commands is flagged :done.
#
# @param job job whose dependency list is pruned
# @return [Array] the dependencies that were found done
def rm_done_dependencies(job)
  # Collected first so the list is not modified while it is being scanned.
  finished = job.dependencies.select { |dependency| @commands[dependency].attrib[:done] }
  finished.each { |dependency| job.dependencies.delete(dependency) }
end
#sort_jobs_by_dependencies ⇒ Object
We need the job ids assigned by the queue system, so jobs are handled one at a time and the queue system ids of earlier jobs are passed as dependencies when necessary.
115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 |
# File 'lib/autoflow/queue_manager.rb', line 115
# Orders the jobs of @commands so that every job comes after the jobs it
# depends on. Queue ids of earlier jobs are needed as dependencies of later
# ones, hence the ordering.
#
# Jobs are released in rounds: a round takes, in @commands order, every
# remaining job whose dependencies were all released in previous rounds.
# This yields the same order as the listing's loop. That loop never
# terminated when a dependency was circular, unknown or the job itself;
# such input now raises instead of hanging.
#
# @return [Array] [name, job] pairs in launch order
# @raise [RuntimeError] when some jobs can never be released
def sort_jobs_by_dependencies
  pending = @commands.to_a
  sorted_jobs = []
  released_ids = []
  until pending.empty?
    ready, pending = pending.partition { |_name, job| (job.dependencies - released_ids).empty? }
    if ready.empty?
      # No progress in a full round: the remaining jobs wait on each other
      # or on names that are not in @commands.
      stuck = pending.map(&:first).join(', ')
      raise "Unresolvable job dependencies (circular or unknown): #{stuck}"
    end
    sorted_jobs.concat(ready)
    released_ids.concat(ready.map(&:first))
  end
  sorted_jobs
end
#submit_job(job, ar_dependencies) ⇒ Object
314 315 316 |
# File 'lib/autoflow/queue_manager.rb', line 314
# Hook with no behaviour in this class: it ignores its arguments and
# returns nil. #launch2queue_system stores the returned value as the job's
# queue id.
# NOTE(review): presumably concrete managers override it to run the real
# submit command; confirm.
#
# @param job unused here
# @param ar_dependencies unused here
# @return [nil]
def submit_job(job, ar_dependencies)
  nil
end
#system_call(cmd, path = nil) ⇒ Object
249 250 251 252 253 254 255 256 257 |
# File 'lib/autoflow/queue_manager.rb', line 249
# Runs a shell command on the remote host (when @remote is set) or locally.
#
# @param cmd [String] command line to run
# @param path directory to +cd+ into first; skipped when nil
# @return the +@ssh.exec!+ result when remote, otherwise the command's stdout
def system_call(cmd, path = nil)
  full_command = path.nil? ? cmd : "cd #{path}; " + cmd
  return @ssh.exec!(full_command) if @remote

  `#{full_command}`
end
#write_file(file_name, content) ⇒ Object
220 221 222 |
# File 'lib/autoflow/queue_manager.rb', line 220
# Appends one line to the in-memory buffer registered for +file_name+ by
# #create_file; a newline is added after +content+.
#
# @param file_name key of the buffer in @files
# @param content [String] text of the line, without the trailing newline
# @return [String] the buffer after the append
def write_file(file_name, content)
  _path, buffer = @files[file_name]
  buffer << (content + "\n")
end
#write_header(id, node, sh) ⇒ Object
QUEUE DEPENDENT METHODS
310 311 312 |
# File 'lib/autoflow/queue_manager.rb', line 310
# QUEUE DEPENDENT METHODS
#
# Hook with no behaviour in this class: it ignores its arguments and
# returns nil. #launch2queue_system calls it right after the first two
# lines of the script have been written.
# NOTE(review): presumably concrete managers override it to add the
# queue-specific header; confirm.
#
# @param id unused here
# @param node unused here
# @param sh unused here
# @return [nil]
def write_header(id, node, sh)
  nil
end
#write_job(job, sh_name) ⇒ Object
269 270 271 272 273 274 275 276 277 |
# File 'lib/autoflow/queue_manager.rb', line 269
# Appends a job to a script buffer: its initialization text (when it has
# one) followed by its command line prefixed with a timing command.
#
# @param job object responding to +initialization+ and +parameters+
# @param sh_name name of the script buffer to append to
def write_job(job, sh_name)
  write_file(sh_name, job.initialization) unless job.initialization.nil?
  # The prefix depends on @extended_logging.
  timing_prefix = @extended_logging ? '/usr/bin/time -o process_data -v ' : 'time '
  write_file(sh_name, timing_prefix + job.parameters)
end