Module: NeptuneHelper

Defined in:
lib/neptune.rb

Overview

NeptuneHelper provides methods that are used by neptune() and babel() to validate parameters and run the user’s job.

Class Method Summary

Class Method Details

.compile_code(job_data, ssh_args, shadow_ip) ⇒ Object

This method sends a request to compile the user's code, waits for the compilation to finish, and retrieves the standard output and standard error it produced. It returns a hash containing the standard output, the standard error, and a result field that indicates whether or not the compilation was successful.



# File 'lib/neptune.rb', line 459

def self.compile_code(job_data, ssh_args, shadow_ip)
  compiled_location = controller.compile_code(job_data)
  copy_to = job_data["@copy_to"]
  self.wait_for_compilation_to_finish(ssh_args, shadow_ip, compiled_location)

  FileUtils.rm_rf(copy_to)

  scp_command = "scp -r #{ssh_args} root@#{shadow_ip}:#{compiled_location} #{copy_to} 2>&1"
  # Kernel.puts scp_command
  CommonFunctions.shell(scp_command)

  code = job_data["@code"]
  dirs = code.split(/\//)
  remote_dir = "/tmp/" + dirs[-1] 

  [remote_dir, compiled_location].each { |remote_files|
    ssh_command = "ssh #{ssh_args} root@#{shadow_ip} 'rm -rf #{remote_files}' 2>&1"
    # Kernel.puts ssh_command
    CommonFunctions.shell(ssh_command)
  }

  return get_std_out_and_err(copy_to)
end

.do_preprocessing(job_data, controller) ⇒ Object

Certain types of jobs need steps to be taken before they can be started (e.g., copying input data or code over). This method dispatches the right method to use based on the type of the job that the user has asked to run.



# File 'lib/neptune.rb', line 114

def self.do_preprocessing(job_data, controller)
  job_type = job_data["@type"]
  if !NEED_PREPROCESSING.include?(job_type)
    return
  end

  # Don't worry about adding on the self. prefix - send will resolve
  # it the right way
  preprocess = "preprocess_#{job_type}".to_sym
  send(preprocess, job_data, controller)
end
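
A minimal sketch of the dispatch, assuming the neptune gem is installed and that "ssa" appears in the NEED_PREPROCESSING list; preprocess_ssa never touches the controller, so nil is passed for it, and the trajectory count is made up:

require 'neptune'

# Hypothetical SSA job data, already in the internal "@key" format.
job_data = { "@type" => "ssa", "@simulations" => 1000 }

# do_preprocessing builds the symbol :preprocess_ssa from "@type" and
# dispatches to it via send.
NeptuneHelper.do_preprocessing(job_data, nil)

puts job_data.inspect  # => {"@type"=>"ssa", "@trajectories"=>1000}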

.get_input(job_data, ssh_args, shadow_ip, controller) ⇒ Object

This method takes a file on the local user’s computer and stores it remotely via AppScale. It returns a hash map indicating whether or not the job succeeded and, if it failed, the reason why.



# File 'lib/neptune.rb', line 407

def self.get_input(job_data, ssh_args, shadow_ip, controller)
  result = {:result => :success}

  self.require_param("@local", job_data)

  local_file = File.expand_path(job_data["@local"])
  if !File.exists?(local_file)
    reason = "the file you specified to copy, #{local_file}, doesn't exist." + 
        " Please specify a file that exists and try again."
    return {:result => :failure, :reason => reason}  
  end

  remote = "/tmp/neptune-input-#{rand(100000)}"
  scp_cmd = "scp -r #{ssh_args} #{local_file} root@#{shadow_ip}:#{remote}"
  # Kernel.puts scp_cmd
  CommonFunctions.shell(scp_cmd)

  job_data["@local"] = remote
  # Kernel.puts "job data = #{job_data.inspect}"
  response = controller.put_input(job_data)
  if response
    return {:result => :success}
  else
    # TODO - expand this to include the reason why it failed
    return {:result => :failure}
  end
end

.get_job_data(params) ⇒ Object

This method takes in a hash in the format that users write neptune/babel jobs in (e.g., :a => "b") and converts it to the legacy format that Neptune used to use (e.g., "@a" => "b"), which is understood by the NeptuneManager.



# File 'lib/neptune.rb', line 314

def self.get_job_data(params)
  job_data = {}
  params.each { |k, v|
    key = "@#{k}"
    job_data[key] = v
  }

  job_data.delete("@job")
  job_data["@keyname"] = params[:keyname] || "appscale"

  job_data["@type"] = job_data["@type"].to_s
  type = job_data["@type"]

  if !ALLOWED_JOB_TYPES.include?(type)
    raise BadConfigurationException.new(JOB_TYPE_NOT_ALLOWED)
  end

  if type == "upc" or type == "x10"
    job_data["@type"] = "mpi"
    type = "mpi"
  end

  # kdt jobs also run as mpi jobs, but need to pass along an executable
  # parameter to let mpiexec know to use python to exec it
  if type == "kdt"
    job_data["@type"] = "mpi"
    type = "mpi"

    job_data["@executable"] = "python"
  end

  if job_data["@nodes_to_use"].class == Hash
    job_data["@nodes_to_use"] = job_data["@nodes_to_use"].to_a.flatten
  end

  if !NO_OUTPUT_NEEDED.include?(type)
    if (job_data["@output"].nil? or job_data["@output"].empty?)
      raise BadConfigurationException.new("Job output must be specified")
    end

    if job_data["@output"][0].chr != "/"
      raise BadConfigurationException.new("Job output must begin with a slash ('/')")
    end
  end

  return job_data
end
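
A minimal sketch of the conversion, assuming the neptune gem is installed; the code path, node counts, and output location are made up:

require 'neptune'

params = { :type => :mpi, :code => "/home/user/ring/Ring",
           :nodes_to_use => 4, :procs_to_use => 4,
           :output => "/mpi/ring-output" }

job_data = NeptuneHelper.get_job_data(params)

# Each key gains an "@" prefix, :type is stringified, and a default
# keyname of "appscale" is filled in:
# {"@type"=>"mpi", "@code"=>"/home/user/ring/Ring", "@nodes_to_use"=>4,
#  "@procs_to_use"=>4, "@output"=>"/mpi/ring-output", "@keyname"=>"appscale"}
puts job_data.inspect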

.get_std_out_and_err(location) ⇒ Object

This method returns a hash containing the standard out and standard error from a completed job, as well as a result field that indicates whether or not the job completed successfully (success = no errors).



# File 'lib/neptune.rb', line 487

def self.get_std_out_and_err(location)
  result = {}

  out = File.open("#{location}/compile_out") { |f| f.read.chomp! }
  result[:out] = out

  err = File.open("#{location}/compile_err") { |f| f.read.chomp! }
  result[:err] = err

  if result[:err]
    result[:result] = :failure
  else
    result[:result] = :success
  end    

  return result
end
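
A local sketch, using a temporary directory in place of the directory that compiled code is normally copied back to; the file contents are invented:

require 'neptune'
require 'tmpdir'

Dir.mktmpdir do |dir|
  # Fake the two files that a real compilation would have produced.
  File.open("#{dir}/compile_out", "w") { |f| f.write("build ok\n") }
  File.open("#{dir}/compile_err", "w") { |f| f.write("") }

  result = NeptuneHelper.get_std_out_and_err(dir)
  # An empty error file chomps to nil, so the compilation is reported
  # as a success.
  puts result.inspect  # => {:out=>"build ok", :err=>nil, :result=>:success}
end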

.preprocess_babel(job_data, controller) ⇒ Object

This preprocessing method verifies that the user specified code that should be run, where the output should be placed, and an engine to run over. It also verifies that all files to be used are actually reachable. Supported engines can be found by contacting an AppScale node.



# File 'lib/neptune.rb', line 262

def self.preprocess_babel(job_data, controller)
  self.require_param("@code", job_data)
  self.require_param("@engine", job_data)
  self.require_param("@output", job_data)
  self.require_param("@error", job_data)
  self.require_param("@metadata", job_data)

  # For most code types, the file's name given is the thing to exec.
  # For Java, the actual file to search for is whatever the user gives
  # us, with a .class extension.
  code_file_name = job_data["@code"]
  if !job_data["@executable"].nil? and job_data["@executable"] == "java"
    code_file_name += ".class"
  end

  self.require_file_to_exist(code_file_name, job_data, controller)
  self.require_file_to_not_exist(job_data["@output"], job_data, controller)
  self.require_file_to_not_exist(job_data["@error"], job_data, controller)
  self.require_file_to_not_exist(job_data["@metadata"], job_data, controller)

  if job_data["@argv"]
    argv = job_data["@argv"]
    if argv.class != Array
      raise BadConfigurationException.new("argv must be an array")
    end

    argv.each { |arg|
      if arg =~ /\/.*\/.*/
        self.require_file_to_exist(arg, job_data, controller)
      end
    }
  end

  if job_data["@appcfg_cookies"]
    self.require_file_to_exist(job_data["@appcfg_cookies"], job_data, controller)
  end

  user_specified_engine = job_data["@engine"]

  # validate the engine here
  engines = controller.get_supported_babel_engines(job_data)
  if !engines.include?(user_specified_engine)
    raise BadConfigurationException.new("The engine you specified, " +
      "#{user_specified_engine}, is not a supported engine. Supported engines" +
      " are: #{engines.join(', ')}")
  end
end
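
A sketch of the checks this method performs, using a hypothetical stand-in for NeptuneManagerClient (FakeController below is not part of the gem) so that no AppScale deployment is needed; the paths and engine name are placeholders:

require 'neptune'

class FakeController
  # Pretend only the code file already exists in the remote datastore.
  def does_file_exist?(file, job_data)
    file == "/code/HelloWorld.rb"
  end

  # A made-up engine list; a real deployment reports its own.
  def get_supported_babel_engines(job_data)
    ["executor-sqs"]
  end
end

job_data = { "@code" => "/code/HelloWorld.rb", "@engine" => "executor-sqs",
             "@output" => "/babel/out", "@error" => "/babel/err",
             "@metadata" => "/babel/meta" }

# Passes quietly: the code exists, the output/error/metadata paths do not,
# and the requested engine is supported.
NeptuneHelper.preprocess_babel(job_data, FakeController.new)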

.preprocess_compile(job_data, controller) ⇒ Object

This preprocessing method copies over the user’s code to the Shadow node so that it can be compiled there. A future version of this method may also copy over libraries as well.



# File 'lib/neptune.rb', line 130

def self.preprocess_compile(job_data, controller)
  code = File.expand_path(job_data["@code"])
  if !File.exists?(code)
    raise BadConfigurationException.new("The source file #{code} does not exist.")
  end

  suffix = code.split('/')[-1]
  dest = "/tmp/#{suffix}"
  keyname = job_data["@keyname"]
  shadow_ip = CommonFunctions.get_from_yaml(keyname, :shadow)

  ssh_args = "-i ~/.appscale/#{keyname}.key -o StrictHostkeyChecking=no root@#{shadow_ip}"
  remove_dir = "ssh #{ssh_args} 'rm -rf #{dest}' 2>&1"
  # Kernel.puts remove_dir
  CommonFunctions.shell(remove_dir)
  CommonFunctions.scp_to_shadow(code, dest, keyname, is_dir=true)

  job_data["@code"] = dest
end

.preprocess_erlang(job_data, controller) ⇒ Object

This preprocessing method makes sure that the user’s Erlang code exists and copies it over to the AppScale Shadow node.



# File 'lib/neptune.rb', line 153

def self.preprocess_erlang(job_data, controller)
  self.require_param("@code", job_data)

  source_code = File.expand_path(job_data["@code"])
  if !File.exists?(source_code)
    raise BadConfigurationException.new("The specified code, #{job_data['@code']}," +
      " didn't exist. Please specify one that exists and try again")
  end
  dest_code = "/tmp/"

  keyname = job_data["@keyname"]
  CommonFunctions.scp_to_shadow(source_code, dest_code, keyname)
end

.preprocess_mpi(job_data, controller) ⇒ Object

This preprocessing method verifies that the user specified the number of nodes to use. If they also specified the number of processes to use, we also verify that this value is at least as many as the number of nodes (that is, nodes can’t be underprovisioned in MPI).



# File 'lib/neptune.rb', line 172

def self.preprocess_mpi(job_data, controller)
  self.require_param("@nodes_to_use", job_data)
  self.require_param("@procs_to_use", job_data)
  self.require_param("@output", job_data)
  self.require_param("@error", job_data)
  self.require_param("@metadata", job_data)

  if job_data["@procs_to_use"]
    p = job_data["@procs_to_use"]
    n = job_data["@nodes_to_use"]
    if p < n
      raise BadConfigurationException.new(":procs_to_use must be at least as " +
        "large as :nodes_to_use.") 
    end
  end

  if job_data["@argv"]
    argv = job_data["@argv"]

    if argv.class == String
      job_data["@argv"] = argv
    elsif argv.class == Array
      job_data["@argv"] = argv.join(' ')
    else
      raise BadConfigurationException.new(":argv must be either a String or Array") 
    end
  end

  return job_data
end
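
A minimal sketch of the parameter handling, assuming the neptune gem is installed; the controller argument is unused here and all values are made up:

require 'neptune'

job_data = { "@nodes_to_use" => 4, "@procs_to_use" => 8,
             "@output" => "/mpi/out", "@error" => "/mpi/err",
             "@metadata" => "/mpi/meta",
             "@argv" => ["--trials", "10"] }

NeptuneHelper.preprocess_mpi(job_data, nil)

# An Array :argv is flattened into a single command-line string:
puts job_data["@argv"]  # => "--trials 10"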

.preprocess_ssa(job_data, controller) ⇒ Object

This preprocessing method verifies that the user specified the number of trajectories to run, via either :trajectories or :simulations. Only one of the two may be specified, and whichever is given is converted to :trajectories.



# File 'lib/neptune.rb', line 208

def self.preprocess_ssa(job_data, controller)
  if job_data["@simulations"] and job_data["@trajectories"]
    raise BadConfigurationException.new(":simulations and :trajectories " +
      "cannot both be specified.")
  end

  if job_data["@simulations"]
    job_data["@trajectories"] = job_data["@simulations"]
    job_data.delete("@simulations")
  end

  self.require_param("@trajectories", job_data)
  return job_data
end
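
A minimal sketch with made-up trajectory counts, showing the :simulations-to-:trajectories conversion and the rejection of jobs that specify both:

require 'neptune'

job_data = { "@type" => "ssa", "@simulations" => 10_000 }
NeptuneHelper.preprocess_ssa(job_data, nil)
puts job_data.inspect  # => {"@type"=>"ssa", "@trajectories"=>10000}

begin
  NeptuneHelper.preprocess_ssa({ "@simulations" => 1, "@trajectories" => 1 }, nil)
rescue BadConfigurationException => e
  puts e.message  # both parameters at once are rejected
end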

.require_file_to_exist(file, job_data, controller) ⇒ Object

This helper method asks the NeptuneManager if the named file exists, and if it does not, throws an exception.



# File 'lib/neptune.rb', line 235

def self.require_file_to_exist(file, job_data, controller)
  if controller.does_file_exist?(file, job_data)
    return
  else
    raise FileNotFoundException.new("Expecting file #{file} to exist " +
      "in the remote datastore, which did not exist.")
  end
end

.require_file_to_not_exist(file, job_data, controller) ⇒ Object

This helper method performs the opposite function of require_file_to_exist, raising an exception if the named file does exist.



# File 'lib/neptune.rb', line 247

def self.require_file_to_not_exist(file, job_data, controller)
  begin
    self.require_file_to_exist(file, job_data, controller)
    # no exception thrown previously means that the output file exists
    raise BadConfigurationException.new('Output specified already exists')
  rescue FileNotFoundException
    return
  end
end
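
A sketch of both file helpers, using a hypothetical stub in place of NeptuneManagerClient; the stub and the paths below are made up:

require 'neptune'

fake = Object.new
# Pretend that only /data/input.txt exists in the remote datastore.
def fake.does_file_exist?(file, job_data)
  file == "/data/input.txt"
end

NeptuneHelper.require_file_to_exist("/data/input.txt", {}, fake)       # passes
NeptuneHelper.require_file_to_not_exist("/data/output.txt", {}, fake)  # passes

begin
  NeptuneHelper.require_file_to_not_exist("/data/input.txt", {}, fake)
rescue BadConfigurationException
  puts "refusing to overwrite an existing remote file"
end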

.require_param(param, job_data) ⇒ Object

This helper method aborts if the given parameter is not present in the job data provided.



# File 'lib/neptune.rb', line 226

def self.require_param(param, job_data)
  if !job_data[param]
    raise BadConfigurationException.new("#{param} must be specified")
  end
end
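
A minimal sketch with made-up parameters:

require 'neptune'

NeptuneHelper.require_param("@output", { "@output" => "/mydata/out" })  # passes

begin
  NeptuneHelper.require_param("@output", { "@code" => "/code/app.rb" })
rescue BadConfigurationException => e
  puts e.message  # => "@output must be specified"
end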

.run_job(job_data, ssh_args, shadow_ip, secret) ⇒ Object

This method actually runs the Neptune job, given information about the job as well as information about the node to send the request to.



# File 'lib/neptune.rb', line 537

def self.run_job(job_data, ssh_args, shadow_ip, secret)
  controller = NeptuneManagerClient.new(shadow_ip, secret)

  # TODO - right now the job is assumed to succeed in many cases
  # need to investigate the various failure scenarios
  result = { :result => :success }

  case job_data["@type"]
  when "input"
    result = self.get_input(job_data, ssh_args, shadow_ip, controller)
  when "output"
    result[:output] = controller.get_output(job_data)
  when "get-acl"
    job_data["@type"] = "acl"
    result[:acl] = controller.get_acl(job_data)
  when "set-acl"
    job_data["@type"] = "acl"
    result[:acl] = controller.set_acl(job_data)
  when "compile"
    result = self.compile_code(job_data, ssh_args, shadow_ip)
  when "cicero"
    self.upload_app_for_cicero(job_data)
    msg = controller.start_neptune_job(job_data)
    result[:msg] = msg
    result[:result] = :failure if result[:msg] !~ /job is now running\Z/
  else
    msg = controller.start_neptune_job(job_data)
    result[:msg] = msg
    result[:result] = :failure if result[:msg] !~ /job is now running\Z/
  end

  return result
end
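
A hedged sketch of invoking run_job for an output job; the IP address, secret, SSH arguments, and output path are placeholders for a running AppScale deployment, and are normally assembled by the neptune() entry point rather than by hand:

require 'neptune'

shadow_ip = "192.168.1.10"       # placeholder Shadow node IP
secret    = "deployment-secret"  # placeholder deployment secret
ssh_args  = "-i ~/.appscale/appscale.key -o StrictHostkeyChecking=no"

job_data = NeptuneHelper.get_job_data(:type => :output,
                                      :output => "/mpi/ring-output")

result = NeptuneHelper.run_job(job_data, ssh_args, shadow_ip, secret)
puts result[:output] if result[:result] == :success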

.upload_app_for_cicero(job_data) ⇒ Object

This method uploads a Google App Engine application into AppScale, for use with Cicero jobs. It requires the AppScale tools to be installed.



# File 'lib/neptune.rb', line 508

def self.upload_app_for_cicero(job_data)
  if !job_data["@app"]
    # Kernel.puts "No app specified, not uploading..." 
    return
  end

  app_location = File.expand_path(job_data["@app"])
  if !File.exists?(app_location)
    raise BadConfigurationException.new("The app you specified, #{app_location}, does not exist. " +
      "Please specify one that does and try again.")
  end

  keyname = job_data["@keyname"] || "appscale"
  if job_data["@appscale_tools"]
    upload_app = File.expand_path(job_data["@appscale_tools"]) +
      File::SEPARATOR + "bin" + File::SEPARATOR + "appscale-upload-app"
  else
    upload_app = "appscale-upload-app"
  end

  # Kernel.puts "Uploading AppEngine app at #{app_location}"
  upload_command = "#{upload_app} --file #{app_location} --test --keyname #{keyname}"
  # Kernel.puts upload_command
  # Kernel.puts `#{upload_command}`
end

.validate_storage_params(job_data) ⇒ Object

This method looks through the given job data and makes sure that the correct parameters are present for the storage mechanism specified. It throws an exception if there are errors in the job data or if a needed parameter is missing.



# File 'lib/neptune.rb', line 367

def self.validate_storage_params(job_data)
  job_data["@storage"] ||= "appdb"

  storage = job_data["@storage"]
  if !ALLOWED_STORAGE_TYPES.include?(storage)
    raise BadConfigurationException.new("Supported storage types are " +
      "#{ALLOWED_STORAGE_TYPES.join(', ')} - #{storage} is not supported.")
  end

  # Our implementation for storing / retrieving via Google Storage
  # and Walrus uses
  # the same library as we do for S3 - so just tell it that it's S3
  if storage == "gstorage" or storage == "walrus"
    storage = "s3"
    job_data["@storage"] = "s3"
  end

  if storage == "s3"
    ["EC2_ACCESS_KEY", "EC2_SECRET_KEY", "S3_URL"].each { |item|
      if job_data["@#{item}"]
        # Kernel.puts "Using specified #{item}"
      else
        if ENV[item]
          # Kernel.puts "Using #{item} from environment"
          job_data["@#{item}"] = ENV[item]
        else
          raise BadConfigurationException.new("When storing data to S3, #{item} must be specified or be in " + 
            "your environment. Please do so and try again.")
        end
      end
    }
  end

  return job_data
end
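
A minimal sketch, assuming the neptune gem is installed; the credentials and URL below are placeholders:

require 'neptune'

# With no :storage given, the default AppScale datastore is used.
puts NeptuneHelper.validate_storage_params({})["@storage"]  # => "appdb"

# Walrus and Google Storage reuse the S3 code path, so the S3-style
# credentials are pulled from the job data or the environment.
ENV['EC2_ACCESS_KEY'] = "my-access-key"
ENV['EC2_SECRET_KEY'] = "my-secret-key"
ENV['S3_URL']         = "https://walrus.example.com:8773/services/Walrus"

job_data = NeptuneHelper.validate_storage_params({ "@storage" => "walrus" })
puts job_data["@storage"]  # => "s3"
puts job_data["@S3_URL"]   # the URL picked up from the environment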

.wait_for_compilation_to_finish(ssh_args, shadow_ip, compiled_location) ⇒ Object

This method waits for AppScale to finish compiling the user’s code, indicated by AppScale copying the finished code to a pre-determined location.



# File 'lib/neptune.rb', line 438

def self.wait_for_compilation_to_finish(ssh_args, shadow_ip, compiled_location)
  loop {
    ssh_command = "ssh #{ssh_args} root@#{shadow_ip} 'ls #{compiled_location}' 2>&1"
    # Kernel.puts ssh_command
    ssh_result = CommonFunctions.shell(ssh_command)
    # Kernel.puts "result was [#{ssh_result}]"
    if ssh_result =~ /No such file or directory/
      # Kernel.puts "Still waiting for code to be compiled..."
    else
      # Kernel.puts "compilation complete! Copying compiled code to #{copy_to}"
      return
    end
    sleep(5)
  }
end