Class: Elasticity::PigJob
Defined in: lib/elasticity/pig_job.rb
Instance Attribute Summary
- #parallels ⇒ Object (readonly): Automatically passed as Pig argument E_PARALLELS.
Attributes inherited from SimpleJob
#action_on_failure, #aws_access_key_id, #aws_secret_access_key, #ec2_key_name, #hadoop_version, #instance_count, #log_uri, #master_instance_type, #name, #slave_instance_type
Instance Method Summary
- #initialize(aws_access_key_id, aws_secret_access_key) ⇒ PigJob (constructor): A new instance of PigJob.
- #instance_count=(num_instances) ⇒ Object: Sets the instance count (at least 2) and recomputes #parallels.
- #run(pig_script, pig_variables = {}) ⇒ Object: Run the specified Pig script with the specified variables.
- #slave_instance_type=(instance_type) ⇒ Object: Sets the slave instance type and recomputes #parallels.
Methods inherited from SimpleJob
Constructor Details
#initialize(aws_access_key_id, aws_secret_access_key) ⇒ PigJob
Returns a new instance of PigJob.
Source (lib/elasticity/pig_job.rb, lines 8–12):
# File 'lib/elasticity/pig_job.rb', line 8
# Creates a new Pig job description.
#
# Both credentials are forwarded unchanged to the superclass (SimpleJob)
# constructor. Afterwards the job receives a Pig-specific name, and the
# E_PARALLELS value handed to the script by #run is computed.
#
# @param aws_access_key_id AWS access key id, passed through to SimpleJob
# @param aws_secret_access_key AWS secret access key, passed through to SimpleJob
def initialize(aws_access_key_id, aws_secret_access_key)
  super(aws_access_key_id, aws_secret_access_key)
  @name = "Elasticity Pig Job"
  # Computed after super so it sees whatever instance settings the
  # superclass established (presumably count/type defaults — confirm).
  @parallels = calculate_parallels
end
Instance Attribute Details
#parallels ⇒ Object (readonly)
Automatically passed as Pig argument E_PARALLELS. It is recomputed whenever the instance count or the slave instance type is changed.
Source (lib/elasticity/pig_job.rb, lines 6–8):
# File 'lib/elasticity/pig_job.rb', line 6
# Reader for the value that #run passes to the Pig script as the
# E_PARALLELS parameter. The value is set in #initialize and refreshed by
# #instance_count= and #slave_instance_type=.
#
# @return the current parallels value
def parallels
  @parallels
end
Instance Method Details
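#instance_count=(num_instances) ⇒ Object
Sets the number of instances and recomputes #parallels. Raises ArgumentError if fewer than 2 instances are requested.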
Source (lib/elasticity/pig_job.rb, lines 14–20):
# File 'lib/elasticity/pig_job.rb', line 14
# Sets the number of instances in the job flow and refreshes the
# E_PARALLELS value so that it tracks the new count.
#
# @param num_instances requested instance count; must not be less than 2
# @raise [ArgumentError] if num_instances is less than 2
def instance_count=(num_instances)
  raise ArgumentError, "Instance count cannot be set to less than 2 (requested #{num_instances})" if num_instances < 2

  @instance_count = num_instances
  # Must run after the assignment above so the new count is taken into account.
  @parallels = calculate_parallels
end
#run(pig_script, pig_variables = {}) ⇒ Object
Run the specified Pig script with the specified variables.
# Example: "access" and "secret" stand in for real AWS credentials.
# Each hash key is forwarded to Pig as "-p KEY=value" (keys in sorted order),
# so the script can read $SCRIPTS, $OUTPUT and $XREFS.
pig = Elasticity::PigJob.new("access", "secret")
jobflow_id = pig.run('s3n://slif-pig-test/test.pig', {
'SCRIPTS' => 's3n://slif-pig-test/scripts',
'OUTPUT' => 's3n://slif-pig-test/output',
'XREFS' => 's3n://slif-pig-test/xrefs'
})
The variables are accessible within your Pig scripts by using the standard $NAME syntax. In addition, E_PARALLELS is always passed automatically (see #parallels).
Source (lib/elasticity/pig_job.rb, lines 38–82):
# File 'lib/elasticity/pig_job.rb', line 38
# Runs the specified Pig script with the specified variables.
#
# Two steps are submitted:
#   1. "Setup Pig" installs Pig; if it fails, the whole job flow is terminated.
#   2. "Run Pig Script" runs +pig_script+ through the script runner. It uses
#      the job's configured action_on_failure.
# Each entry of +pig_variables+ is passed as "-p NAME=value", ordered by the
# string form of NAME, and is followed by "-p E_PARALLELS=<parallels>".
#
# @param pig_script location of the Pig script to run (e.g. an s3n:// URI)
# @param pig_variables [Hash] variables exposed to the script as $NAME.
#   String and Symbol keys may be mixed; nil is treated as an empty Hash.
# @return whatever @emr.run_job_flow returns (the usage example treats it
#   as the job flow id)
def run(pig_script, pig_variables = {})
  pig_runner = "s3://elasticmapreduce/libs/pig/pig-script"
  script_runner_jar = "s3://elasticmapreduce/libs/script-runner/script-runner.jar"

  script_arguments = [pig_runner, "--run-pig-script", "--args"]
  # Sort on the key's string form. Plain Array#sort raised ArgumentError for
  # mixed String/Symbol keys, because Symbol <=> String returns nil.
  (pig_variables || {}).sort_by { |name, _value| name.to_s }.each do |name, value|
    script_arguments.concat(["-p", "#{name}=#{value}"])
  end
  script_arguments.concat(["-p", "E_PARALLELS=#{@parallels}"])
  script_arguments << pig_script

  setup_pig_step = {
    :action_on_failure => "TERMINATE_JOB_FLOW",
    :hadoop_jar_step => {
      :jar => script_runner_jar,
      :args => [pig_runner, "--base-path", "s3://elasticmapreduce/libs/pig/", "--install-pig"],
    },
    :name => "Setup Pig"
  }
  run_pig_step = {
    :action_on_failure => @action_on_failure,
    :hadoop_jar_step => {
      :jar => script_runner_jar,
      :args => script_arguments,
    },
    :name => "Run Pig Script"
  }

  jobflow_config = {
    :name => @name,
    :instances => {
      :ec2_key_name => @ec2_key_name,
      :hadoop_version => @hadoop_version,
      :instance_count => @instance_count,
      :master_instance_type => @master_instance_type,
      :slave_instance_type => @slave_instance_type,
    },
    :steps => [setup_pig_step, run_pig_step]
  }
  # log_uri is optional; the key is only sent when it has been configured.
  jobflow_config.merge!(:log_uri => @log_uri) if @log_uri
  jobflow_config.merge!(get_bootstrap_actions)
  @emr.run_job_flow(jobflow_config)
end
#slave_instance_type=(instance_type) ⇒ Object
Sets the EC2 instance type used for slave nodes and recomputes #parallels.
Source (lib/elasticity/pig_job.rb, lines 22–25):
# File 'lib/elasticity/pig_job.rb', line 22
# Sets the instance type used for slave nodes and then refreshes the
# E_PARALLELS value, which presumably depends on the slave type
# (calculate_parallels is defined elsewhere — confirm).
#
# @param instance_type EC2 instance type name for slave nodes
def slave_instance_type=(instance_type)
  @slave_instance_type = instance_type
  # Recompute only after storing the new type so the calculation sees it.
  @parallels = calculate_parallels
end