Class: OodJob::Adapters::Torque

Inherits:
OodJob::Adapter show all
Defined in:
lib/ood_job/adapters/torque.rb

Overview

An adapter object that describes the communication with a Torque resource manager for job management.

Constant Summary collapse

# Mapping of state characters for PBS
#
# Keys are the single-character job states; values are the status symbols
# used by this adapter. Characters that are not listed here are handled by
# the callers through +STATE_MAP.fetch(char, :undetermined)+.
# Frozen so the shared lookup table cannot be modified at runtime.
STATE_MAP = {
  'Q' => :queued,
  'H' => :queued_held,
  'R' => :running,
  'S' => :suspended,
  'E' => :running         # exiting, but still running
}.freeze

Instance Attribute Summary

Attributes inherited from OodJob::Adapter

#cluster

Instance Method Summary collapse

Methods inherited from OodJob::Adapter

#initialize

Constructor Details

This class inherits a constructor from OodJob::Adapter

Instance Method Details

#delete(id:) ⇒ void

This method returns an undefined value.

Delete the submitted job

Parameters:

  • id (#to_s)

    the id of the job

Raises:

  • (Error)

    if something goes wrong deleting a job

See Also:


167
168
169
170
171
172
173
# File 'lib/ood_job/adapters/torque.rb', line 167

# Delete the submitted job.
#
# A PBS::UnkjobidError is swallowed and +nil+ is returned; any other
# PBS::Error is re-raised as Error carrying the same message.
#
# @param id [#to_s] the id of the job
# @raise [Error] if something goes wrong deleting a job
# @return [void]
def delete(id:)
  job_id = id.to_s
  pbs.delete_job(job_id)
rescue PBS::UnkjobidError
  nil
rescue PBS::Error => err
  raise Error, err.message
end

#hold(id:) ⇒ void

This method returns an undefined value.

Put the submitted job on hold

Parameters:

  • id (#to_s)

    the id of the job

Raises:

  • (Error)

    if something goes wrong holding a job

See Also:


141
142
143
144
145
146
147
# File 'lib/ood_job/adapters/torque.rb', line 141

# Put the submitted job on hold.
#
# A PBS::UnkjobidError is swallowed and +nil+ is returned; any other
# PBS::Error is re-raised as Error carrying the same message.
#
# @param id [#to_s] the id of the job
# @raise [Error] if something goes wrong holding a job
# @return [void]
def hold(id:)
  begin
    pbs.hold_job(id.to_s)
  rescue PBS::UnkjobidError
    nil
  rescue PBS::Error => pbs_error
    raise Error, pbs_error.message
  end
end

#info(id: '') ⇒ Info+

Retrieve job info from the resource manager

Parameters:

  • id (#to_s) (defaults to: '')

    the id of the job; when empty, information for all jobs on the cluster is retrieved

Returns:

  • (Info, Array<Info>)

    information describing submitted job

Raises:

  • (Error)

    if something goes wrong getting job info

See Also:


91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
# File 'lib/ood_job/adapters/torque.rb', line 91

# Retrieve job info from the resource manager.
#
# With an empty id (the default) the whole job list is requested and an
# Array is always returned, even when it holds zero or one entries. With a
# specific id a single Info is returned when exactly one job matches.
#
# @param id [#to_s] the id of the job, or empty to list every job
# @return [Info, Array<Info>] information describing the submitted job(s);
#   an Info with status +:undetermined+ when the job id is unknown
# @raise [Error] if something goes wrong getting job info
def info(id: '')
  id = id.to_s
  info_ary = pbs.get_jobs(id: id).map do |job_id, attrs|
    # Named capture assigns job_owner: the text before '@', or nil on no match
    /^(?<job_owner>[\w-]+)@/ =~ attrs[:Job_Owner]
    allocated_nodes = parse_nodes(attrs[:exec_host] || "")
    resources_used  = attrs.fetch(:resources_used, {})
    Info.new(
      id: job_id,
      status: STATE_MAP.fetch(attrs[:job_state], :undetermined),
      allocated_nodes: allocated_nodes,
      submit_host: attrs[:submit_host],
      job_owner: job_owner,
      procs: allocated_nodes.inject(0) { |sum, node| sum + node[:procs] },
      queue_name: attrs[:queue],
      wallclock_time: duration_in_seconds(resources_used[:walltime]),
      cpu_time: duration_in_seconds(resources_used[:cput]),
      submission_time: attrs[:ctime],
      dispatch_time: attrs[:start_time],
      native: attrs
    )
  end

  # A list request must not collapse to a bare Info just because the
  # cluster happens to hold exactly one job.
  return info_ary if id.empty?

  info_ary.size == 1 ? info_ary.first : info_ary
rescue PBS::UnkjobidError
  Info.new(
    id: id,
    status: :undetermined
  )
rescue PBS::Error => e
  raise Error, e.message
end

#release(id:) ⇒ void

This method returns an undefined value.

Release the job that is on hold

Parameters:

  • id (#to_s)

    the id of the job

Raises:

  • (Error)

    if something goes wrong releasing a job

See Also:


154
155
156
157
158
159
160
# File 'lib/ood_job/adapters/torque.rb', line 154

# Release the job that is on hold.
#
# A PBS::UnkjobidError is swallowed and +nil+ is returned; any other
# PBS::Error is re-raised as Error carrying the same message.
#
# @param id [#to_s] the id of the job
# @raise [Error] if something goes wrong releasing a job
# @return [void]
def release(id:)
  held_job = id.to_s
  pbs.release_job(held_job)
rescue PBS::UnkjobidError
  nil
rescue PBS::Error => failure
  raise Error, failure.message
end

#status(id:) ⇒ Status

Retrieve job status from resource manager

Parameters:

  • id (#to_s)

    the id of the job

Returns:

  • (Status)

    status of the job

Raises:

  • (Error)

    if something goes wrong getting job status

See Also:


126
127
128
129
130
131
132
133
134
# File 'lib/ood_job/adapters/torque.rb', line 126

# Retrieve job status from the resource manager.
#
# Only the +job_state+ attribute is requested. The state character is
# translated through STATE_MAP; unmapped characters, an unknown job id, or
# a reply that carries no usable entry all yield +:undetermined+.
#
# @param id [#to_s] the id of the job
# @return [Status] status of the job
# @raise [Error] if something goes wrong getting job status
def status(id:)
  id = id.to_s
  reply = pbs.get_job(id, filters: [:job_state])

  # Prefer the entry stored under the exact id. When it is absent but the
  # reply holds exactly one job, use that one instead of crashing on nil.
  # NOTE(review): presumably the server may key the entry by a
  # fully-qualified id (e.g. "123.host") — confirm against the PBS binding.
  entry = reply[id]
  entry = reply.values.first if entry.nil? && reply.size == 1

  char = entry && entry[:job_state]
  Status.new(state: STATE_MAP.fetch(char, :undetermined))
rescue PBS::UnkjobidError
  Status.new(state: :undetermined)
rescue PBS::Error => e
  raise Error, e.message
end

#submit(script:, after: [], afterok: [], afternotok: [], afterany: []) ⇒ String

Submit a job with the attributes defined in the job template instance

Parameters:

  • script (Script)

    script object that describes the script and attributes for the submitted job

  • after (#to_s, Array<#to_s>) (defaults to: [])

    this job may be scheduled for execution at any point after dependent jobs have started execution

  • afterok (#to_s, Array<#to_s>) (defaults to: [])

    this job may be scheduled for execution only after dependent jobs have terminated with no errors

  • afternotok (#to_s, Array<#to_s>) (defaults to: [])

    this job may be scheduled for execution only after dependent jobs have terminated with errors

  • afterany (#to_s, Array<#to_s>) (defaults to: [])

    this job may be scheduled for execution after dependent jobs have terminated

Returns:

  • (String)

    the job id returned after successfully submitting a job

Raises:

  • (Error)

    if something goes wrong submitting a job

See Also:


25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
# File 'lib/ood_job/adapters/torque.rb', line 25

# Submit a job with the attributes defined in the script object.
#
# Builds three option sets from the script (headers, resources and
# environment variables), overlays anything supplied in +script.native+
# (+:headers+, +:resources+, +:envvars+), and hands them to the batch
# server together with the script content and queue name.
#
# @param script [Script] script object that describes the script and
#   attributes for the submitted job
# @param after [#to_s, Array<#to_s>] this job may be scheduled for execution
#   at any point after dependent jobs have started execution
# @param afterok [#to_s, Array<#to_s>] this job may be scheduled for
#   execution only after dependent jobs have terminated with no errors
# @param afternotok [#to_s, Array<#to_s>] this job may be scheduled for
#   execution only after dependent jobs have terminated with errors
# @param afterany [#to_s, Array<#to_s>] this job may be scheduled for
#   execution after dependent jobs have terminated
# @return [String] the job id returned after successfully submitting a job
# @raise [Error] if something goes wrong submitting a job
def submit(script:, after: [], afterok: [], afternotok: [], afterany: [])
  after      = Array.wrap(after).map(&:to_s)
  afterok    = Array.wrap(afterok).map(&:to_s)
  afternotok = Array.wrap(afternotok).map(&:to_s)
  afterany   = Array.wrap(afterany).map(&:to_s)

  # Set headers
  headers = {}
  headers.merge!(job_arguments: script.args.join(' ')) unless script.args.nil?
  headers.merge!(Hold_Types: :u) if script.submit_as_hold
  headers.merge!(Rerunable: script.rerunnable ? 'y' : 'n') unless script.rerunnable.nil?
  headers.merge!(init_work_dir: script.workdir) unless script.workdir.nil?
  headers.merge!(Mail_Users: script.email.join(',')) unless script.email.nil?
  mail_points  = ''
  mail_points += 'b' if script.email_on_started
  mail_points += 'e' if script.email_on_terminated
  headers.merge!(Mail_Points: mail_points) unless mail_points.empty?
  headers.merge!(Job_Name: script.job_name) unless script.job_name.nil?
  # ignore input_path (not defined in Torque)
  headers.merge!(Output_Path: script.output_path) unless script.output_path.nil?
  headers.merge!(Error_Path: script.error_path) unless script.error_path.nil?
  headers.merge!(Join_Path: 'oe') if script.join_files
  headers.merge!(reservation_id: script.reservation_id) unless script.reservation_id.nil?
  headers.merge!(Priority: script.priority) unless script.priority.nil?
  headers.merge!(Execution_Time: script.start_time.localtime.strftime("%C%y%m%d%H%M.%S")) unless script.start_time.nil?
  headers.merge!(Account_Name: script.accounting_id) unless script.accounting_id.nil?

  # Set dependencies: ids joined by ':' within a type, types joined by ','
  depend = []
  depend << "after:#{after.join(':')}"           unless after.empty?
  depend << "afterok:#{afterok.join(':')}"       unless afterok.empty?
  depend << "afternotok:#{afternotok.join(':')}" unless afternotok.empty?
  depend << "afterany:#{afterany.join(':')}"     unless afterany.empty?
  headers.merge!(depend: depend.join(','))       unless depend.empty?

  # Set resources
  resources = {}
  resources.merge!(mem: "#{script.min_phys_memory}KB") unless script.min_phys_memory.nil?
  resources.merge!(walltime: seconds_to_duration(script.wall_time)) unless script.wall_time.nil?
  resources.merge!(procs: script.min_procs) unless script.min_procs.nil?
  if script.nodes && !script.nodes.empty?
    nodes = uniq_array(script.nodes)
    resources.merge!(nodes: nodes.map {|k, v| k.is_a?(NodeRequest) ? node_request_to_str(k, v) : k }.join('+'))
  end

  # Set environment variables.
  # Work on a copy: the native envvars are merged in below, and that must
  # not modify the hash owned by the caller's script object.
  envvars = (script.job_environment || {}).dup

  # Set native options (these override the values derived above)
  if script.native
    headers.merge!   script.native.fetch(:headers, {})
    resources.merge! script.native.fetch(:resources, {})
    envvars.merge!   script.native.fetch(:envvars, {})
  end

  # Submit job
  pbs.submit_string(script.content, queue: script.queue_name, headers: headers, resources: resources, envvars: envvars)
rescue PBS::Error => e
  raise Error, e.message
end