Class: Elasticity::PigJob

Inherits:
SimpleJob
Defined in:
lib/elasticity/pig_job.rb

Instance Attribute Summary

Attributes inherited from SimpleJob

#action_on_failure, #aws_access_key_id, #aws_secret_access_key, #ec2_key_name, #hadoop_version, #instance_count, #log_uri, #master_instance_type, #name, #slave_instance_type

Instance Method Summary

Methods inherited from SimpleJob

#add_hadoop_bootstrap_action

Constructor Details

#initialize(aws_access_key_id, aws_secret_access_key) ⇒ PigJob

Returns a new instance of PigJob.



# File 'lib/elasticity/pig_job.rb', lines 8-12

# Builds a Pig job with the given AWS credentials, named "Elasticity Pig Job",
# and with its initial parallelism already computed.
#
# @param aws_access_key_id [String] AWS access key ID, forwarded to SimpleJob
# @param aws_secret_access_key [String] AWS secret key, forwarded to SimpleJob
def initialize(aws_access_key_id, aws_secret_access_key)
  # Superclass setup runs first. NOTE(review): calculate_parallels presumably
  # reads instance settings initialized by SimpleJob — confirm.
  super(aws_access_key_id, aws_secret_access_key)
  @name = "Elasticity Pig Job"
  @parallels = calculate_parallels
end

Instance Attribute Details

#parallels ⇒ Object (readonly)

Automatically passed to every Pig script started by #run as the parameter E_PARALLELS (-p E_PARALLELS=<value>).



# File 'lib/elasticity/pig_job.rb', lines 6-8

# Parallelism value handed to Pig scripts as E_PARALLELS. It is recomputed
# on construction and whenever instance_count= or slave_instance_type= runs.
def parallels; @parallels; end

Instance Method Details

#instance_count=(num_instances) ⇒ Object



# File 'lib/elasticity/pig_job.rb', lines 14-20

# Sets the number of EC2 instances and refreshes the parallelism to match.
#
# @param num_instances [Integer] desired instance count; must be at least 2
# @raise [ArgumentError] if +num_instances+ is less than 2 (state is left unchanged)
def instance_count=(num_instances)
  raise ArgumentError, "Instance count cannot be set to less than 2 (requested #{num_instances})" if num_instances < 2

  @instance_count = num_instances
  # Parallelism is derived from the instance settings, so recompute it now.
  @parallels = calculate_parallels
end

#run(pig_script, pig_variables = {}) ⇒ Object

Run the specified Pig script with the specified variables. For example:

# Create the job with an AWS access key ID and secret key (placeholders here).
pig = Elasticity::PigJob.new("access", "secret")
# Each hash entry reaches the script as "-p NAME=VALUE", so $SCRIPTS, $OUTPUT
# and $XREFS can be referenced inside test.pig.
jobflow_id = pig.run('s3n://slif-pig-test/test.pig', {
  'SCRIPTS' => 's3n://slif-pig-test/scripts',
  'OUTPUT'  => 's3n://slif-pig-test/output',
  'XREFS'   => 's3n://slif-pig-test/xrefs'
})

The variables are accessible within your Pig scripts using the standard $NAME syntax. E_PARALLELS is always passed as well.



# File 'lib/elasticity/pig_job.rb', lines 38-82

# Runs the given Pig script on a new EMR job flow and returns whatever
# +@emr.run_job_flow+ returns (the job flow ID, per the usage example).
#
# The job flow has two steps, both executed through the script-runner jar:
#   1. "Setup Pig"      - installs Pig; its failure action is always
#                         TERMINATE_JOB_FLOW.
#   2. "Run Pig Script" - runs +pig_script+; its failure action is the
#                         configurable +action_on_failure+.
#
# Every entry of +pig_variables+ is handed to the script as "-p NAME=VALUE",
# ordered by name, followed by "-p E_PARALLELS=<parallels>".
#
# @param pig_script [String] location of the Pig script, e.g. an s3n:// URL
# @param pig_variables [Hash] script variables, referenced as $NAME in the
#   script; keys may be Strings, Symbols, or a mix of both
# @return the value returned by +@emr.run_job_flow+
def run(pig_script, pig_variables={})
  script_runner_jar = "s3://elasticmapreduce/libs/script-runner/script-runner.jar"
  pig_script_path = "s3://elasticmapreduce/libs/pig/pig-script"

  script_arguments = [pig_script_path, "--run-pig-script", "--args"]
  # Order by the key's string form: plain #sort raises ArgumentError
  # ("comparison of Symbol with String failed") on mixed key types.
  pig_variables.sort_by { |name, _value| name.to_s }.each do |name, value|
    script_arguments.concat(["-p", "#{name}=#{value}"])
  end
  script_arguments.concat(["-p", "E_PARALLELS=#{@parallels}"])
  script_arguments << pig_script

  jobflow_config = {
    :name => @name,
    :instances => {
      :ec2_key_name => @ec2_key_name,
      :hadoop_version => @hadoop_version,
      :instance_count => @instance_count,
      :master_instance_type => @master_instance_type,
      :slave_instance_type => @slave_instance_type,
    },
    :steps => [
      {
        :action_on_failure => "TERMINATE_JOB_FLOW",
        :hadoop_jar_step => {
          :jar => script_runner_jar,
          :args => [
            pig_script_path,
            "--base-path", "s3://elasticmapreduce/libs/pig/",
            "--install-pig"
          ],
        },
        :name => "Setup Pig"
      },
      {
        :action_on_failure => @action_on_failure,
        :hadoop_jar_step => {
          :jar => script_runner_jar,
          :args => script_arguments,
        },
        :name => "Run Pig Script"
      }
    ]
  }

  # Only send a log location when one was configured.
  jobflow_config[:log_uri] = @log_uri if @log_uri
  jobflow_config.merge!(get_bootstrap_actions)

  @emr.run_job_flow(jobflow_config)
end

#slave_instance_type=(instance_type) ⇒ Object



# File 'lib/elasticity/pig_job.rb', lines 22-25

# Sets the EC2 instance type used for slave nodes and refreshes the
# parallelism value that is passed to Pig scripts as E_PARALLELS.
#
# @param instance_type [String] EC2 instance type for slave nodes
def slave_instance_type=(instance_type)
  @slave_instance_type = instance_type
  # Recompute only after the new type is stored, since the calculation
  # is refreshed in response to this setting.
  refreshed = calculate_parallels
  @parallels = refreshed
end