Module: BabelHelper

Defined in:
lib/babel.rb

Overview

This module provides convenience functions for babel().

Class Method Details

.check_output_files(job_data) ⇒ Object

Callers of babel() do not have to specify where the task’s standard output, standard error, and metadata should be stored. For each location that is missing, this method generates one; for each location that is given, it makes sure nothing already exists there.



# File 'lib/babel.rb', line 145

def self.check_output_files(job_data)
  ["@output", "@error", "@metadata"].each { |item|
    if job_data[item].nil? or job_data[item].empty?
      job_data[item] = BabelHelper.generate_output_location(job_data)
    else
      BabelHelper.ensure_output_does_not_exist(job_data, job_data[item])
    end
  }
end
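
The defaulting behaviour can be sketched in isolation. The snippet below is a minimal illustration, not the library's code: `fill_output_locations` and the two lambdas are hypothetical stand-ins for `check_output_files`, `generate_output_location`, and `ensure_output_does_not_exist`, so it runs without a Neptune deployment.

```ruby
# Hypothetical stand-in for check_output_files: the generator and the
# existence check are passed in as lambdas so no remote calls are made.
def fill_output_locations(job_data, generate, ensure_absent)
  ["@output", "@error", "@metadata"].each do |item|
    if job_data[item].nil? || job_data[item].empty?
      # Nothing supplied: pick a fresh location.
      job_data[item] = generate.call(job_data)
    else
      # Caller supplied a location: verify nothing is stored there yet.
      ensure_absent.call(job_data, job_data[item])
    end
  end
  job_data
end

counter = 0
generate = lambda { |_job| counter += 1; "/bucket/babel/temp-#{counter}" }
checked = []
ensure_absent = lambda { |_job, path| checked << path }

job = fill_output_locations({ "@output" => "/bucket/out" }, generate, ensure_absent)
```

Only the caller-supplied `@output` path reaches the existence check; `@error` and `@metadata` receive generated locations.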

.convert_from_neptune_params(params) ⇒ Object

Neptune internally uses job_data with keys of the form @name, but the user supplies them in the form :name, so we convert them here. TODO(cgb): This conversion to and from may be unnecessary, since neptune() just re-converts the keys. How can we remove it?



# File 'lib/babel.rb', line 256

def self.convert_from_neptune_params(params)
  job_data = {}
  params.each { |k, v|
    key = "@#{k}"
    job_data[key] = v
  }
  return job_data
end
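
As a usage illustration, the snippet below applies the same conversion to a small parameter hash. `to_job_data` is a hypothetical standalone copy of the method, renamed so the example runs without loading lib/babel.rb; the parameter values are made up.

```ruby
# Standalone copy of the conversion (hypothetical name): every key is
# turned into a String prefixed with "@".
def to_job_data(params)
  job_data = {}
  params.each { |k, v| job_data["@#{k}"] = v }
  job_data
end

job_data = to_job_data({ :code => "/bucket/code/run.rb", :argv => ["2"] })
p job_data
```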

.convert_to_neptune_params(job_data) ⇒ Object

Neptune input jobs expect keys of the form :name, but since we’ve already converted them to the form @name, this function reverses that conversion.



# File 'lib/babel.rb', line 268

def self.convert_to_neptune_params(job_data)
  neptune_params = {}

  job_data.each { |k, v|
    key = k.delete("@").to_sym
    neptune_params[key] = v
  }

  return neptune_params
end
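
One detail worth knowing is that `String#delete` removes every occurrence of the given character, not only a leading one. The sketch below uses `to_neptune_params`, a hypothetical standalone copy of the method, to show both the ordinary case and a key with an embedded "@" (an invented key, used purely for illustration).

```ruby
# Standalone copy of the reverse conversion (hypothetical name).
def to_neptune_params(job_data)
  neptune_params = {}
  job_data.each { |k, v| neptune_params[k.delete("@").to_sym] = v }
  neptune_params
end

round_trip = to_neptune_params({ "@code" => "/bucket/run.rb", "@type" => "babel" })

# String#delete strips every "@", so a key with an embedded "@" changes shape.
embedded = to_neptune_params({ "@user@host" => 1 })
```

If keys containing "@" ever needed to survive the round trip, removing only the prefix (for example with `k.sub(/\A@/, "")`) would be one option.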

.ensure_output_does_not_exist(job_data, remote_file) ⇒ Object

To avoid accidentally overwriting outputs from previous jobs, we first check to make sure an output file doesn’t exist before starting a new job with the given name.



# File 'lib/babel.rb', line 181

def self.ensure_output_does_not_exist(job_data, remote_file)
  controller = self.get_neptune_manager_client(job_data)
  NeptuneHelper.require_file_to_not_exist(remote_file, job_data, controller)
end

.generate_output_location(job_data) ⇒ Object

If the user fails to give us an output location, this function will generate one for them, based on either the location of their code (for remotely specified code), or a babel parameter (for locally specified code).



# File 'lib/babel.rb', line 109

def self.generate_output_location(job_data)
  if job_data["@is_remote"]
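    # We already know the bucket name - the same one that the user
    # has told us their code is located in. String#[] with a capture
    # index is used here because scan(...)[0].to_s yields the inspected
    # array (e.g. '["bucket"]') on Ruby 1.9 and later.
    prefix = job_data["@code"][/\/(.*?)\//, 1].to_s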
  else
    prefix = self.get_bucket_for_local_data(job_data)
  end
    
  return "/#{prefix}/babel/temp-#{CommonFunctions.get_random_alphanumeric()}"
end
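
To make the naming scheme concrete, here is a minimal sketch of how a bucket name can be pulled from a remote code path and turned into a temporary output location. `bucket_from_remote_path` and `temp_output_location` are hypothetical helper names, and the fixed suffix stands in for the random string the real method requests from `CommonFunctions.get_random_alphanumeric()`.

```ruby
# Extract the bucket name (the first path component) from a remote path.
# String#[] with a capture index returns the captured text directly, or
# nil when there is no match, on every Ruby version.
def bucket_from_remote_path(path)
  path[/\/(.*?)\//, 1].to_s
end

# Build the temporary location in the "/<prefix>/babel/temp-<suffix>" form.
def temp_output_location(prefix, suffix)
  "/#{prefix}/babel/temp-#{suffix}"
end

bucket = bucket_from_remote_path("/mybucket/code/run.rb")
location = temp_output_location(bucket, "abc123")
no_bucket = bucket_from_remote_path("run.rb")
```

A path without two slashes yields an empty prefix, so callers may want to validate the code path before relying on the generated location.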

.get_bucket_for_local_data(job_data) ⇒ Object

Provides a common way for callers to get the name of the bucket that should be used for Neptune jobs where the code is stored locally.



# File 'lib/babel.rb', line 124

def self.get_bucket_for_local_data(job_data)
  bucket_name = job_data["@bucket_name"] || ENV['BABEL_BUCKET_NAME'] ||
    job_data["@S3_bucket_name"] || job_data["@Walrus_bucket_name"] ||
    job_data["@GStorage_bucket_name"] || job_data["@WAZ_Container_Name"]

  if bucket_name.nil?
    raise BadConfigurationException.new(NEEDS_BUCKET_INFO)
  end

  # If the bucket name starts with a slash, remove it
  if bucket_name[0].chr == "/"
    bucket_name = bucket_name[1, bucket_name.length]
  end

  return bucket_name
end
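
The lookup order can be seen in a small standalone sketch. `bucket_for` and `BUCKET_KEYS` are hypothetical names; the environment is passed in as a plain hash instead of reading `ENV` so the example is deterministic, and `ArgumentError` stands in for the library's `BadConfigurationException`.

```ruby
# Storage-specific keys consulted after @bucket_name and the environment.
BUCKET_KEYS = ["@S3_bucket_name", "@Walrus_bucket_name",
               "@GStorage_bucket_name", "@WAZ_Container_Name"]

# Hypothetical standalone version of the lookup.
def bucket_for(job_data, env = {})
  name = job_data["@bucket_name"] || env["BABEL_BUCKET_NAME"] ||
         BUCKET_KEYS.map { |key| job_data[key] }.compact.first
  raise ArgumentError, "no bucket name given" if name.nil?
  name.sub(/\A\//, "")  # drop a single leading slash, if present
end

explicit = bucket_for({ "@bucket_name" => "/mine", "@S3_bucket_name" => "s3" })
from_env = bucket_for({ "@S3_bucket_name" => "s3" }, { "BABEL_BUCKET_NAME" => "envb" })
fallback = bucket_for({ "@Walrus_bucket_name" => "walrus" })
```

An explicit `@bucket_name` wins over the environment variable, which in turn wins over any storage-specific key.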

.get_neptune_manager_client(job_data) ⇒ Object

Returns a NeptuneManagerClient for the given job data.



# File 'lib/babel.rb', line 188

def self.get_neptune_manager_client(job_data)
  keyname = job_data["@keyname"] || "appscale"
  shadow_ip = CommonFunctions.get_from_yaml(keyname, :shadow)
  secret = CommonFunctions.get_secret_key(keyname)
  return NeptuneManagerClient.new(shadow_ip, secret)
end

.put_code(job_data) ⇒ Object

Stores the user’s code in the remote datastore. The whole directory containing the code is uploaded, including its subdirectories, since the code may depend on libraries kept there. job_data["@code"] is updated to the remote path of the code, which is also returned.



# File 'lib/babel.rb', line 199

def self.put_code(job_data)
  code_dir = File.dirname(job_data["@code"])
  code = File.basename(job_data["@code"])
  remote_code_dir = self.put_file(code_dir, job_data)
  job_data["@code"] = remote_code_dir + "/" + code
  return job_data["@code"]
end

.put_file(local_path, job_data) ⇒ Object

If the user gives us local code or local inputs, this function will run a Neptune ‘input’ job to store the data remotely.



# File 'lib/babel.rb', line 230

def self.put_file(local_path, job_data)
  input_data = self.convert_to_neptune_params(job_data)
  input_data[:type] = "input"
  input_data[:local] = local_path

  bucket_name = self.get_bucket_for_local_data(job_data)
  input_data[:remote] = "/#{bucket_name}/babel#{local_path}"

  start = Time.now
  Kernel.neptune(input_data)
  fin = Time.now
  
  if job_data['@metadata_info'].nil?
    job_data['@metadata_info'] = {'time_to_store_inputs' => 0.0}
  end

  job_data['@metadata_info']['time_to_store_inputs'] += fin - start

  return input_data[:remote]
end

.put_inputs(job_data) ⇒ Object

If any input files are specified, they are copied to the remote datastore via Neptune ‘input’ jobs. Inputs are assumed to be files on the local filesystem if they begin with a slash, and job_data gets updated with the remote location of these files.



# File 'lib/babel.rb', line 212

def self.put_inputs(job_data)
  if job_data["@argv"].nil? or job_data["@argv"].empty?
    return job_data
  end

  job_data["@argv"].each_index { |i|
    arg = job_data["@argv"][i]
    if arg[0].chr == "/"
      job_data["@argv"][i] = self.put_file(arg, job_data)
    end
  }

  return job_data
end
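
The leading-slash rule can be illustrated without running any Neptune jobs. In this sketch `upload_local_args` is a hypothetical helper and `uploader` stands in for `put_file`; unlike the method above, it returns a new array instead of updating job_data in place.

```ruby
# Hypothetical sketch: arguments that begin with a slash are treated as
# local files and replaced by whatever remote path the uploader returns.
def upload_local_args(argv, uploader)
  return argv if argv.nil? || argv.empty?
  argv.map { |arg| arg.start_with?("/") ? uploader.call(arg) : arg }
end

# Stand-in for put_file, using the "/<bucket>/babel<local_path>" layout.
uploader = lambda { |local| "/bucket/babel#{local}" }
args = upload_local_args(["/home/me/data.txt", "42", "--fast"], uploader)
```

Arguments such as `"42"` or `"--fast"` pass through untouched, which is why relative file paths are not uploaded under this convention.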

.run_job(job_data_list) ⇒ Object

Constructs a Neptune job to run the user’s code as a Babel job (task queue) from the given parameters.



# File 'lib/babel.rb', line 282

def self.run_job(job_data_list)
  run_data_list = []

  job_data_list.each { |job_data|
    run_data = self.convert_to_neptune_params(job_data)

    # Default to babel as the job type, if the user doesn't specify one.
    if run_data[:type].nil? or run_data[:type].empty?
      run_data[:type] = "babel"
    end

    # TODO(cgb): Once AppScale+Babel gets support for RabbitMQ, change this to
    # exec tasks over it, instead of locally.
    if job_data["@run_local"].nil?
      run_data[:run_local] = true
      run_data[:engine] = "executor-sqs"
    end

    run_data[:failed_attempts] = 0
    run_data_list << run_data
  }

  loop {
    if run_data_list.length == 1
      run_job = Kernel.neptune(run_data_list[0])
    else
      run_job = Kernel.neptune(run_data_list)
    end

    if run_job[:result] == :success
      return run_job
    else
      run_data_list[0][:failed_attempts] += 1
      Kernel.sleep(SLEEP_TIME)  # TODO(cgb): this should exponentially backoff
    end
  }
end
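
The per-job defaults applied before submission can be sketched separately from the retry loop. `with_run_defaults` is a hypothetical helper that mirrors the first half of the method above; the job type "other" is an invented value used only to show that a caller-supplied type is preserved.

```ruby
# Hypothetical helper mirroring the defaults run_job applies to each job.
def with_run_defaults(job_data)
  run_data = {}
  job_data.each { |k, v| run_data[k.delete("@").to_sym] = v }

  # Default to "babel" when no job type is given.
  run_data[:type] = "babel" if run_data[:type].nil? || run_data[:type].empty?

  # Only fill in the execution settings when the caller did not choose.
  if job_data["@run_local"].nil?
    run_data[:run_local] = true
    run_data[:engine] = "executor-sqs"
  end

  run_data[:failed_attempts] = 0
  run_data
end

defaults = with_run_defaults({ "@code" => "/bucket/run.rb" })
custom = with_run_defaults({ "@code" => "/bucket/run.rb",
                             "@type" => "other", "@run_local" => false })
```

Note that an explicit `@run_local` of `false` is not `nil`, so it suppresses both the `:run_local` and the `:engine` defaults.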

.validate_inputs(job_data) ⇒ Object

For jobs where the code is stored remotely, this method ensures that the code and any possible inputs actually do exist, before attempting to use them for computation.



# File 'lib/babel.rb', line 159

def self.validate_inputs(job_data)
  controller = self.get_neptune_manager_client(job_data)

  # First, make sure the code exists
  NeptuneHelper.require_file_to_exist(job_data["@code"], job_data, controller)

  if job_data["@argv"].nil? or job_data["@argv"].empty?
    return
  end

  # We assume anything that begins with a slash is a remote file
  job_data["@argv"].each { |arg|
    if arg[0].chr == "/"
      NeptuneHelper.require_file_to_exist(arg, job_data, controller)
    end
  }
end

.wait_and_get_output(job_data, output_location) ⇒ Object

Constructs a Neptune job to get the output of a Babel job. If the job has not yet finished, this function waits until it has, and then returns the job’s output.



# File 'lib/babel.rb', line 324

def self.wait_and_get_output(job_data, output_location)
  output_data = self.convert_to_neptune_params(job_data)
  output_data[:type] = "output"
  output_data[:output] = output_location

  output = ""
  time_to_sleep = SLEEP_TIME
  loop {
    output = Kernel.neptune(output_data)[:output]
    if output == DOES_NOT_EXIST
      # Sleep for a fixed SLEEP_TIME between polls. Exponential backoff
      # (up to a limit of MAX_SLEEP_TIME) is currently disabled:
      Kernel.sleep(time_to_sleep)
      #if time_to_sleep < MAX_SLEEP_TIME
      #  time_to_sleep *= 2
      #end
    else
      break
    end
  }

  return output
end
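
For reference, polling with capped exponential backoff could look roughly like the sketch below. It is an illustration under stated assumptions, not the library's implementation: `poll_with_backoff` is a hypothetical helper, the delays of 1 and 4 are arbitrary (the values of SLEEP_TIME and MAX_SLEEP_TIME are not shown on this page), and the fetch and sleep steps are injected as lambdas so the example finishes instantly.

```ruby
# Poll fetch until it returns something other than missing, doubling the
# delay after each miss up to max_delay. sleeper is injectable; pass
# Kernel.method(:sleep) to wait for real.
def poll_with_backoff(fetch, missing, initial_delay, max_delay, sleeper)
  delay = initial_delay
  loop do
    result = fetch.call
    return result unless result == missing
    sleeper.call(delay)
    delay = [delay * 2, max_delay].min
  end
end

# Simulated job that reports "missing" four times before producing output.
responses = [:missing, :missing, :missing, :missing, "job output"]
delays = []
output = poll_with_backoff(lambda { responses.shift }, :missing, 1, 4,
                           lambda { |d| delays << d })
```

Capping the delay keeps the worst-case wait after job completion bounded, while the doubling reduces the number of requests made during long-running jobs.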