Class: Bio::Pipengine::Job

Inherits:
Object
  • Object
show all
Defined in:
lib/bio/pipengine/job.rb

Constant Summary collapse

# Class-level logger writing to standard output; #submit uses it to report
# what qsub printed.
@@logger =
Logger.new(STDOUT)
# Class-level logger writing to standard error.
@@logger_error =
Logger.new(STDERR)

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(name) ⇒ Job

Returns a new instance of Job.



15
16
17
18
19
20
21
22
23
24
# File 'lib/bio/pipengine/job.rb', line 15

# Creates a job with default settings.
#
# The full job name is a freshly generated UUID, a dash and +name+; the
# bare +name+ is kept separately as the short name.
#
# @param name [String] short name of the job
def initialize(name)
  unique_prefix = generate_uuid

  # Identity
  @name = unique_prefix + "-" + name
  @shortname = name

  # Script content and the resources it refers to
  @command_line = []
  @resources = {}

  # Default scheduler request: one CPU on one node
  @cpus = 1
  @nodes = "1"

  # Logging defaults
  @log = "stdin"
  @log_adapter = nil
end

Instance Attribute Details

#command_lineObject

A Job object holds information on a job to be submitted. samples_groups and samples_obj are used to store information for steps that need to combine info from multiple samples.



12
13
14
# File 'lib/bio/pipengine/job.rb', line 12

# Returns the array of shell command lines that make up the job script.
# It starts empty (see #initialize), is appended to by #add_step, and is
# written out joined by newlines in #to_script.
def command_line
  @command_line
end

#cpusObject

A Job object holds information on a job to be submitted. samples_groups and samples_obj are used to store information for steps that need to combine info from multiple samples.



12
13
14
# File 'lib/bio/pipengine/job.rb', line 12

# Returns the number of CPUs requested for the job. It starts at 1 (see
# #initialize); #add_step raises it to a step's cpus value when that value
# is larger, and #to_script emits it as the "ppn" value.
def cpus
  @cpus
end

#custom_nameObject

A Job object holds information on a job to be submitted. samples_groups and samples_obj are used to store information for steps that need to combine info from multiple samples.



12
13
14
# File 'lib/bio/pipengine/job.rb', line 12

# Returns the optional custom folder name. #add_step uses it instead of the
# step name for a non-multi step's working directory when custom_output is
# not set. Not assigned in #initialize, so it is nil until set elsewhere.
def custom_name
  @custom_name
end

#custom_outputObject

A Job object holds information on a job to be submitted. samples_groups and samples_obj are used to store information for steps that need to combine info from multiple samples.



12
13
14
# File 'lib/bio/pipengine/job.rb', line 12

# Returns the optional custom output folder name. When set, #add_step uses
# it as the step's folder in preference to any other name. Not assigned in
# #initialize, so it is nil until set elsewhere.
def custom_output
  @custom_output
end

#localObject

A Job object holds information on a job to be submitted. samples_groups and samples_obj are used to store information for steps that need to combine info from multiple samples.



12
13
14
# File 'lib/bio/pipengine/job.rb', line 12

# Returns the optional base path for a temporary working directory. When
# set, #add_step runs the step in "<local>/<name>" and appends commands that
# copy the results to the output directory and remove the temporary one.
# Not assigned in #initialize, so it is nil until set elsewhere.
def local
  @local
end

#logObject

A Job object holds information on a job to be submitted. samples_groups and samples_obj are used to store information for steps that need to combine info from multiple samples.



12
13
14
# File 'lib/bio/pipengine/job.rb', line 12

# Returns the job's log setting, which #initialize sets to "stdin".
# NOTE(review): none of the methods shown here read this value; presumably
# it selects the log destination used by the logger helper - confirm.
def log
  @log
end

#log_adapterObject

A Job object holds information on a job to be submitted. samples_groups and samples_obj are used to store information for steps that need to combine info from multiple samples.



12
13
14
# File 'lib/bio/pipengine/job.rb', line 12

# Returns the job's log adapter, which #initialize sets to nil.
# NOTE(review): none of the methods shown here read this value - confirm
# where it is consumed.
def log_adapter
  @log_adapter
end

#memObject

A Job object holds information on a job to be submitted. samples_groups and samples_obj are used to store information for steps that need to combine info from multiple samples.



12
13
14
# File 'lib/bio/pipengine/job.rb', line 12

# Returns the memory request for the job. #add_step overwrites it with each
# step's mem value; #to_script emits it as "mem=<value>" when it is set and
# no explicit :pbs_opts are given. Not assigned in #initialize.
def mem
  @mem
end

#multi_samplesObject

A Job object holds information on a job to be submitted. samples_groups and samples_obj are used to store information for steps that need to combine info from multiple samples.



12
13
14
# File 'lib/bio/pipengine/job.rb', line 12

# Returns the value stored in @multi_samples. Not assigned in #initialize,
# so it is nil until set elsewhere.
# NOTE(review): not read by any method shown here; presumably holds the
# samples combined by a multi-sample step - confirm against callers.
def multi_samples
  @multi_samples
end

#nameObject

A Job object holds information on a job to be submitted. samples_groups and samples_obj are used to store information for steps that need to combine info from multiple samples.



12
13
14
# File 'lib/bio/pipengine/job.rb', line 12

# Returns the full job name: a generated UUID, a dash and the name given to
# #initialize. It is used as the PBS job name, as the script file name
# ("<name>.pbs") and as the temporary directory name under #local.
def name
  @name
end

#nodesObject

A Job object holds information on a job to be submitted. samples_groups and samples_obj are used to store information for steps that need to combine info from multiple samples.



12
13
14
# File 'lib/bio/pipengine/job.rb', line 12

# Returns the nodes request for the job. It starts as "1" (see #initialize);
# #add_step replaces it with a step's nodes value when the step has one, and
# #to_script emits it as the "nodes" value.
def nodes
  @nodes
end

#resourcesObject

A Job object holds information on a job to be submitted. samples_groups and samples_obj are used to store information for steps that need to combine info from multiple samples.



12
13
14
# File 'lib/bio/pipengine/job.rb', line 12

# Returns the job's resources hash. It starts empty (see #initialize), is
# extended in place by #add_resources, and its "output" entry is what
# #output returns.
def resources
  @resources
end

#samples_objObject

A Job object holds information on a job to be submitted. samples_groups and samples_obj are used to store information for steps that need to combine info from multiple samples.



12
13
14
# File 'lib/bio/pipengine/job.rb', line 12

# Returns the value stored in @samples_obj. Not assigned in #initialize, so
# it is nil until set elsewhere.
# NOTE(review): not read by any method shown here; presumably holds sample
# objects needed by steps that combine several samples - confirm.
def samples_obj
  @samples_obj
end

Instance Method Details

#add_resources(resources) ⇒ Object



26
27
28
# File 'lib/bio/pipengine/job.rb', line 26

# Merges the given entries into the job's resources hash in place; entries
# with an existing key replace the stored value.
#
# @param resources [Hash] entries to add
# @return [Hash] the job's updated resources hash
def add_resources(resources)
  # The parameter shadows the reader, hence the explicit receiver.
  current = self.resources
  current.update(resources)
end

#add_step(step, sample) ⇒ Object

add all the command lines for a given step



35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
# File 'lib/bio/pipengine/job.rb', line 35

# Appends to the job's command line the shell commands that run +step+ on
# +sample+, and updates the job's cpus / nodes / mem request from the step.
#
# For a regular step the commands are wrapped in a checkpoint guard:
#
#   if [ ! -f <working_dir>/checkpoint ]
#   then
#     <log start>, mkdir -p, cd, <commands>, <log finished>, touch checkpoint
#   else
#     <log skip>
#   fi
#
# A step whose name starts with "_" gets no working directory and no
# checkpoint guard: only its commands and the "finished" log line are
# emitted, so the generated script never contains an unmatched else/fi.
#
# When #local is set the step runs in "<local>/<name>" and extra commands
# copy the results to the step's output directory and remove the temporary
# directory.
#
# @param step [#name, #run, #cpus, #nodes, #mem, #is_multi?] step to add
# @param sample [#name] sample the step is run on
def add_step(step, sample)
  checkpointed = !step.name.start_with?("_")

  # Job working directory: the temporary area when one is configured,
  # otherwise the step's folder under the output directory.
  working_dir =
    if self.local
      self.local + "/" + self.name
    else
      step_output_dir(step, sample)
    end

  # Set job cpus number to the highest step cpus (in case of multiple steps).
  self.cpus = step.cpus if step.cpus > self.cpus

  # Set number of nodes for the job, keeping the current value when the
  # step does not specify one.
  self.nodes = step.nodes || @nodes

  # Set the memory used.
  self.mem = step.mem

  if checkpointed
    self.command_line << "if [ ! -f #{working_dir}/checkpoint ]"
    self.command_line << "then"
    self.command_line << logger(step, "start")
    self.command_line << "\nmkdir -p #{working_dir}"
    self.command_line << "cd #{working_dir}"
  end

  # Generate command lines for this step; each one aborts the job on failure.
  # TODO verify that logger works in this case
  if step.run.kind_of? Array
    step.run.each_with_index do |cmd, i|
      command = generate_cmd_line(cmd, sample, step)
      self.command_line << "#{command} || { #{logger(step, "FAILED #{i}")}; exit 1; }"
    end
  else
    command = generate_cmd_line(step.run, sample, step)
    self.command_line << "#{command} || { #{logger(step, "FAILED")}; exit 1; }"
  end
  self.command_line << logger(step, "finished")

  # Close the checkpoint guard only if it was opened above.
  if checkpointed
    self.command_line << "touch #{working_dir}/checkpoint"
    self.command_line << "else"
    self.command_line << logger(step, "already executed, skipping this step")
    self.command_line << "fi"
  end

  # If a temporary (i.e. different from 'output') directory is set, copy the
  # results to the same folder a non-local run would have written to.
  if self.local
    final_output = step_output_dir(step, sample)
    self.command_line << "mkdir -p #{final_output}"
    self.command_line << "cp -r #{working_dir}/* #{final_output}"
    self.command_line << "rm -fr #{working_dir}"
  end
end

# Returns the folder under the job's output directory that holds the results
# of +step+: "<output>/<folder>" for a multi step, where folder is
# custom_output or the job's short name, and "<output>/<sample>/<folder>"
# otherwise, where folder is custom_output, custom_name or the step name
# (first one set).
def step_output_dir(step, sample)
  if step.is_multi?
    self.output + "/#{self.custom_output || @shortname}"
  else
    folder = self.custom_output || self.custom_name || step.name
    self.output + "/#{sample.name}/#{folder}"
  end
end
private :step_output_dir

#outputObject



30
31
32
# File 'lib/bio/pipengine/job.rb', line 30

# Returns the job's output directory, i.e. the value stored under the
# "output" key of the resources hash (nil when that key is absent).
def output
  job_resources = self.resources
  job_resources["output"]
end

#submitObject



136
137
138
139
# File 'lib/bio/pipengine/job.rb', line 136

# Submits the job's PBS script ("<output>/<name>.pbs", the path #to_script
# writes) with qsub and logs what qsub printed.
#
# qsub is started directly with an argument vector instead of through a
# shell, so an output path containing spaces or shell metacharacters reaches
# qsub as a single, unmodified argument - the same literal path #to_script
# opened. A non-zero qsub exit status is reported on the error logger rather
# than being logged as an (empty) job id.
def submit
  script_path = "#{self.output}/#{self.name}.pbs"
  job_id = IO.popen(["qsub", script_path], &:read).to_s.chomp

  if $?.success?
    @@logger.info job_id.green
  else
    @@logger_error.error "qsub failed for #{script_path} (exit status #{$?.exitstatus})"
  end
end

#to_script(options) ⇒ Object



118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
# File 'lib/bio/pipengine/job.rb', line 118

# Writes the PBS submission script for this job to "<output>/<name>.pbs".
#
# The script consists of a bash shebang, the PBS directives for job name
# and working directory, an optional queue directive, one resource list
# directive, and finally the job's command lines.
#
# @param options [Hash] :pbs_queue (optional queue name) and :pbs_opts
#   (optional list of resource strings that replaces the nodes/ppn/mem
#   request derived from the job)
def to_script(options)
  script_path = self.output + "/" + self.name + '.pbs'

  File.open(script_path, 'w') do |script|
    script.puts "#!/usr/bin/env bash"
    script.puts "#PBS -N #{self.name}"
    script.puts "#PBS -d #{self.output}"

    queue = options[:pbs_queue]
    script.puts "#PBS -q #{queue}" if queue

    # Explicit PBS options win over the request computed from the job.
    resource_list =
      if (explicit_opts = options[:pbs_opts])
        explicit_opts.join(",")
      else
        node_request = "nodes=#{self.nodes}:ppn=#{self.cpus}"
        mem_request = self.mem ? "mem=#{self.mem}" : nil
        [node_request, mem_request].compact.join(",")
      end
    script.puts "#PBS -l #{resource_list}"

    script.puts self.command_line.join("\n")
  end
end