Top Level Namespace
Defined Under Namespace
Modules: CommonFunctions, Kernel
Classes: AppControllerClient, Object
Constant Summary

- NO_NODES_NEEDED = ["acl", "input", "output", "compile"]
  A list of Neptune jobs that do not require nodes to be spawned for computation.
- NO_OUTPUT_NEEDED = ["input"]
  A list of Neptune jobs that do not require the output to be specified beforehand.
- ALLOWED_STORAGE_TYPES = ["appdb", "gstorage", "s3", "walrus"]
  A list of storage mechanisms that we can use to store and retrieve data for Neptune jobs.
- NEED_PREPROCESSING = ["compile", "erlang", "mpi", "ssa"]
  A list of jobs that require some kind of work to be done before the actual computation can be performed.
- NO_TIMEOUT = -1
  SOAP calls can take a long time when large amounts of data are sent over the network. For this first version we don’t want these calls to endlessly time out and retry, so as a hack we simply don’t let them time out. The next version should replace this, timing out properly and avoiding long calls unless necessary.
Instance Method Summary
- #compile_code(job_data, ssh_args, shadow_ip, shell = Kernel.method(:`)) ⇒ Object
  This method sends out a request to compile code, waits for it to finish, and gets the standard out and error returned from the compilation.
- #do_preprocessing(job_data) ⇒ Object
  Certain types of jobs need steps to be taken before they can be started (e.g., copying input data or code over).
- #get_input(job_data, ssh_args, shadow_ip, controller, file = File, shell = Kernel.method(:`)) ⇒ Object
  This method takes a file on the local user’s computer and stores it remotely via AppScale.
- #get_job_data(params) ⇒ Object
- #get_std_out_and_err(location) ⇒ Object
  This method returns a hash containing the standard out and standard error from a completed job, as well as a result field that indicates whether or not the job completed successfully (success = no errors).
- #neptune(params) ⇒ Object
  This method is the heart of Neptune - here, we take blocks of code that the user has written and convert them into HPC job requests.
- #preprocess_compile(job_data, shell = Kernel.method(:`)) ⇒ Object
  This preprocessing method copies the user’s code over to the Shadow node so that it can be compiled there.
- #preprocess_erlang(job_data, file = File, common_functions = CommonFunctions) ⇒ Object
- #preprocess_mpi(job_data) ⇒ Object
  This preprocessing method verifies that the user specified the number of nodes to use.
- #preprocess_ssa(job_data) ⇒ Object
  This preprocessing method verifies that the user specified the number of trajectories to run, via either :trajectories or :simulations.
- #run_job(job_data, ssh_args, shadow_ip, secret, controller = AppControllerClient, file = File) ⇒ Object
  This method actually runs the Neptune job, given information about the job as well as information about the node to send the request to.
- #upload_app_for_cicero(job_data) ⇒ Object
- #validate_storage_params(job_data) ⇒ Object
- #wait_for_compilation_to_finish(ssh_args, shadow_ip, compiled_location, shell = Kernel.method(:`)) ⇒ Object
  This method waits for AppScale to finish compiling the user’s code, indicated by AppScale copying the finished code to a pre-determined location.
Instance Method Details
#compile_code(job_data, ssh_args, shadow_ip, shell = Kernel.method(:`)) ⇒ Object
This method sends out a request to compile code, waits for it to finish, and gets the standard out and error returned from the compilation. This method returns a hash containing the standard out, error, and a result that indicates whether or not the compilation was successful.
# File 'lib/neptune.rb', line 301

def compile_code(job_data, ssh_args, shadow_ip, shell=Kernel.method(:`))
  compiled_location = controller.compile_code(job_data)
  copy_to = job_data["@copy_to"]

  wait_for_compilation_to_finish(ssh_args, shadow_ip, compiled_location)

  FileUtils.rm_rf(copy_to)

  scp_command = "scp -r #{ssh_args} root@#{shadow_ip}:#{compiled_location} #{copy_to} 2>&1"
  puts scp_command
  shell.call(scp_command)

  code = job_data["@code"]
  dirs = code.split(/\//)
  remote_dir = "/tmp/" + dirs[-1]

  [remote_dir, compiled_location].each { |remote_files|
    ssh_command = "ssh #{ssh_args} root@#{shadow_ip} 'rm -rf #{remote_files}' 2>&1"
    puts ssh_command
    shell.call(ssh_command)
  }

  return get_std_out_and_err(copy_to)
end
#do_preprocessing(job_data) ⇒ Object
Certain types of jobs need steps to be taken before they can be started (e.g., copying input data or code over). This method dispatches the right method to use based on the type of the job that the user has asked to run.
# File 'lib/neptune.rb', line 52

def do_preprocessing(job_data)
  job_type = job_data["@type"]
  if !NEED_PREPROCESSING.include?(job_type)
    return
  end

  preprocess = "preprocess_#{job_type}".to_sym
  send(preprocess, job_data)
end
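The dispatch above builds a method name from the job type and invokes it with `send`. A minimal, self-contained sketch of that pattern follows; the reduced `NEED_PREPROCESSING` list and the stand-in `preprocess_*` bodies here are illustrative only, not the gem's real implementations:

```ruby
# Minimal sketch of do_preprocessing's dynamic dispatch, using a
# reduced job-type list and stand-in preprocess_* methods.
NEED_PREPROCESSING = ["mpi", "ssa"]

def preprocess_mpi(job_data)
  job_data["@preprocessed"] = "mpi"  # stand-in for the real MPI checks
end

def preprocess_ssa(job_data)
  job_data["@preprocessed"] = "ssa"  # stand-in for the real SSA checks
end

def do_preprocessing(job_data)
  job_type = job_data["@type"]
  return unless NEED_PREPROCESSING.include?(job_type)

  # Derive the preprocessing method's name from the job type and call it.
  send("preprocess_#{job_type}".to_sym, job_data)
end
```

Because top-level methods become private methods on `Object`, `send` is what lets the dispatcher reach them uniformly.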
#get_input(job_data, ssh_args, shadow_ip, controller, file = File, shell = Kernel.method(:`)) ⇒ Object
This method takes a file on the local user’s computer and stores it remotely via AppScale. It returns a hash map indicating whether or not the job succeeded and if it failed, the reason for it.
# File 'lib/neptune.rb', line 247

def get_input(job_data, ssh_args, shadow_ip, controller, file=File, shell=Kernel.method(:`))
  result = {:result => :success}

  if !job_data["@local"]
    abort("You failed to specify a file to copy over via the :local flag.")
  end

  local_file = file.expand_path(job_data["@local"])
  if !file.exists?(local_file)
    reason = "the file you specified to copy, #{local_file}, doesn't exist." +
      " Please specify a file that exists and try again."
    return {:result => :failure, :reason => reason}
  end

  remote = "/tmp/neptune-input-#{rand(100000)}"
  scp_cmd = "scp -r #{ssh_args} #{local_file} root@#{shadow_ip}:#{remote}"
  puts scp_cmd
  shell.call(scp_cmd)

  job_data["@local"] = remote
  puts "job data = #{job_data.inspect}"

  response = controller.put_input(job_data)
  if response
    return {:result => :success}
  else
    # TODO - expand this to include the reason why it failed
    return {:result => :failure}
  end
end
#get_job_data(params) ⇒ Object
# File 'lib/neptune.rb', line 162

def get_job_data(params)
  job_data = {}
  params.each { |k, v|
    key = "@#{k}"
    job_data[key] = v
  }

  job_data.delete("@job")
  job_data["@keyname"] = params[:keyname] || "appscale"

  job_data["@type"] = job_data["@type"].to_s
  type = job_data["@type"]

  if type == "upc" or type == "x10"
    job_data["@type"] = "mpi"
    type = "mpi"
  end

  # kdt jobs also run as mpi jobs, but need to pass along an executable
  # parameter to let mpiexec know to use python to exec it
  if type == "kdt"
    job_data["@type"] = "mpi"
    type = "mpi"
    job_data["@executable"] = "python"
  end

  if job_data["@nodes_to_use"].class == Hash
    job_data["@nodes_to_use"] = job_data["@nodes_to_use"].to_a.flatten
  end

  if !NO_OUTPUT_NEEDED.include?(type)
    if (job_data["@output"].nil? or job_data["@output"] == "")
      abort("Job output must be specified")
    end

    if job_data["@output"][0].chr != "/"
      abort("Job output must begin with a slash ('/')")
    end
  end

  return job_data
end
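The key transformation at the heart of this method is simple: every symbol key the user passes becomes an "@"-prefixed string key, and the upc/x10 job types are aliased to mpi. A reduced sketch (the name `build_job_data` and the trimmed-down logic are illustrative, not the gem's API):

```ruby
# Sketch of get_job_data's transformation: symbol params become
# "@"-prefixed string keys, upc/x10 jobs are aliased to mpi, and
# :keyname defaults to "appscale".
def build_job_data(params)
  job_data = {}
  params.each { |k, v| job_data["@#{k}"] = v }

  job_data.delete("@job")
  job_data["@keyname"] = params[:keyname] || "appscale"

  job_data["@type"] = job_data["@type"].to_s
  if ["upc", "x10"].include?(job_data["@type"])
    # upc and x10 codes run under the mpi machinery
    job_data["@type"] = "mpi"
  end

  job_data
end
```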
#get_std_out_and_err(location) ⇒ Object
This method returns a hash containing the standard out and standard error from a completed job, as well as a result field that indicates whether or not the job completed successfully (success = no errors).
# File 'lib/neptune.rb', line 330

def get_std_out_and_err(location)
  result = {}

  out = File.open("#{location}/compile_out") { |f| f.read.chomp! }
  result[:out] = out

  err = File.open("#{location}/compile_err") { |f| f.read.chomp! }
  result[:err] = err

  if result[:err]
    result[:result] = :failure
  else
    result[:result] = :success
  end

  return result
end
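One subtlety worth noting: the method relies on `chomp!` returning nil when it removes nothing, so an empty `compile_err` file yields a falsy `:err` and a `:success` result. A runnable sketch using the explicit `chomp` + `empty?` equivalent (the name `read_job_output` is a stand-in):

```ruby
require "tmpdir"

# Sketch of get_std_out_and_err: read a job's compile_out / compile_err
# files and report :success only when standard error is empty. The
# original uses chomp!, whose nil-on-no-change return makes an empty
# err file falsy; chomp + empty? states the same intent explicitly.
def read_job_output(location)
  out = File.open("#{location}/compile_out") { |f| f.read.chomp }
  err = File.open("#{location}/compile_err") { |f| f.read.chomp }
  { :out => out, :err => err, :result => err.empty? ? :success : :failure }
end

# Exercise it against a throwaway directory standing in for a job's
# output location.
Dir.mktmpdir do |dir|
  File.write("#{dir}/compile_out", "ok\n")
  File.write("#{dir}/compile_err", "")
  puts read_job_output(dir).inspect
end
```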
#neptune(params) ⇒ Object
This method is the heart of Neptune - here, we take blocks of code that the user has written and convert them into HPC job requests. At a high level, the user can request to run a job, retrieve a job’s output, or modify the access policy (ACL) for the output of a job. By default, job data is private, but a Neptune job can be used to set it to public later (and vice-versa).
# File 'lib/neptune.rb', line 419

def neptune(params)
  puts "Received a request to run a job."
  puts params[:type]

  job_data = get_job_data(params)
  validate_storage_params(job_data)
  puts "job data = #{job_data.inspect}"
  do_preprocessing(job_data)

  keyname = job_data["@keyname"]

  shadow_ip = CommonFunctions.get_from_yaml(keyname, :shadow)
  secret = CommonFunctions.get_secret_key(keyname)
  ssh_key = File.expand_path("~/.appscale/#{keyname}.key")
  ssh_args = "-i ~/.appscale/#{keyname}.key -o StrictHostkeyChecking=no "

  return run_job(job_data, ssh_args, shadow_ip, secret)
end
#preprocess_compile(job_data, shell = Kernel.method(:`)) ⇒ Object
This preprocessing method copies over the user’s code to the Shadow node so that it can be compiled there. A future version of this method may also copy over libraries as well.
# File 'lib/neptune.rb', line 65

def preprocess_compile(job_data, shell=Kernel.method(:`))
  code = File.expand_path(job_data["@code"])
  if !File.exists?(code)
    abort("The source file #{code} does not exist.")
  end

  suffix = code.split('/')[-1]
  dest = "/tmp/#{suffix}"
  keyname = job_data["@keyname"]
  shadow_ip = CommonFunctions.get_from_yaml(keyname, :shadow)

  ssh_args = "-i ~/.appscale/#{keyname}.key -o StrictHostkeyChecking=no root@#{shadow_ip}"
  remove_dir = "ssh #{ssh_args} 'rm -rf #{dest}' 2>&1"
  puts remove_dir
  shell.call(remove_dir)

  CommonFunctions.scp_to_shadow(code, dest, keyname, is_dir=true)

  job_data["@code"] = dest
end
#preprocess_erlang(job_data, file = File, common_functions = CommonFunctions) ⇒ Object
# File 'lib/neptune.rb', line 86

def preprocess_erlang(job_data, file=File, common_functions=CommonFunctions)
  if !job_data["@code"]
    abort("When running Erlang jobs, :code must be specified.")
  end

  source_code = file.expand_path(job_data["@code"])
  if !file.exists?(source_code)
    abort("The specified code, #{job_data['@code']}," +
      " didn't exist. Please specify one that exists and try again")
  end
  dest_code = "/tmp/"

  keyname = job_data["@keyname"]
  common_functions.scp_to_shadow(source_code, dest_code, keyname)
end
#preprocess_mpi(job_data) ⇒ Object
This preprocessing method verifies that the user specified the number of nodes to use. If they also specified the number of processes to use, we also verify that this value is at least as many as the number of nodes (that is, nodes can’t be underprovisioned in MPI).
# File 'lib/neptune.rb', line 106

def preprocess_mpi(job_data)
  if !job_data["@nodes_to_use"]
    abort("When running MPI jobs, :nodes_to_use must be specified.")
  end

  if !job_data["@procs_to_use"]
    abort("When running MPI jobs, :procs_to_use must be specified.")
  end

  if job_data["@procs_to_use"]
    p = job_data["@procs_to_use"]
    n = job_data["@nodes_to_use"]
    if p < n
      abort("When specifying both :procs_to_use and :nodes_to_use" +
        ", :procs_to_use must be at least as large as :nodes_to_use. Please " +
        "change this and try again. You specified :procs_to_use = #{p} and" +
        " :nodes_to_use = #{n}.")
    end
  end

  if job_data["@argv"]
    argv = job_data["@argv"]
    if argv.class != String and argv.class != Array
      abort("The value specified for :argv must be either a String or Array")
    end

    if argv.class == Array
      job_data["@argv"] = argv.join(' ')
    end
  end

  return job_data
end
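The two checks above can be exercised in isolation. This sketch (with the hypothetical name `check_mpi_params`, and `ArgumentError` in place of the gem's `abort`, so failures are testable) shows the procs-versus-nodes constraint and the Array-to-String flattening of `:argv`:

```ruby
# Sketch of preprocess_mpi's validation: processes must be at least as
# numerous as nodes, and an Array :argv is flattened into one
# command-line string. Raises instead of abort so callers can recover.
def check_mpi_params(job_data)
  procs = job_data["@procs_to_use"]
  nodes = job_data["@nodes_to_use"]
  if procs < nodes
    raise ArgumentError, ":procs_to_use (#{procs}) must be at least " +
      ":nodes_to_use (#{nodes})"
  end

  argv = job_data["@argv"]
  if argv.is_a?(Array)
    job_data["@argv"] = argv.join(" ")   # mpiexec wants a flat string
  elsif argv && !argv.is_a?(String)
    raise ArgumentError, ":argv must be a String or Array"
  end

  job_data
end
```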
#preprocess_ssa(job_data) ⇒ Object
This preprocessing method verifies that the user specified the number of trajectories to run, via either :trajectories or :simulations. Both should not be specified - only one or the other, and regardless of which they specify, convert it to be :trajectories.
# File 'lib/neptune.rb', line 144

def preprocess_ssa(job_data)
  if job_data["@simulations"] and job_data["@trajectories"]
    abort("Both :simulations and :trajectories cannot be specified - use one" +
      " or the other.")
  end

  if job_data["@simulations"]
    job_data["@trajectories"] = job_data["@simulations"]
    job_data.delete("@simulations")
  end

  if !job_data["@trajectories"]
    abort(":trajectories needs to be specified when running ssa jobs")
  end

  return job_data
end
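The normalization this performs - accept either spelling, reject both, and canonicalize on `@trajectories` - is sketched below. The name `normalize_trajectories` is a stand-in, and `ArgumentError` replaces `abort` for testability:

```ruby
# Sketch of preprocess_ssa's normalization: :simulations and
# :trajectories are aliases; exactly one must be given, and the
# canonical key afterwards is always @trajectories.
def normalize_trajectories(job_data)
  if job_data["@simulations"] && job_data["@trajectories"]
    raise ArgumentError, "use :simulations or :trajectories, not both"
  end

  if job_data["@simulations"]
    # delete returns the removed value, so alias and cleanup are one step
    job_data["@trajectories"] = job_data.delete("@simulations")
  end

  unless job_data["@trajectories"]
    raise ArgumentError, ":trajectories is required for ssa jobs"
  end

  job_data
end
```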
#run_job(job_data, ssh_args, shadow_ip, secret, controller = AppControllerClient, file = File) ⇒ Object
This method actually runs the Neptune job, given information about the job as well as information about the node to send the request to.
# File 'lib/neptune.rb', line 376

def run_job(job_data, ssh_args, shadow_ip, secret,
  controller=AppControllerClient, file=File)
  controller = controller.new(shadow_ip, secret)

  # TODO - right now the job is assumed to succeed in many cases
  # need to investigate the various failure scenarios
  result = { :result => :success }

  case job_data["@type"]
  when "input"
    result = get_input(job_data, ssh_args, shadow_ip, controller, file)
  when "output"
    result[:output] = controller.get_output(job_data)
  when "get-acl"
    job_data["@type"] = "acl"
    result[:acl] = controller.get_acl(job_data)
  when "set-acl"
    job_data["@type"] = "acl"
    result[:acl] = controller.set_acl(job_data)
  when "compile"
    result = compile_code(job_data, ssh_args, shadow_ip)
  when "cicero"
    upload_app_for_cicero(job_data)
    msg = controller.start_neptune_job(job_data)
    result[:msg] = msg
    result[:result] = :failure if result[:msg] !~ /job is now running\Z/
  else
    msg = controller.start_neptune_job(job_data)
    result[:msg] = msg
    result[:result] = :failure if result[:msg] !~ /job is now running\Z/
  end

  return result
end
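Because the controller class is injected as a parameter, the type-based dispatch can be tested without a live AppScale deployment. A reduced sketch: `StubController` and `dispatch_job` are hypothetical names standing in for `AppControllerClient` and a trimmed `run_job`:

```ruby
# Sketch of run_job's case-based dispatch, with a stub standing in for
# AppControllerClient so no SOAP endpoint is needed.
class StubController
  def get_output(job_data); "job output"; end
  def get_acl(job_data); "private"; end
  def start_neptune_job(job_data); "job is now running"; end
end

def dispatch_job(job_data, controller)
  result = { :result => :success }

  case job_data["@type"]
  when "output"
    result[:output] = controller.get_output(job_data)
  when "get-acl"
    job_data["@type"] = "acl"   # the controller only knows "acl" jobs
    result[:acl] = controller.get_acl(job_data)
  else
    msg = controller.start_neptune_job(job_data)
    result[:msg] = msg
    result[:result] = :failure if msg !~ /job is now running\Z/
  end

  result
end
```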
#upload_app_for_cicero(job_data) ⇒ Object
# File 'lib/neptune.rb', line 348

def upload_app_for_cicero(job_data)
  if !job_data["@app"]
    puts "No app specified, not uploading..."
    return
  end

  app_location = File.expand_path(job_data["@app"])
  if !File.exists?(app_location)
    abort("The app you specified, #{app_location}, does not exist. " +
      "Please specify one that does and try again.")
  end

  keyname = job_data["@keyname"] || "appscale"

  if job_data["@appscale_tools"]
    upload_app = File.expand_path(job_data["@appscale_tools"]) +
      File::SEPARATOR + "bin" + File::SEPARATOR + "appscale-upload-app"
  else
    upload_app = "appscale-upload-app"
  end

  puts "Uploading AppEngine app at #{app_location}"
  upload_command = "#{upload_app} --file #{app_location} --test --keyname #{keyname}"
  puts upload_command
  puts `#{upload_command}`
end
#validate_storage_params(job_data) ⇒ Object
# File 'lib/neptune.rb', line 206

def validate_storage_params(job_data)
  if !job_data["@storage"]
    job_data["@storage"] = "appdb"
  end

  storage = job_data["@storage"]
  if !ALLOWED_STORAGE_TYPES.include?(storage)
    abort("Supported storage types are #{ALLOWED_STORAGE_TYPES.join(', ')}" +
      " - we do not support #{storage}.")
  end

  # Our implementation for storing / retrieving via Google Storage
  # and Walrus uses the same library as we do for S3 - so just tell
  # it that it's S3
  if storage == "gstorage" or storage == "walrus"
    storage = "s3"
    job_data["@storage"] = "s3"
  end

  if storage == "s3"
    ["EC2_ACCESS_KEY", "EC2_SECRET_KEY", "S3_URL"].each { |item|
      if job_data["@#{item}"]
        puts "Using specified #{item}"
      else
        if ENV[item]
          puts "Using #{item} from environment"
          job_data["@#{item}"] = ENV[item]
        else
          abort("When storing data to S3, #{item} must be specified or be in " +
            "your environment. Please do so and try again.")
        end
      end
    }
  end

  return job_data
end
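Two details of this method are worth seeing in miniature: Google Storage and Walrus are folded into the S3 code path, and missing S3 credentials fall back to the environment. In this sketch the environment hash is injected rather than read from `ENV`, and the names `ALLOWED`, `check_storage`, and the `ArgumentError` failures are stand-ins for testability:

```ruby
# Sketch of validate_storage_params: default to appdb, fold
# gstorage/walrus into the s3 path, and require S3 credentials to come
# from the job data or the (injected) environment.
ALLOWED = ["appdb", "gstorage", "s3", "walrus"]

def check_storage(job_data, env)
  storage = (job_data["@storage"] ||= "appdb")
  unless ALLOWED.include?(storage)
    raise ArgumentError, "unsupported storage type #{storage}"
  end

  # Google Storage and Walrus speak the S3 protocol, so treat them as S3.
  storage = job_data["@storage"] = "s3" if ["gstorage", "walrus"].include?(storage)

  if storage == "s3"
    ["EC2_ACCESS_KEY", "EC2_SECRET_KEY", "S3_URL"].each do |item|
      job_data["@#{item}"] ||= env[item]
      unless job_data["@#{item}"]
        raise ArgumentError, "#{item} must be specified or set in the environment"
      end
    end
  end

  job_data
end
```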
#wait_for_compilation_to_finish(ssh_args, shadow_ip, compiled_location, shell = Kernel.method(:`)) ⇒ Object
This method waits for AppScale to finish compiling the user’s code, indicated by AppScale copying the finished code to a pre-determined location.
# File 'lib/neptune.rb', line 280

def wait_for_compilation_to_finish(ssh_args, shadow_ip, compiled_location,
  shell=Kernel.method(:`))
  loop {
    ssh_command = "ssh #{ssh_args} root@#{shadow_ip} 'ls #{compiled_location}' 2>&1"
    puts ssh_command
    ssh_result = shell.call(ssh_command)
    puts "result was [#{ssh_result}]"

    if ssh_result =~ /No such file or directory/
      puts "Still waiting for code to be compiled..."
    else
      puts "compilation complete! Compiled code is at #{compiled_location}"
      return
    end

    sleep(5)
  }
end
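The polling structure - run a remote `ls`, treat "No such file or directory" as "keep waiting", anything else as done - is easy to test with an injected shell. A sketch (the name `wait_for_file` and the shortened delay are illustrative):

```ruby
# Sketch of the polling loop in wait_for_compilation_to_finish: repeat
# a check command until its output no longer reports a missing file.
# The shell lambda is injected so no ssh is needed, and the delay is
# shortened from the original 5 seconds for the sketch.
def wait_for_file(check_cmd, shell, delay = 0.01)
  attempts = 0
  loop do
    attempts += 1
    result = shell.call(check_cmd)
    # Any output without the missing-file marker means the artifact exists.
    return attempts unless result =~ /No such file or directory/
    sleep(delay)
  end
end
```

Injecting the shell (as the real method does via its `shell=Kernel.method(:`)` parameter) is what makes a loop like this unit-testable.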