Module: BabelHelper
Defined in: lib/babel.rb
Overview
This module provides convenience functions for babel().
Class Method Summary
- .check_output_files(job_data) ⇒ Object
  babel() callers do not have to specify where the task’s standard output, standard error, and metadata should be placed.
- .convert_from_neptune_params(params) ⇒ Object
  Neptune internally uses job_data with keys of the form @name, but since the user has given them to us in the form :name, we convert them here.
- .convert_to_neptune_params(job_data) ⇒ Object
  Neptune input jobs expect keys of the form :name, but since we’ve already converted them to the form @name, this function reverses that conversion.
- .ensure_output_does_not_exist(job_data, remote_file) ⇒ Object
  To avoid accidentally overwriting outputs from previous jobs, we first check that an output file doesn’t exist before starting a new job with the given name.
- .generate_output_location(job_data) ⇒ Object
  If the user doesn’t give us an output location, this function generates one for them, based on either the location of their code (for remotely specified code) or the bucket used for local data (for locally specified code).
- .get_bucket_for_local_data(job_data) ⇒ Object
  Provides a common way for callers to get the name of the bucket that should be used for Neptune jobs where the code is stored locally.
- .get_neptune_manager_client(job_data) ⇒ Object
  Returns a NeptuneManagerClient for the given job data.
- .put_code(job_data) ⇒ Object
  Stores the user’s code in the remote datastore, together with the rest of the directory that contains it (including any subdirectories), since the code may use libraries stored alongside it.
- .put_file(local_path, job_data) ⇒ Object
  If the user gives us local code or local inputs, this function runs a Neptune ‘input’ job to store the data remotely.
- .put_inputs(job_data) ⇒ Object
  If any input files are specified, they are copied to the remote datastore via Neptune ‘input’ jobs.
- .run_job(job_data_list) ⇒ Object
  Constructs a Neptune job to run the user’s code as a Babel job (task queue) from the given parameters.
- .validate_inputs(job_data) ⇒ Object
  For jobs where the code is stored remotely, ensures that the code and any inputs actually exist before attempting to use them for computation.
- .wait_and_get_output(job_data, output_location) ⇒ Object
  Constructs a Neptune job to get the output of a Babel job.
Class Method Details
.check_output_files(job_data) ⇒ Object
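babel() callers do not have to specify where the standard output, standard error, and metadata that the task produces should be placed. For each of these that is missing or empty, this method generates a location; for each one the caller did specify, it checks that the file doesn’t already exist, so that a previous job’s output is not overwritten.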
# File 'lib/babel.rb', line 145
def self.check_output_files(job_data)
  ["@output", "@error", "@metadata"].each { |item|
    if job_data[item].nil? or job_data[item].empty?
      job_data[item] = BabelHelper.generate_output_location(job_data)
    else
      BabelHelper.ensure_output_does_not_exist(job_data, job_data[item])
    end
  }
end
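To make the fill-or-check rule concrete, here is a minimal standalone sketch of the same loop. The names fill_output_locations, generate, and ensure_absent are hypothetical; the two lambdas stand in for BabelHelper.generate_output_location and BabelHelper.ensure_output_does_not_exist so the logic can run without a Neptune deployment.

```ruby
# Same rule as check_output_files: generate a location for each missing or
# empty output key, and check any location the caller supplied.
def fill_output_locations(job_data, generate, ensure_absent)
  ["@output", "@error", "@metadata"].each do |item|
    if job_data[item].nil? || job_data[item].empty?
      job_data[item] = generate.call(job_data)
    else
      ensure_absent.call(job_data, job_data[item])
    end
  end
  job_data
end

# Hypothetical stand-ins for the BabelHelper collaborators.
counter = 0
generate = lambda { |_job_data| counter += 1; "/mybucket/babel/temp-#{counter}" }
checked = []
ensure_absent = lambda { |_job_data, path| checked << path }

job = { "@output" => "/mybucket/out.txt", "@error" => "" }
fill_output_locations(job, generate, ensure_absent)
# job["@error"] and job["@metadata"] now hold generated temp locations;
# only the caller-supplied "/mybucket/out.txt" was passed to ensure_absent.
```

Note that an empty string is treated the same as a missing key, so passing "" for @error asks for a generated location rather than an existence check.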
.convert_from_neptune_params(params) ⇒ Object
Neptune internally uses job_data with keys of the form @name, but since the user has given them to us in the form :name, we convert it here. TODO(cgb): It looks like this conversion to/from may be unnecessary since neptune() just re-converts it - how can we remove it?
# File 'lib/babel.rb', line 256
def self.convert_from_neptune_params(params)
  job_data = {}
  params.each { |k, v|
    key = "@#{k}"
    job_data[key] = v
  }
  return job_data
end
.convert_to_neptune_params(job_data) ⇒ Object
Neptune input jobs expect keys of the form :name, but since we’ve already converted them to the form @name, this function reverses that conversion.
# File 'lib/babel.rb', line 268
def self.convert_to_neptune_params(job_data)
  neptune_params = {}
  job_data.each { |k, v|
    key = k.delete("@").to_sym
    neptune_params[key] = v
  }

  return neptune_params
end
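The two conversions above are inverses for ordinary keys. The sketch below restates them with each_with_object inside a hypothetical ParamSketch module (not part of BabelHelper) and checks the round trip on sample parameters.

```ruby
# Behaviorally equivalent restatements of the two documented conversions,
# wrapped in a hypothetical module so they can run on their own.
module ParamSketch
  # :name => value  becomes  "@name" => value
  def self.from_neptune(params)
    params.each_with_object({}) { |(k, v), acc| acc["@#{k}"] = v }
  end

  # "@name" => value  becomes  :name => value
  def self.to_neptune(job_data)
    job_data.each_with_object({}) { |(k, v), acc| acc[k.delete("@").to_sym] = v }
  end
end

params = { code: "/mybucket/run.rb", argv: ["/mybucket/in.txt", "5"] }
job_data = ParamSketch.from_neptune(params)
p job_data.keys   # => ["@code", "@argv"]
back = ParamSketch.to_neptune(job_data)
p back == params  # => true
```

One design detail: String#delete("@") removes every "@" in the key, not only the leading one, so a key such as "@a@b" becomes :ab. If keys could ever contain an inner "@", k.sub(/\A@/, "") would strip only the prefix.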
.ensure_output_does_not_exist(job_data, remote_file) ⇒ Object
To avoid accidentally overwriting outputs from previous jobs, we first check to make sure an output file doesn’t exist before starting a new job with the given name.
# File 'lib/babel.rb', line 181
def self.ensure_output_does_not_exist(job_data, remote_file)
  controller = self.get_neptune_manager_client(job_data)
  NeptuneHelper.require_file_to_not_exist(remote_file, job_data, controller)
end
.generate_output_location(job_data) ⇒ Object
If the user doesn’t give us an output location, this function generates one for them of the form /<bucket>/babel/temp-<random>, where the bucket is taken from either the location of their code (for remotely specified code) or get_bucket_for_local_data (for locally specified code).
# File 'lib/babel.rb', line 109
def self.generate_output_location(job_data)
  if job_data["@is_remote"]
    # We already know the bucket name - the same one that the user
    # has told us their code is located in. Take the first path component
    # as a String; scan(...)[0].to_s would yield '["bucket"]' on Ruby 1.9+.
    prefix = job_data["@code"][/\/(.*?)\//, 1].to_s
  else
    prefix = self.get_bucket_for_local_data(job_data)
  end

  return "/#{prefix}/babel/temp-#{CommonFunctions.get_random_alphanumeric()}"
end
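The remote branch extracts the bucket as the first component of the code path. A detail worth checking: String#scan with a capture group returns an array of capture arrays, so calling to_s on its first element gives the string '["mybucket"]' on Ruby 1.9 and later, not the bare name. The sketch below contrasts that with String#[] and a capture index. The helper names are hypothetical, and SecureRandom.hex is only a stand-in for CommonFunctions.get_random_alphanumeric.

```ruby
require "securerandom"

# The first path component of "/bucket/dir/code.rb" is the bucket name.
# String#[] with a capture index returns that group as a plain String.
def bucket_from_remote_code(code_path)
  code_path[/\/(.*?)\//, 1]
end

# Hypothetical stand-in for the path built by generate_output_location;
# SecureRandom.hex(5) yields 10 hex characters as the random suffix.
def sketch_output_location(bucket)
  "/#{bucket}/babel/temp-#{SecureRandom.hex(5)}"
end

code = "/mybucket/jobs/run.rb"
nested = code.scan(/\/(.*?)\//)        # => [["mybucket"]]
bucket = bucket_from_remote_code(code) # => "mybucket"
location = sketch_output_location(bucket)
puts location   # prints /mybucket/babel/temp- followed by 10 hex digits
```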
.get_bucket_for_local_data(job_data) ⇒ Object
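Provides a common way for callers to get the name of the bucket that should be used for Neptune jobs where the code is stored locally. The name is taken from the first of @bucket_name, the BABEL_BUCKET_NAME environment variable, @S3_bucket_name, @Walrus_bucket_name, @GStorage_bucket_name, or @WAZ_Container_Name that is set; a leading slash is stripped, and a BadConfigurationException is raised if none of them is set.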
# File 'lib/babel.rb', line 124
def self.get_bucket_for_local_data(job_data)
  bucket_name = job_data["@bucket_name"] || ENV['BABEL_BUCKET_NAME'] ||
    job_data["@S3_bucket_name"] || job_data["@Walrus_bucket_name"] ||
    job_data["@GStorage_bucket_name"] || job_data["@WAZ_Container_Name"]

  if bucket_name.nil?
    raise BadConfigurationException.new(NEEDS_BUCKET_INFO)
  end

  # If the bucket name starts with a slash, remove it
  if bucket_name[0].chr == "/"
    bucket_name = bucket_name[1, bucket_name.length]
  end

  return bucket_name
end
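The lookup order can be exercised without touching the real environment by passing the environment in as a hash. This is a sketch under that assumption: bucket_for_local_data is a hypothetical name, and BadConfigurationException is stubbed locally because the real class lives elsewhere in Neptune.

```ruby
# Local stub; the real BadConfigurationException is defined by Neptune.
class BadConfigurationException < StandardError; end

STORAGE_BUCKET_KEYS = ["@S3_bucket_name", "@Walrus_bucket_name",
                       "@GStorage_bucket_name", "@WAZ_Container_Name"].freeze

# Same precedence as get_bucket_for_local_data: explicit @bucket_name, then
# BABEL_BUCKET_NAME, then the storage-specific keys in order.
def bucket_for_local_data(job_data, env = ENV)
  bucket = job_data["@bucket_name"] || env["BABEL_BUCKET_NAME"] ||
           STORAGE_BUCKET_KEYS.map { |key| job_data[key] }.compact.first
  raise BadConfigurationException, "no bucket specified" if bucket.nil?

  # Strip a single leading slash; unlike bucket_name[0].chr, this is also
  # safe for an empty string.
  bucket.sub(/\A\//, "")
end

env = { "BABEL_BUCKET_NAME" => "envbucket" }
puts bucket_for_local_data({ "@bucket_name" => "/mine" }, env)     # prints mine
puts bucket_for_local_data({ "@S3_bucket_name" => "s3b" }, env)    # prints envbucket
puts bucket_for_local_data({ "@S3_bucket_name" => "s3b" }, {})     # prints s3b
```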
.get_neptune_manager_client(job_data) ⇒ Object
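Returns a NeptuneManagerClient for the given job data, connected to the shadow node of the deployment named by @keyname (defaulting to "appscale") and authenticated with that deployment’s secret key.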
# File 'lib/babel.rb', line 188
def self.get_neptune_manager_client(job_data)
  keyname = job_data["@keyname"] || "appscale"
  shadow_ip = CommonFunctions.get_from_yaml(keyname, :shadow)
  secret = CommonFunctions.get_secret_key(keyname)
  return NeptuneManagerClient.new(shadow_ip, secret)
end
.put_code(job_data) ⇒ Object
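Stores the user’s code in the remote datastore, together with the rest of the directory that contains it (including any subdirectories), since the code may use libraries stored alongside it. It then updates @code to the remote location of the code file and returns that location.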
# File 'lib/babel.rb', line 199
def self.put_code(job_data)
  code_dir = File.dirname(job_data["@code"])
  code = File.basename(job_data["@code"])
  remote_code_dir = self.put_file(code_dir, job_data)
  job_data["@code"] = remote_code_dir + "/" + code
  return job_data["@code"]
end
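The path arithmetic in put_code is easy to check in isolation: upload the directory, then re-attach the file name to the returned remote directory. In this sketch fake_put_file and remote_code_path are hypothetical; fake_put_file only computes the /<bucket>/babel<local_path> destination that put_file uses and does not upload anything.

```ruby
# Hypothetical stand-in for BabelHelper.put_file: returns the remote path
# put_file would store to, without running a Neptune 'input' job.
def fake_put_file(local_path, bucket)
  "/#{bucket}/babel#{local_path}"
end

# Mirrors put_code: store the code's whole directory, then point at the
# code file inside the stored copy.
def remote_code_path(local_code, bucket)
  code_dir = File.dirname(local_code)   # directory holding the code
  code = File.basename(local_code)      # the code file's name
  File.join(fake_put_file(code_dir, bucket), code)
end

puts remote_code_path("/home/me/proj/main.rb", "mybucket")
# prints /mybucket/babel/home/me/proj/main.rb
```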
.put_file(local_path, job_data) ⇒ Object
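If the user gives us local code or local inputs, this function runs a Neptune ‘input’ job to store the data remotely at /<bucket>/babel<local_path>, where the bucket comes from get_bucket_for_local_data. It returns that remote path and adds the time spent storing the data to job_data['@metadata_info']['time_to_store_inputs'].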
# File 'lib/babel.rb', line 230
def self.put_file(local_path, job_data)
  input_data = self.convert_to_neptune_params(job_data)
  input_data[:type] = "input"
  input_data[:local] = local_path

  bucket_name = self.get_bucket_for_local_data(job_data)
  input_data[:remote] = "/#{bucket_name}/babel#{local_path}"

  start = Time.now
  Kernel.neptune(input_data)
  fin = Time.now

  if job_data['@metadata_info'].nil?
    job_data['@metadata_info'] = {'time_to_store_inputs' => 0.0}
  end

  job_data['@metadata_info']['time_to_store_inputs'] += fin - start

  return input_data[:remote]
end
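The timing bookkeeping in put_file accumulates across calls, so code plus several inputs add up to one total. Below is a minimal sketch of that accumulation, assuming a block stands in for the Kernel.neptune call; timed_store is a hypothetical name. As a design variation it uses the monotonic clock rather than Time.now, which is unaffected by wall-clock adjustments, and it also tolerates a @metadata_info hash that lacks the counter.

```ruby
# Runs the block (standing in for the Neptune 'input' job) and adds its
# elapsed time, measured on the monotonic clock, to the running total.
def timed_store(job_data)
  start = Process.clock_gettime(Process::CLOCK_MONOTONIC)
  result = yield
  elapsed = Process.clock_gettime(Process::CLOCK_MONOTONIC) - start

  info = (job_data["@metadata_info"] ||= {})
  info["time_to_store_inputs"] = info.fetch("time_to_store_inputs", 0.0) + elapsed
  result
end

job = {}
remote = timed_store(job) { "/mybucket/babel/home/me/data.csv" }
timed_store(job) { sleep 0.01 }
puts job["@metadata_info"]["time_to_store_inputs"]  # total seconds across both calls
```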
.put_inputs(job_data) ⇒ Object
If any input files are specified, they are copied to the remote datastore via Neptune ‘input’ jobs. Inputs are assumed to be files on the local filesystem if they begin with a slash, and job_data gets updated with the remote location of these files.
# File 'lib/babel.rb', line 212
def self.put_inputs(job_data)
  if job_data["@argv"].nil? or job_data["@argv"].empty?
    return job_data
  end

  job_data["@argv"].each_index { |i|
    arg = job_data["@argv"][i]
    if arg[0].chr == "/"
      job_data["@argv"][i] = self.put_file(arg, job_data)
    end
  }

  return job_data
end
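The slash rule for arguments can be sketched as a pure mapping, with the upload passed in as a block. upload_local_args is a hypothetical name; unlike put_inputs it returns a new array instead of mutating @argv in place, and it uses start_with?("/"), which does not fail on an empty argument the way arg[0].chr does.

```ruby
# Replaces every argument that starts with "/" (treated as a local file)
# with whatever the uploader block returns; other arguments pass through.
def upload_local_args(argv, &uploader)
  return argv if argv.nil? || argv.empty?
  argv.map { |arg| arg.start_with?("/") ? uploader.call(arg) : arg }
end

uploaded = upload_local_args(["/data/in.txt", "-n", "5"]) { |path| "/mybucket/babel#{path}" }
p uploaded  # => ["/mybucket/babel/data/in.txt", "-n", "5"]
```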
.run_job(job_data_list) ⇒ Object
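Constructs a Neptune job to run the user’s code as a Babel job (task queue) from the given parameters. The job type defaults to babel, and if @run_local is not given, the task runs locally on the executor-sqs engine. If the job does not report success, it is resubmitted every SLEEP_TIME seconds until it does.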
# File 'lib/babel.rb', line 282
def self.run_job(job_data_list)
  run_data_list = []

  job_data_list.each { |job_data|
    run_data = self.convert_to_neptune_params(job_data)

    # Default to babel as the job type, if the user doesn't specify one.
    if run_data[:type].nil? or run_data[:type].empty?
      run_data[:type] = "babel"
    end

    # TODO(cgb): Once AppScale+Babel gets support for RabbitMQ, change this to
    # exec tasks over it, instead of locally.
    if job_data["@run_local"].nil?
      run_data[:run_local] = true
      run_data[:engine] = "executor-sqs"
    end

    run_data[:failed_attempts] = 0
    run_data_list << run_data
  }

  loop {
    if run_data_list.length == 1
      run_job = Kernel.neptune(run_data_list[0])
    else
      run_job = Kernel.neptune(run_data_list)
    end

    if run_job[:result] == :success
      return run_job
    else
      run_data_list[0][:failed_attempts] += 1
      Kernel.sleep(SLEEP_TIME)  # TODO(cgb): this should exponentially backoff
    end
  }
end
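The TODO in the retry loop asks for exponential backoff. Below is a minimal sketch of a capped version, assuming the submit and sleep calls are injected as lambdas (standing in for Kernel.neptune and Kernel.sleep) so it can run without a deployment; run_with_backoff, initial_sleep, and max_sleep are hypothetical names. It also increments failed_attempts on every job in the batch rather than only the first, which is a design choice of this sketch, not the documented behavior.

```ruby
# Resubmits the batch until the runner reports :success, doubling the wait
# after each failure up to max_sleep.
def run_with_backoff(run_data_list, initial_sleep:, max_sleep:, submit:, sleeper:)
  delay = initial_sleep
  loop do
    payload = run_data_list.length == 1 ? run_data_list[0] : run_data_list
    result = submit.call(payload)
    return result if result[:result] == :success

    run_data_list.each { |run_data| run_data[:failed_attempts] += 1 }
    sleeper.call(delay)
    delay = [delay * 2, max_sleep].min
  end
end

# Hypothetical runner that fails three times before succeeding.
attempts = 0
submit = lambda do |_payload|
  attempts += 1
  attempts < 4 ? { result: :failure } : { result: :success }
end
sleeps = []
jobs = [{ type: "babel", failed_attempts: 0 }]
outcome = run_with_backoff(jobs, initial_sleep: 1, max_sleep: 5,
                           submit: submit, sleeper: ->(s) { sleeps << s })
p sleeps  # => [1, 2, 4]
```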
.validate_inputs(job_data) ⇒ Object
For jobs where the code is stored remotely, this method ensures that the code and any possible inputs actually do exist, before attempting to use them for computation.
# File 'lib/babel.rb', line 159
def self.validate_inputs(job_data)
  controller = self.get_neptune_manager_client(job_data)

  # First, make sure the code exists
  NeptuneHelper.require_file_to_exist(job_data["@code"], job_data, controller)

  if job_data["@argv"].nil? or job_data["@argv"].empty?
    return
  end

  # We assume anything that begins with a slash is a remote file
  job_data["@argv"].each { |arg|
    if arg[0].chr == "/"
      NeptuneHelper.require_file_to_exist(arg, job_data, controller)
    end
  }
end
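The validation rule (the code path plus every argument beginning with a slash must exist remotely) can be sketched with an injected existence check. validate_remote_inputs and MissingRemoteFile are hypothetical, and the exists lambda stands in for the datastore lookup that NeptuneHelper.require_file_to_exist performs through the manager client. This sketch collects every missing path before raising; how the real helper reports a failure is not shown here.

```ruby
# Hypothetical error type for this sketch.
class MissingRemoteFile < StandardError; end

# Checks the code path and every slash-prefixed argument with `exists`.
def validate_remote_inputs(code, argv, exists)
  paths = [code] + Array(argv).select { |arg| arg.start_with?("/") }
  missing = paths.reject { |path| exists.call(path) }
  raise MissingRemoteFile, "not found: #{missing.join(', ')}" unless missing.empty?
  true
end

remote_files = ["/mybucket/code.rb", "/mybucket/in.txt"]
exists = ->(path) { remote_files.include?(path) }
p validate_remote_inputs("/mybucket/code.rb", ["/mybucket/in.txt", "5"], exists)  # => true
```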
.wait_and_get_output(job_data, output_location) ⇒ Object
Constructs a Neptune job to get the output of a Babel job. If the job is not yet finished, this function polls every SLEEP_TIME seconds until the output exists, and then returns it.
# File 'lib/babel.rb', line 324
def self.wait_and_get_output(job_data, output_location)
  output_data = self.convert_to_neptune_params(job_data)
  output_data[:type] = "output"
  output_data[:output] = output_location

  output = ""
  time_to_sleep = SLEEP_TIME
  loop {
    output = Kernel.neptune(output_data)[:output]
    if output == DOES_NOT_EXIST
      # Poll again after time_to_sleep seconds. Exponential backoff (doubling
      # up to MAX_SLEEP_TIME) is currently commented out, so this always
      # waits SLEEP_TIME between polls.
      Kernel.sleep(time_to_sleep)
      #if time_to_sleep < MAX_SLEEP_TIME
      #  time_to_sleep *= 2
      #end
    else
      break
    end
  }

  return output
end
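The commented-out lines in the polling loop describe a capped exponential backoff. Here is a minimal sketch of that behavior, assuming the fetch and sleep steps are injected as lambdas; poll_for_output and DOES_NOT_EXIST_SENTINEL are hypothetical, the latter standing in for the DOES_NOT_EXIST value that Neptune returns for missing output.

```ruby
# Hypothetical stand-in for Neptune's DOES_NOT_EXIST marker.
DOES_NOT_EXIST_SENTINEL = :does_not_exist

# Fetches until the result is not the sentinel, doubling the wait after
# each miss up to max_sleep.
def poll_for_output(initial_sleep:, max_sleep:, fetcher:, sleeper:)
  delay = initial_sleep
  loop do
    output = fetcher.call
    return output unless output == DOES_NOT_EXIST_SENTINEL

    sleeper.call(delay)
    delay = [delay * 2, max_sleep].min
  end
end

responses = [DOES_NOT_EXIST_SENTINEL] * 3 + ["hello"]
sleeps = []
out = poll_for_output(initial_sleep: 2, max_sleep: 5,
                      fetcher: -> { responses.shift },
                      sleeper: ->(s) { sleeps << s })
p out     # => "hello"
p sleeps  # => [2, 4, 5]
```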