Top Level Namespace

Defined Under Namespace

Modules: CommonFunctions, Kernel

Classes: AppControllerClient, Object

Constant Summary collapse

NO_NODES_NEEDED =

A list of Neptune jobs that do not require nodes to be spawned for computation.

["acl", "input", "output", "compile"]
NO_OUTPUT_NEEDED =

A list of Neptune jobs that do not require the output to be specified beforehand

["input"]
ALLOWED_STORAGE_TYPES =

A list of storage mechanisms that we can use to store and retrieve data for Neptune jobs.

["appdb", "gstorage", "s3", "walrus"]
NEED_PREPROCESSING =

A list of jobs that require some kind of work to be done before the actual computation can be performed.

["compile", "erlang", "mpi", "ssa"]
NO_TIMEOUT =

Sometimes SOAP calls take a long time when large amounts of data are sent over the network. For this first version we don't want these calls to time out and retry endlessly, so as a stopgap, we simply don't let them time out at all. The next version should replace this with proper timeouts and avoid long-running calls unless necessary.

-1

Instance Method Summary collapse

Instance Method Details

#compile_code(job_data, ssh_args, shadow_ip, shell = Kernel.method(:`)) ⇒ Object

This method sends out a request to compile code, waits for the compilation to finish, and retrieves the standard output and standard error it produced. It returns a hash containing the standard output, the standard error, and a result indicating whether or not the compilation was successful.



# File 'lib/neptune.rb', line 301

def compile_code(job_data, ssh_args, shadow_ip, shell=Kernel.method(:`))
  compiled_location = controller.compile_code(job_data)

  copy_to = job_data["@copy_to"]

  wait_for_compilation_to_finish(ssh_args, shadow_ip, compiled_location)

  FileUtils.rm_rf(copy_to)

  scp_command = "scp -r #{ssh_args} root@#{shadow_ip}:#{compiled_location} #{copy_to} 2>&1"
  puts scp_command
  shell.call(scp_command)

  code = job_data["@code"]
  dirs = code.split(/\//)
  remote_dir = "/tmp/" + dirs[-1] 

  [remote_dir, compiled_location].each { |remote_files|
    ssh_command = "ssh #{ssh_args} root@#{shadow_ip} 'rm -rf #{remote_files}' 2>&1"
    puts ssh_command
    shell.call(ssh_command)
  }

  return get_std_out_and_err(copy_to)
end
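The `shell` parameter above defaults to Kernel backticks but is injectable, which makes the remote-cleanup loop testable without real SSH access. The sketch below is a hypothetical, simplified stand-in (`cleanup_remote` is not part of the library) that shows how the loop's `rm -rf` commands can be exercised with a recording fake shell.

```ruby
# Hypothetical sketch of compile_code's remote-cleanup loop, with the
# shell dependency injected so no real ssh command is executed.
def cleanup_remote(remote_paths, ssh_args, shadow_ip, shell)
  remote_paths.map do |path|
    cmd = "ssh #{ssh_args} root@#{shadow_ip} 'rm -rf #{path}' 2>&1"
    shell.call(cmd)  # the real method shells out here
    cmd
  end
end

# A fake shell that records every command instead of running it.
recorded = []
fake_shell = lambda { |cmd| recorded << cmd; "" }
cleanup_remote(["/tmp/app", "/tmp/out"], "-i key", "1.2.3.4", fake_shell)
```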

#do_preprocessing(job_data) ⇒ Object

Certain types of jobs need steps to be taken before they can be started (e.g., copying input data or code over). This method dispatches the right method to use based on the type of the job that the user has asked to run.



# File 'lib/neptune.rb', line 52

def do_preprocessing(job_data)
  job_type = job_data["@type"]
  if !NEED_PREPROCESSING.include?(job_type)
    return
  end

  preprocess = "preprocess_#{job_type}".to_sym
  send(preprocess, job_data)
end
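The dispatch above builds a method name from the job type and invokes it via `send`. A minimal sketch of that pattern, with a hypothetical `preprocess_mpi` handler standing in for the real ones:

```ruby
# Sketch of do_preprocessing's dynamic dispatch. Job types outside
# NEED_PREPROCESSING are skipped; others are routed to preprocess_<type>.
NEED_PREPROCESSING = ["compile", "erlang", "mpi", "ssa"]

# Hypothetical handler standing in for the library's real preprocess_mpi.
def preprocess_mpi(job_data)
  job_data["@preprocessed"] = true
  job_data
end

def dispatch_preprocessing(job_data)
  job_type = job_data["@type"]
  return job_data unless NEED_PREPROCESSING.include?(job_type)
  send("preprocess_#{job_type}".to_sym, job_data)
end

dispatch_preprocessing({"@type" => "mpi"})  # routed to preprocess_mpi
dispatch_preprocessing({"@type" => "acl"})  # no preprocessing needed
```

Note that `send` also reaches private methods, which is why top-level handler definitions work here.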

#get_input(job_data, ssh_args, shadow_ip, controller, file = File, shell = Kernel.method(:`)) ⇒ Object

This method takes a file on the local user’s computer and stores it remotely via AppScale. It returns a hash map indicating whether or not the job succeeded and if it failed, the reason for it.



# File 'lib/neptune.rb', line 247

def get_input(job_data, ssh_args, shadow_ip, controller, file=File,
  shell=Kernel.method(:`))
  result = {:result => :success}

  if !job_data["@local"]
    abort("You failed to specify a file to copy over via the :local flag.")
  end

  local_file = file.expand_path(job_data["@local"])
  if !file.exists?(local_file)
    reason = "the file you specified to copy, #{local_file}, doesn't exist." + 
        " Please specify a file that exists and try again."
    return {:result => :failure, :reason => reason}  
  end

  remote = "/tmp/neptune-input-#{rand(100000)}"
  scp_cmd = "scp -r #{ssh_args} #{local_file} root@#{shadow_ip}:#{remote}"
  puts scp_cmd
  shell.call(scp_cmd)

  job_data["@local"] = remote
  puts "job data = #{job_data.inspect}"
  response = controller.put_input(job_data)
  if response
    return {:result => :success}
  else
    # TODO - expand this to include the reason why it failed
    return {:result => :failure}
  end
end
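The local-file validation at the top of `get_input` can be sketched on its own. The helper below (`validate_local_input` is a hypothetical name, not part of the library) mirrors the checks: a missing `:local` flag is fatal, a nonexistent path yields a `:failure` hash, and an existing path is expanded and accepted.

```ruby
require 'tmpdir'

# Hypothetical sketch of get_input's local-file checks.
def validate_local_input(job_data)
  raise ArgumentError, ":local flag is required" unless job_data["@local"]
  local = File.expand_path(job_data["@local"])
  unless File.exist?(local)
    return {:result => :failure,
            :reason => "the file you specified to copy, #{local}, doesn't exist"}
  end
  {:result => :success, :local => local}
end
```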

#get_job_data(params) ⇒ Object



# File 'lib/neptune.rb', line 162

def get_job_data(params)
  job_data = {}
  params.each { |k, v|
    key = "@#{k}"
    job_data[key] = v
  }

  job_data.delete("@job")
  job_data["@keyname"] = params[:keyname] || "appscale"

  job_data["@type"] = job_data["@type"].to_s
  type = job_data["@type"]

  if type == "upc" or type == "x10"
    job_data["@type"] = "mpi"
    type = "mpi"
  end

  # kdt jobs also run as mpi jobs, but need to pass along an executable
  # parameter to let mpiexec know to use python to exec it
  if type == "kdt"
    job_data["@type"] = "mpi"
    type = "mpi"

    job_data["@executable"] = "python"
  end

  if job_data["@nodes_to_use"].class == Hash
    job_data["@nodes_to_use"] = job_data["@nodes_to_use"].to_a.flatten
  end

  if !NO_OUTPUT_NEEDED.include?(type)
    if (job_data["@output"].nil? or job_data["@output"] == "")
      abort("Job output must be specified")
    end

    if job_data["@output"][0].chr != "/"
      abort("Job output must begin with a slash ('/')")
    end
  end

  return job_data
end
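The key transformations above can be condensed into a small sketch (`normalize_job_data` is a hypothetical name): symbol keys become `"@"`-prefixed string keys, `:keyname` defaults to `"appscale"`, and the `upc`, `x10`, and `kdt` job types are aliased to `mpi`, with `kdt` additionally marked as a Python executable.

```ruby
# Hypothetical sketch of get_job_data's parameter normalization.
def normalize_job_data(params)
  job_data = {}
  params.each { |k, v| job_data["@#{k}"] = v }
  job_data.delete("@job")
  job_data["@keyname"] = params[:keyname] || "appscale"
  job_data["@type"] = job_data["@type"].to_s

  if ["upc", "x10", "kdt"].include?(job_data["@type"])
    # kdt jobs run under mpiexec via python
    job_data["@executable"] = "python" if job_data["@type"] == "kdt"
    job_data["@type"] = "mpi"
  end
  job_data
end
```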

#get_std_out_and_err(location) ⇒ Object

This method returns a hash containing the standard out and standard error from a completed job, as well as a result field that indicates whether or not the job completed successfully (success = no errors).



# File 'lib/neptune.rb', line 330

def get_std_out_and_err(location)
  result = {}

  out = File.open("#{location}/compile_out") { |f| f.read.chomp! }
  result[:out] = out

  err = File.open("#{location}/compile_err") { |f| f.read.chomp! }
  result[:err] = err

  if result[:err]
    result[:result] = :failure
  else
    result[:result] = :success
  end    

  return result
end
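The success check above hinges on a subtlety of `String#chomp!`: it returns `nil` when there is nothing to strip, so an empty `compile_err` file yields `result[:err] == nil` and therefore `:success`. A standalone sketch (`read_compile_result` is a hypothetical name), exercised against a temporary directory rather than a real compilation result:

```ruby
require 'tmpdir'

# Hypothetical sketch of get_std_out_and_err. String#chomp! returns nil
# when no trailing newline is removed, so an empty err file maps to nil,
# which the ternary below treats as success.
def read_compile_result(location)
  result = {}
  result[:out] = File.open("#{location}/compile_out") { |f| f.read.chomp! }
  result[:err] = File.open("#{location}/compile_err") { |f| f.read.chomp! }
  result[:result] = result[:err] ? :failure : :success
  result
end
```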

#neptune(params) ⇒ Object

This method is the heart of Neptune - here, we take blocks of code that the user has written and convert them into HPC job requests. At a high level, the user can request to run a job, retrieve a job’s output, or modify the access policy (ACL) for the output of a job. By default, job data is private, but a Neptune job can be used to set it to public later (and vice-versa).



# File 'lib/neptune.rb', line 419

def neptune(params)
  puts "Received a request to run a job."
  puts params[:type]

  job_data = get_job_data(params)
  validate_storage_params(job_data)
  puts "job data = #{job_data.inspect}"
  do_preprocessing(job_data) 
  keyname = job_data["@keyname"]

  shadow_ip = CommonFunctions.get_from_yaml(keyname, :shadow)
  secret = CommonFunctions.get_secret_key(keyname)
  ssh_key = File.expand_path("~/.appscale/#{keyname}.key")
  ssh_args = "-i ~/.appscale/#{keyname}.key -o StrictHostkeyChecking=no "

  return run_job(job_data, ssh_args, shadow_ip, secret)
end

#preprocess_compile(job_data, shell = Kernel.method(:`)) ⇒ Object

This preprocessing method copies over the user’s code to the Shadow node so that it can be compiled there. A future version of this method may also copy over libraries as well.



# File 'lib/neptune.rb', line 65

def preprocess_compile(job_data, shell=Kernel.method(:`))
  code = File.expand_path(job_data["@code"])
  if !File.exists?(code)
    abort("The source file #{code} does not exist.")
  end

  suffix = code.split('/')[-1]
  dest = "/tmp/#{suffix}"
  keyname = job_data["@keyname"]
  shadow_ip = CommonFunctions.get_from_yaml(keyname, :shadow)

  ssh_args = "-i ~/.appscale/#{keyname}.key -o StrictHostkeyChecking=no root@#{shadow_ip}"
  remove_dir = "ssh #{ssh_args} 'rm -rf #{dest}' 2>&1"
  puts remove_dir
  shell.call(remove_dir)

  CommonFunctions.scp_to_shadow(code, dest, keyname, is_dir=true)

  job_data["@code"] = dest
end

#preprocess_erlang(job_data, file = File, common_functions = CommonFunctions) ⇒ Object



# File 'lib/neptune.rb', line 86

def preprocess_erlang(job_data, file=File, common_functions=CommonFunctions)
  if !job_data["@code"]
    abort("When running Erlang jobs, :code must be specified.")
  end

  source_code = file.expand_path(job_data["@code"])
  if !file.exists?(source_code)
    abort("The specified code, #{job_data['@code']}, doesn't exist. " +
      "Please specify code that exists and try again.")
  end
  dest_code = "/tmp/"

  keyname = job_data["@keyname"]
  common_functions.scp_to_shadow(source_code, dest_code, keyname)
end

#preprocess_mpi(job_data) ⇒ Object

This preprocessing method verifies that the user specified the number of nodes to use. If they also specified the number of processes to use, we also verify that this value is at least as many as the number of nodes (that is, nodes can’t be underprovisioned in MPI).



# File 'lib/neptune.rb', line 106

def preprocess_mpi(job_data)
  if !job_data["@nodes_to_use"]
    abort("When running MPI jobs, :nodes_to_use must be specified.")
  end

  if !job_data["@procs_to_use"]
    abort("When running MPI jobs, :procs_to_use must be specified.")
  end

  if job_data["@procs_to_use"]
    p = job_data["@procs_to_use"]
    n = job_data["@nodes_to_use"]
    if p < n
      abort("When specifying both :procs_to_use and :nodes_to_use" +
        ", :procs_to_use must be at least as large as :nodes_to_use. Please " +
        "change this and try again. You specified :procs_to_use = #{p} and " +
        ":nodes_to_use = #{n}.")
    end
  end

  if job_data["@argv"]
    argv = job_data["@argv"]
    if argv.class != String and argv.class != Array
      abort("The value specified for :argv must be either a String or Array") 
    end

    if argv.class == Array
      job_data["@argv"] = argv.join(' ')
    end
  end

  return job_data
end
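The `:argv` normalization at the end of the method can be sketched in isolation (`normalize_argv` is a hypothetical helper name): a String passes through untouched, an Array is joined with spaces, and anything else is rejected.

```ruby
# Hypothetical sketch of preprocess_mpi's :argv handling.
def normalize_argv(argv)
  case argv
  when String then argv                 # already in command-line form
  when Array  then argv.join(' ')       # flatten to a single string
  else raise ArgumentError, ":argv must be either a String or Array"
  end
end
```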

#preprocess_ssa(job_data) ⇒ Object

This preprocessing method verifies that the user specified the number of trajectories to run, via either :trajectories or :simulations. Only one of the two may be specified, and whichever is given is stored as :trajectories.



# File 'lib/neptune.rb', line 144

def preprocess_ssa(job_data)
  if job_data["@simulations"] and job_data["@trajectories"]
    abort("Both :simulations and :trajectories cannot be specified - use one" +
      " or the other.")
  end

  if job_data["@simulations"]
    job_data["@trajectories"] = job_data["@simulations"]
    job_data.delete("@simulations")
  end

  if !job_data["@trajectories"]
    abort(":trajectories needs to be specified when running ssa jobs")
  end

  return job_data
end
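A minimal sketch of this reconciliation (`normalize_trajectories` is a hypothetical name): exactly one of the two keys must be present, and `:simulations` is folded into `:trajectories`.

```ruby
# Hypothetical sketch of preprocess_ssa's simulations/trajectories logic.
def normalize_trajectories(job_data)
  if job_data["@simulations"] && job_data["@trajectories"]
    raise ArgumentError,
      "use :simulations or :trajectories, not both"
  end

  # Fold :simulations into the canonical :trajectories key.
  if job_data["@simulations"]
    job_data["@trajectories"] = job_data.delete("@simulations")
  end

  unless job_data["@trajectories"]
    raise ArgumentError, ":trajectories is required for ssa jobs"
  end
  job_data
end
```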

#run_job(job_data, ssh_args, shadow_ip, secret, controller = AppControllerClient, file = File) ⇒ Object

This method actually runs the Neptune job, given information about the job as well as information about the node to send the request to.



# File 'lib/neptune.rb', line 376

def run_job(job_data, ssh_args, shadow_ip, secret,
  controller=AppControllerClient, file=File)
  controller = controller.new(shadow_ip, secret)

  # TODO - right now the job is assumed to succeed in many cases
  # need to investigate the various failure scenarios
  result = { :result => :success }

  case job_data["@type"]
  when "input"
    result = get_input(job_data, ssh_args, shadow_ip, controller, file)
  when "output"
    result[:output] = controller.get_output(job_data)
  when "get-acl"
    job_data["@type"] = "acl"
    result[:acl] = controller.get_acl(job_data)
  when "set-acl"
    job_data["@type"] = "acl"
    result[:acl] = controller.set_acl(job_data)
  when "compile"
    result = compile_code(job_data, ssh_args, shadow_ip)
  when "cicero"
    upload_app_for_cicero(job_data)
    msg = controller.start_neptune_job(job_data)
    result[:msg] = msg
    result[:result] = :failure if result[:msg] !~ /job is now running\Z/
  else
    msg = controller.start_neptune_job(job_data)
    result[:msg] = msg
    result[:result] = :failure if result[:msg] !~ /job is now running\Z/
  end

  return result
end
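The success check applied to `start_neptune_job`'s reply can be isolated into a small sketch (`job_result` is a hypothetical name): the result only counts as `:success` when the controller's message ends with "job is now running".

```ruby
# Hypothetical sketch of run_job's default-branch success check.
def job_result(msg)
  result = {:msg => msg, :result => :success}
  # \Z anchors the match at the end of the message.
  result[:result] = :failure if msg !~ /job is now running\Z/
  result
end
```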

#upload_app_for_cicero(job_data) ⇒ Object



# File 'lib/neptune.rb', line 348

def upload_app_for_cicero(job_data)
  if !job_data["@app"]
    puts "No app specified, not uploading..." 
    return
  end

  app_location = File.expand_path(job_data["@app"])
  if !File.exists?(app_location)
    abort("The app you specified, #{app_location}, does not exist. " +
      "Please specify one that does and try again.")
  end

  keyname = job_data["@keyname"] || "appscale"
  if job_data["@appscale_tools"]
    upload_app = File.expand_path(job_data["@appscale_tools"]) +
      File::SEPARATOR + "bin" + File::SEPARATOR + "appscale-upload-app"
  else
    upload_app = "appscale-upload-app"
  end

  puts "Uploading AppEngine app at #{app_location}"
  upload_command = "#{upload_app} --file #{app_location} --test --keyname #{keyname}"
  puts upload_command
  puts `#{upload_command}`
end
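The command assembly at the end of the method can be sketched on its own (`upload_command_for` is a hypothetical name): a custom `:appscale_tools` path points at that tree's `bin/appscale-upload-app`, otherwise the tool is taken from `PATH`.

```ruby
# Hypothetical sketch of upload_app_for_cicero's command construction.
def upload_command_for(app_location, keyname, tools_path = nil)
  upload_app = if tools_path
    File.join(File.expand_path(tools_path), "bin", "appscale-upload-app")
  else
    "appscale-upload-app"  # rely on the shell's PATH lookup
  end
  "#{upload_app} --file #{app_location} --test --keyname #{keyname}"
end
```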

#validate_storage_params(job_data) ⇒ Object



# File 'lib/neptune.rb', line 206

def validate_storage_params(job_data)
  if !job_data["@storage"]
    job_data["@storage"] = "appdb"
  end

  storage = job_data["@storage"]
  if !ALLOWED_STORAGE_TYPES.include?(storage)
    abort("Supported storage types are #{ALLOWED_STORAGE_TYPES.join(', ')}" +
      " - we do not support #{storage}.")
  end

  # Our implementation for storing / retrieving via Google Storage
  # and Walrus uses
  # the same library as we do for S3 - so just tell it that it's S3
  if storage == "gstorage" or storage == "walrus"
    storage = "s3"
    job_data["@storage"] = "s3"
  end

  if storage == "s3"
    ["EC2_ACCESS_KEY", "EC2_SECRET_KEY", "S3_URL"].each { |item|
      if job_data["@#{item}"]
        puts "Using specified #{item}"
      else
        if ENV[item]
          puts "Using #{item} from environment"
          job_data["@#{item}"] = ENV[item]
        else
          abort("When storing data to S3, #{item} must be specified or be in " + 
            "your environment. Please do so and try again.")
        end
      end
    }
  end

  return job_data
end
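The storage normalization can be condensed into a sketch (`normalize_storage` and the local `ALLOWED` constant are hypothetical stand-ins): default to `"appdb"`, reject unknown backends, and alias `gstorage`/`walrus` onto the S3 code path.

```ruby
# Hypothetical sketch of validate_storage_params' backend normalization.
ALLOWED = ["appdb", "gstorage", "s3", "walrus"]

def normalize_storage(job_data)
  storage = job_data["@storage"] || "appdb"
  unless ALLOWED.include?(storage)
    raise ArgumentError, "unsupported storage: #{storage}"
  end
  # Google Storage and Walrus reuse the S3 client library.
  storage = "s3" if ["gstorage", "walrus"].include?(storage)
  job_data.merge("@storage" => storage)
end
```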

#wait_for_compilation_to_finish(ssh_args, shadow_ip, compiled_location, shell = Kernel.method(:`)) ⇒ Object

This method waits for AppScale to finish compiling the user’s code, indicated by AppScale copying the finished code to a pre-determined location.



# File 'lib/neptune.rb', line 280

def wait_for_compilation_to_finish(ssh_args, shadow_ip, compiled_location,
  shell=Kernel.method(:`))
  loop {
    ssh_command = "ssh #{ssh_args} root@#{shadow_ip} 'ls #{compiled_location}' 2>&1"
    puts ssh_command
    ssh_result = shell.call(ssh_command)
    puts "result was [#{ssh_result}]"
    if ssh_result =~ /No such file or directory/
      puts "Still waiting for code to be compiled..."
    else
      puts "compilation complete! Compiled code is at #{compiled_location}"
      return
    end
    sleep(5)
  }
end
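Because the `shell` parameter is injectable, the polling loop can be exercised without real SSH. The sketch below (`poll_until_present` is a hypothetical name; the real method loops forever with a five-second sleep) uses a fake probe that reports "No such file or directory" twice before the compiled code appears.

```ruby
# Hypothetical sketch of wait_for_compilation_to_finish's polling loop,
# bounded and with the sleep injected so tests run instantly.
def poll_until_present(probe, max_attempts = 10, sleeper = lambda { |s| })
  max_attempts.times do
    result = probe.call
    return true unless result =~ /No such file or directory/
    sleeper.call(5)  # the real loop sleeps 5 seconds between probes
  end
  false
end

# Fake remote `ls`: fails twice, then shows the compiled artifact.
responses = ["ls: No such file or directory",
             "ls: No such file or directory",
             "/tmp/compiled"]
probe = lambda { responses.shift }
poll_until_present(probe)
```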