Class: OodCore::Job::Adapters::PBSPro
- Inherits:
- OodCore::Job::Adapter
- Ancestry (parent to child): Object, OodCore::Job::Adapter, OodCore::Job::Adapters::PBSPro
- Defined in:
- lib/ood_core/job/adapters/pbspro.rb
Overview
An adapter object that describes the communication with a PBS Pro resource manager for job management.
Defined Under Namespace
Classes: Batch
Constant Summary
- STATE_MAP =
Mapping of state codes for PBSPro
{
  'Q' => :queued,
  'W' => :queued_held, # job is waiting for its submitter-assigned start time to be reached
  'H' => :queued_held,
  'T' => :queued_held, # job is being moved to a new location
  'M' => :completed,   # job was moved to another server
  'R' => :running,
  'S' => :suspended,
  'U' => :suspended,   # cycle-harvesting job is suspended due to keyboard activity
  'E' => :running,     # job is exiting after having run
  'F' => :completed,   # job is finished
  'X' => :completed,   # subjob has completed execution or has been deleted
  'B' => :running      # job array has at least one child running
}
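As a rough illustration of how a table like this can be consulted, the sketch below copies the mapping into a standalone hash and looks up a few state codes. The `normalize_state` helper and the `:undetermined` fallback for unknown codes are assumptions made for this example; this page does not document how the adapter handles unmapped codes.

```ruby
# Standalone copy of the mapping shown above (illustration only).
STATE_MAP = {
  'Q' => :queued,
  'W' => :queued_held,
  'H' => :queued_held,
  'T' => :queued_held,
  'M' => :completed,
  'R' => :running,
  'S' => :suspended,
  'U' => :suspended,
  'E' => :running,
  'F' => :completed,
  'X' => :completed,
  'B' => :running
}.freeze

# Hypothetical helper: translate a PBS Pro state code into a status symbol.
# The :undetermined fallback is an assumption for this sketch.
def normalize_state(code)
  STATE_MAP.fetch(code.to_s, :undetermined)
end

%w[Q E B ?].each do |code|
  puts "#{code} -> #{normalize_state(code)}"
end
```

Note that several PBS Pro codes collapse onto one status: an exiting job (`E`) and a job array with a running child (`B`) are both reported as `:running`.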
Instance Attribute Summary
-
#qstat_factor ⇒ Float
readonly
The fraction of all jobs that a user must own before the adapter changes how it queries. Above this fraction, the owner's jobs are filtered from a single `qstat` of all jobs; at or below it, `qstat` is called on each of the owner's individual jobs. Defaults to 0.10 (10%).
Instance Method Summary
-
#delete(id) ⇒ void
Delete the submitted job.
- #directive_prefix ⇒ Object
-
#hold(id) ⇒ void
Put the submitted job on hold.
-
#info(id) ⇒ Info
Retrieve job info from the resource manager.
-
#info_all(attrs: nil) ⇒ Array<Info>
Retrieve info for all jobs from the resource manager.
-
#info_where_owner(owner, attrs: nil) ⇒ Array<Info>
Retrieve info for all jobs for a given owner or owners from the resource manager.
-
#initialize(opts = {}) ⇒ PBSPro
constructor
private
A new instance of PBSPro.
-
#release(id) ⇒ void
Release the job that is on hold.
-
#status(id) ⇒ Status
Retrieve job status from resource manager.
-
#submit(script, after: [], afterok: [], afternotok: [], afterany: []) ⇒ String
Submit a job with the attributes defined in the job template instance.
Methods inherited from OodCore::Job::Adapter
#accounts, #cluster_info, #info_all_each, #info_where_owner_each, #job_name_illegal_chars, #nodes, #queues, #sanitize_job_name, #supports_job_arrays?
Constructor Details
#initialize(opts = {}) ⇒ PBSPro
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Returns a new instance of PBSPro.
# File 'lib/ood_core/job/adapters/pbspro.rb', line 219
def initialize(opts = {})
  o = opts.to_h.compact.symbolize_keys

  @pbspro = o.fetch(:pbspro) { raise ArgumentError, "No pbspro object specified. Missing argument: pbspro" }
  @qstat_factor = o.fetch(:qstat_factor, 0.10).to_f
end
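The constructor's option handling can be tried out in isolation. `symbolize_keys` is not a core Ruby method, so the sketch below substitutes `transform_keys(&:to_sym)` and wraps the same steps in a hypothetical `parse_adapter_opts` helper that returns a plain hash. The helper name and the `:batch_stub` placeholder are assumptions for illustration only.

```ruby
# Hypothetical helper mirroring the constructor's option handling in plain Ruby.
def parse_adapter_opts(opts = {})
  # Drop nil values, then convert string keys to symbols.
  o = opts.to_h.compact.transform_keys(&:to_sym)

  # :pbspro is required; fail loudly when it is missing.
  pbspro = o.fetch(:pbspro) do
    raise ArgumentError, "No pbspro object specified. Missing argument: pbspro"
  end

  # :qstat_factor is optional and is always coerced to a Float.
  qstat_factor = o.fetch(:qstat_factor, 0.10).to_f

  { pbspro: pbspro, qstat_factor: qstat_factor }
end

parsed = parse_adapter_opts("pbspro" => :batch_stub, "qstat_factor" => "0.25")
puts parsed[:qstat_factor]
```

Because `compact` runs first, passing `qstat_factor: nil` behaves the same as omitting the key, and the default of 0.10 applies.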
Instance Attribute Details
#qstat_factor ⇒ Float (readonly)
The fraction of all jobs that a user must own before the adapter changes how it queries. Above this fraction, the owner's jobs are filtered from a single `qstat` of all jobs; at or below it, `qstat` is called on each of the owner's individual jobs. Defaults to 0.10 (10%).
# File 'lib/ood_core/job/adapters/pbspro.rb', line 212
def qstat_factor
  @qstat_factor
end
Instance Method Details
#delete(id) ⇒ void
This method returns an undefined value.
Delete the submitted job.
# File 'lib/ood_core/job/adapters/pbspro.rb', line 414
def delete(id)
  @pbspro.delete_job(id.to_s)
rescue Batch::Error => e
  # assume successful job deletion if can't find job id
  raise JobAdapterError, e.message unless /Unknown Job Id/ =~ e.message || /Job has finished/ =~ e.message
end
#directive_prefix ⇒ Object
# File 'lib/ood_core/job/adapters/pbspro.rb', line 421
def directive_prefix
  '#PBS'
end
#hold(id) ⇒ void
This method returns an undefined value.
Put the submitted job on hold.
# File 'lib/ood_core/job/adapters/pbspro.rb', line 390
def hold(id)
  @pbspro.hold_job(id.to_s)
rescue Batch::Error => e
  # assume successful job hold if can't find job id
  raise JobAdapterError, e.message unless /Unknown Job Id/ =~ e.message || /Job has finished/ =~ e.message
end
#info(id) ⇒ Info
Retrieve job info from the resource manager. A job that the resource manager no longer knows about is reported as completed.
# File 'lib/ood_core/job/adapters/pbspro.rb', line 350
def info(id)
  id = id.to_s

  job_infos = @pbspro.get_jobs(id: id).map do |v|
    parse_job_info(v)
  end

  if job_infos.empty?
    Info.new(id: id, status: :completed)
  elsif job_infos.length == 1
    job_infos.first
  else
    process_job_array(id, job_infos)
  end
rescue Batch::Error => e
  # set completed status if can't find job id
  if /Unknown Job Id/ =~ e.message || /Job has finished/ =~ e.message
    Info.new(
      id: id,
      status: :completed
    )
  else
    raise JobAdapterError, e.message
  end
end
#info_all(attrs: nil) ⇒ Array<Info>
Retrieve info for all jobs from the resource manager.
# File 'lib/ood_core/job/adapters/pbspro.rb', line 305
def info_all(attrs: nil)
  @pbspro.get_jobs.map do |v|
    parse_job_info(v)
  end
rescue Batch::Error => e
  raise JobAdapterError, e.message
end
#info_where_owner(owner, attrs: nil) ⇒ Array<Info>
Retrieve info for all jobs for a given owner or owners from the resource manager.
# File 'lib/ood_core/job/adapters/pbspro.rb', line 318
def info_where_owner(owner, attrs: nil)
  owner = Array.wrap(owner).map(&:to_s)

  usr_jobs = @pbspro.select_jobs(args: ["-u", owner.join(",")])
  all_jobs = @pbspro.select_jobs(args: ["-T"])

  # `qstat` all jobs if user has too many jobs, otherwise `qstat` each
  # individual job (default factor is 10%)
  if usr_jobs.size > (qstat_factor * all_jobs.size)
    super
  else
    begin
      user_job_infos = []
      usr_jobs.each do |id|
        job = info(id)
        user_job_infos << job

        job.tasks.each {|task| user_job_infos << job.build_child_info(task)}
      end

      user_job_infos
    rescue Batch::Error => e
      raise JobAdapterError, e.message
    end
  end
end
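The threshold test in `info_where_owner` is easier to reason about with concrete numbers. The sketch below isolates just that comparison in a hypothetical `qstat_strategy` helper; the helper and the two symbols it returns are invented for this example and are not part of the adapter.

```ruby
# Hypothetical helper: report which query strategy the threshold selects.
def qstat_strategy(user_job_count, all_job_count, qstat_factor = 0.10)
  if user_job_count > qstat_factor * all_job_count
    # The owner holds a large share of the queue: one `qstat` of everything,
    # then filter by owner.
    :filter_all_jobs
  else
    # The owner holds a small share: one `qstat` per job is cheaper.
    :query_each_job
  end
end

puts qstat_strategy(5, 1000)    # 5 jobs out of 1000
puts qstat_strategy(200, 1000)  # 200 jobs out of 1000
```

With the default factor of 0.10 and 1000 jobs in total, the switch to filtering happens once the owner has more than 100 jobs.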
#release(id) ⇒ void
This method returns an undefined value.
Release the job that is on hold.
# File 'lib/ood_core/job/adapters/pbspro.rb', line 402
def release(id)
  @pbspro.release_job(id.to_s)
rescue Batch::Error => e
  # assume successful job release if can't find job id
  raise JobAdapterError, e.message unless /Unknown Job Id/ =~ e.message || /Job has finished/ =~ e.message
end
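The `delete`, `hold`, and `release` methods share one error-handling pattern: a batch error that only says the job is already gone is treated as success, and anything else is re-raised as an adapter error. The sketch below shows that pattern in isolation. `FakeBatchError`, `FakeJobAdapterError`, and `tolerate_missing_job` are hypothetical stand-ins written for this example, not names from the library.

```ruby
# Hypothetical stand-ins for Batch::Error and JobAdapterError.
class FakeBatchError < StandardError; end
class FakeJobAdapterError < StandardError; end

# Messages that mean the job no longer exists on the server.
IGNORABLE_ERRORS = /Unknown Job Id|Job has finished/

# Run a batch operation, treating "job is already gone" errors as success.
def tolerate_missing_job
  yield
  nil
rescue FakeBatchError => e
  # Re-raise under the adapter's error type unless the message is ignorable.
  raise FakeJobAdapterError, e.message unless IGNORABLE_ERRORS =~ e.message
  nil
end

# Swallowed: the job is already gone, so the operation counts as done.
tolerate_missing_job { raise FakeBatchError, "qdel: Unknown Job Id 123.server" }

# Re-raised: a genuine failure surfaces to the caller.
begin
  tolerate_missing_job { raise FakeBatchError, "qdel: Unauthorized Request" }
rescue FakeJobAdapterError => e
  puts "adapter error: #{e.message}"
end
```

The practical effect is that these operations are idempotent from the caller's point of view: repeating a delete on a finished job does not raise.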
#status(id) ⇒ Status
Retrieve job status from the resource manager.
# File 'lib/ood_core/job/adapters/pbspro.rb', line 381
def status(id)
  info(id.to_s).status
end
#submit(script, after: [], afterok: [], afternotok: [], afterany: []) ⇒ String
Submit a job with the attributes defined in the job template instance.
# File 'lib/ood_core/job/adapters/pbspro.rb', line 241
def submit(script, after: [], afterok: [], afternotok: [], afterany: [])
  after      = Array(after).map(&:to_s)
  afterok    = Array(afterok).map(&:to_s)
  afternotok = Array(afternotok).map(&:to_s)
  afterany   = Array(afterany).map(&:to_s)

  # Set qsub options
  args = []
  # ignore args, can't use these if submitting from STDIN
  args.concat ["-h"] if script.submit_as_hold
  args.concat ["-r", script.rerunnable ? "y" : "n"] unless script.rerunnable.nil?
  args.concat ["-M", script.email.join(",")] unless script.email.nil?
  if script.email_on_started && script.email_on_terminated
    args.concat ["-m", "be"]
  elsif script.email_on_started
    args.concat ["-m", "b"]
  elsif script.email_on_terminated
    args.concat ["-m", "e"]
  end
  args.concat ["-N", script.job_name] unless script.job_name.nil?
  args.concat ["-S", script.shell_path] unless script.shell_path.nil?
  # ignore input_path (not defined in PBS Pro)
  args.concat ["-o", script.output_path] unless script.output_path.nil?
  args.concat ["-e", script.error_path] unless script.error_path.nil?
  # Reservations are actually just queues in PBS Pro
  args.concat ["-q", script.reservation_id] if !script.reservation_id.nil? && script.queue_name.nil?
  args.concat ["-q", script.queue_name] unless script.queue_name.nil?
  args.concat ["-p", script.priority] unless script.priority.nil?
  args.concat ["-a", script.start_time.localtime.strftime("%C%y%m%d%H%M.%S")] unless script.start_time.nil?
  args.concat ["-A", script.accounting_id] unless script.accounting_id.nil?
  args.concat ["-l", "walltime=#{seconds_to_duration(script.wall_time)}"] unless script.wall_time.nil?

  # Set dependencies
  depend = []
  depend << "after:#{after.join(":")}"           unless after.empty?
  depend << "afterok:#{afterok.join(":")}"       unless afterok.empty?
  depend << "afternotok:#{afternotok.join(":")}" unless afternotok.empty?
  depend << "afterany:#{afterany.join(":")}"     unless afterany.empty?
  args.concat ["-W", "depend=#{depend.join(",")}"] unless depend.empty?

  # Set environment variables
  envvars = script.job_environment.to_h
  args.concat ["-v", envvars.map{|k,v| "#{k}=#{v}"}.join(",")] unless envvars.empty?
  args.concat ["-V"] if script.copy_environment?

  # If error_path is not specified we join stdout & stderr (as this
  # mimics what the other resource managers do)
  args.concat ["-j", "oe"] if script.error_path.nil?

  args.concat ["-J", script.job_array_request] unless script.job_array_request.nil?

  # Set native options
  args.concat script.native if script.native

  # Submit job
  @pbspro.submit_string(script.content, args: args, chdir: script.workdir)
rescue Batch::Error => e
  raise JobAdapterError, e.message
end
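Two pieces of the argument assembly in `submit` are worth seeing with concrete values: the `-W depend=...` string and the `-a` start-time format. The sketch below extracts both into standalone helpers. `depend_args`, `start_time_arg`, and the job IDs are hypothetical and exist only for this example; unlike the method above, `start_time_arg` does not call `localtime` first, because `Time.new` already returns a local time.

```ruby
# Hypothetical helper: build the qsub dependency arguments from job-id lists.
def depend_args(after: [], afterok: [], afternotok: [], afterany: [])
  groups = {
    "after"      => after,
    "afterok"    => afterok,
    "afternotok" => afternotok,
    "afterany"   => afterany
  }

  depend = []
  groups.each do |type, ids|
    # Accept a single id or a list, and normalise every id to a string.
    ids = Array(ids).map(&:to_s)
    # Ids within one dependency type are colon-separated.
    depend << "#{type}:#{ids.join(':')}" unless ids.empty?
  end

  # Dependency types are comma-separated; no dependencies means no arguments.
  depend.empty? ? [] : ["-W", "depend=#{depend.join(',')}"]
end

# Hypothetical helper: format a start time as CCYYMMDDhhmm.SS.
def start_time_arg(time)
  time.strftime("%C%y%m%d%H%M.%S")
end

p depend_args(afterok: ["101.pbs", "102.pbs"], afterany: "103.pbs")
puts start_time_arg(Time.new(2024, 3, 5, 14, 30, 15))
```

Building the arguments as an array of separate strings, rather than one shell command string, keeps job names and paths containing spaces intact when they are passed on to the submission command.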