Class: Elasticity::EMR

Inherits:
Object
Defined in:
lib/elasticity/emr.rb


Constructor Details

#initialize(aws_access_key_id, aws_secret_access_key, options = {}) ⇒ EMR

Returns a new instance of EMR.



# File 'lib/elasticity/emr.rb', line 5

def initialize(aws_access_key_id, aws_secret_access_key, options = {})
  @aws_request = Elasticity::AwsRequest.new(aws_access_key_id, aws_secret_access_key, options)
end

Class Method Details

.camelize(lower_case_and_underscored_word, first_letter_in_uppercase = true) ⇒ Object

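Adapted from Rails’ ActiveSupport. Converts an underscored name to CamelCase (job_flow_id becomes JobFlowId), the form convert_ruby_to_aws uses for AWS parameter names. When first_letter_in_uppercase is false, the first character is kept as-is (jobFlowId).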



# File 'lib/elasticity/emr.rb', line 270

def camelize(lower_case_and_underscored_word, first_letter_in_uppercase = true)
  if first_letter_in_uppercase
    lower_case_and_underscored_word.to_s.gsub(/\/(.?)/) { "::" + $1.upcase }.gsub(/(^|_)(.)/) { $2.upcase }
  else
    # String#first is ActiveSupport-only, so take the first character with String#[].
    lower_case_and_underscored_word.to_s[0, 1] + camelize(lower_case_and_underscored_word)[1..-1]
  end
end
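
A standalone sketch of what camelize produces for the kinds of names Elasticity passes to it. The method is copied into a hypothetical NameHelpers module so it runs without the gem, and the lowercase branch takes the first character with String#[] because String#first is only available when ActiveSupport is loaded.

```ruby
# Standalone copy of Elasticity's camelize; NameHelpers is a hypothetical module name.
module NameHelpers
  def self.camelize(word, first_letter_in_uppercase = true)
    if first_letter_in_uppercase
      # A "/" plus the next letter becomes "::" plus that letter upcased;
      # then the first letter and any letter following "_" are upcased.
      word.to_s.gsub(/\/(.?)/) { "::" + $1.upcase }.gsub(/(^|_)(.)/) { $2.upcase }
    else
      # Keep the original first character, then append the rest of the CamelCase form.
      word.to_s[0, 1] + camelize(word)[1..-1]
    end
  end
end

puts NameHelpers.camelize(:job_flow_id)             # JobFlowId
puts NameHelpers.camelize("ec2_key_name")           # Ec2KeyName
puts NameHelpers.camelize("hadoop_jar_step", false) # hadoopJarStep
```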

.convert_ruby_to_aws(params) ⇒ Object

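Because Elasticity's params use the same structure as the AWS API, AWS parameter names can be generated from the Ruby versions of those names and their nesting: keys are camelized, array elements become Name.member.N (numbered from 1), and nested hash keys are joined with dots.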



# File 'lib/elasticity/emr.rb', line 242

def convert_ruby_to_aws(params)
  result = {}
  params.each do |key, value|
    case value
      when Array
        prefix = "#{camelize(key.to_s)}.member"
        value.each_with_index do |item, index|
          if item.is_a?(String)
            result["#{prefix}.#{index+1}"] = item
          else
            convert_ruby_to_aws(item).each do |nested_key, nested_value|
              result["#{prefix}.#{index+1}.#{nested_key}"] = nested_value
            end
          end
        end
      when Hash
        prefix = "#{camelize(key.to_s)}"
        convert_ruby_to_aws(value).each do |nested_key, nested_value|
          result["#{prefix}.#{nested_key}"] = nested_value
        end
      else
        result[camelize(key.to_s)] = value
    end
  end
  result
end
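
To see the flattening in action, the sketch below copies camelize and convert_ruby_to_aws into a hypothetical AwsParams module (so it runs on its own) and converts a small add_jobflow_steps-style params hash. Arrays become 1-based Name.member.N keys, and nested hashes are joined with dots.

```ruby
# Standalone copies of Elasticity's helpers; AwsParams is a hypothetical module name.
module AwsParams
  def self.camelize(word)
    word.to_s.gsub(/\/(.?)/) { "::" + $1.upcase }.gsub(/(^|_)(.)/) { $2.upcase }
  end

  # Flattens nested hashes and arrays into AWS query-style parameter names.
  def self.convert_ruby_to_aws(params)
    result = {}
    params.each do |key, value|
      case value
      when Array
        # Array elements become Name.member.1, Name.member.2, ...
        prefix = "#{camelize(key)}.member"
        value.each_with_index do |item, index|
          if item.is_a?(String)
            result["#{prefix}.#{index + 1}"] = item
          else
            convert_ruby_to_aws(item).each do |nested_key, nested_value|
              result["#{prefix}.#{index + 1}.#{nested_key}"] = nested_value
            end
          end
        end
      when Hash
        # Nested hash keys are prefixed with the camelized parent key.
        convert_ruby_to_aws(value).each do |nested_key, nested_value|
          result["#{camelize(key)}.#{nested_key}"] = nested_value
        end
      else
        result[camelize(key)] = value
      end
    end
    result
  end
end

params = {
  :operation => "AddJobFlowSteps",
  :job_flow_id => "j-123",
  :steps => [
    {
      :name => "Setup Pig",
      :hadoop_jar_step => {
        :jar => "s3://elasticmapreduce/libs/script-runner/script-runner.jar",
        :args => ["--install-pig"]
      }
    }
  ]
}

AwsParams.convert_ruby_to_aws(params).each { |key, value| puts "#{key}=#{value}" }
# Operation=AddJobFlowSteps
# JobFlowId=j-123
# Steps.member.1.Name=Setup Pig
# Steps.member.1.HadoopJarStep.Jar=s3://elasticmapreduce/libs/script-runner/script-runner.jar
# Steps.member.1.HadoopJarStep.Args.member.1=--install-pig
```

Because Ruby hashes preserve insertion order, the keys come out in the order the params were written.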

.parse_error_response(error_xml) ⇒ Object

AWS error responses all follow the same form, so this simply extracts the text of the /ErrorResponse/Error/Message element from the error document.



# File 'lib/elasticity/emr.rb', line 234

def parse_error_response(error_xml)
  xml_doc = Nokogiri::XML(error_xml)
  xml_doc.remove_namespaces!
  xml_doc.xpath("/ErrorResponse/Error/Message").text
end
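
The same extraction can be sketched with REXML, which ships with Ruby, instead of Nokogiri. Rather than stripping namespaces, it walks ErrorResponse, Error, Message by local element name. The sample error document (namespace URI, code, and message) is invented for illustration and is not captured AWS output, and parse_error_message is a hypothetical name.

```ruby
require "rexml/document"

# Invented sample error body; the namespace URI, code and message are illustrative only.
ERROR_XML = <<~XML
  <ErrorResponse xmlns="http://example.com/hypothetical-namespace">
    <Error>
      <Type>Sender</Type>
      <Code>ValidationError</Code>
      <Message>Job flow ID j-BOGUS is not valid.</Message>
    </Error>
    <RequestId>00000000-0000-0000-0000-000000000000</RequestId>
  </ErrorResponse>
XML

# Follows ErrorResponse/Error/Message by local element name, so the default
# namespace does not need to be removed first. Returns "" if any step is missing.
def parse_error_message(error_xml)
  node = REXML::Document.new(error_xml).root
  return "" unless node && node.name == "ErrorResponse"
  %w[Error Message].each do |name|
    node = node.elements.find { |child| child.name == name }
    return "" unless node
  end
  node.text.to_s
end

puts parse_error_message(ERROR_XML) # Job flow ID j-BOGUS is not valid.
```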

Instance Method Details

#add_instance_groups(jobflow_id, instance_group_configs) ⇒ Object

Adds new instance groups to the specified jobflow. Elasticity maps a more Ruby-like syntax onto the Amazon options. The hash below lists every option, although not all of them are required (or even valid) at once. See the EMR docs for details, though even then you’re going to need to experiment :)

instance_group_config = {
  :bid_price => 5,
  :instance_count => 1,
  :instance_role => "TASK",
  :market => "SPOT",
  :name => "Go Canucks Go!",
  :instance_type => "m1.small"
}

add_instance_groups takes an array of these hashes and returns an array of the IDs of the instance groups that were created:

["ig-2GOVEN6HVJZID", "ig-1DU9M2UQMM051", "ig-3DZRW4Y2X4S", ...]


# File 'lib/elasticity/emr.rb', line 38

def add_instance_groups(jobflow_id, instance_group_configs)
  params = {
    :operation => "AddInstanceGroups",
    :job_flow_id => jobflow_id,
    :instance_groups => instance_group_configs
  }
  begin
    aws_result = @aws_request.aws_emr_request(EMR.convert_ruby_to_aws(params))
    xml_doc = Nokogiri::XML(aws_result)
    xml_doc.remove_namespaces!
    instance_group_ids = []
    xml_doc.xpath("/AddInstanceGroupsResponse/AddInstanceGroupsResult/InstanceGroupIds/member").each do |member|
      instance_group_ids << member.text
    end
    yield aws_result if block_given?
    instance_group_ids
  rescue RestClient::BadRequest => e
    raise ArgumentError, EMR.parse_error_response(e.http_body)
  end
end
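
The response handling in add_instance_groups can be sketched the same way with REXML in place of Nokogiri: collect the text of every member element under InstanceGroupIds. The response body below is hand-written, with an invented namespace URI and group IDs, and the helper names are hypothetical.

```ruby
require "rexml/document"

# Hand-written sample response; the namespace URI and IDs are hypothetical.
RESPONSE_XML = <<~XML
  <AddInstanceGroupsResponse xmlns="http://example.com/hypothetical-namespace">
    <AddInstanceGroupsResult>
      <JobFlowId>j-123</JobFlowId>
      <InstanceGroupIds>
        <member>ig-AAAA</member>
        <member>ig-BBBB</member>
      </InstanceGroupIds>
    </AddInstanceGroupsResult>
  </AddInstanceGroupsResponse>
XML

# Returns the first child element of node with the given local name, or nil.
def child_named(node, name)
  node && node.elements.find { |child| child.name == name }
end

# Mirrors the XPath .../AddInstanceGroupsResult/InstanceGroupIds/member.
def instance_group_ids(response_xml)
  root = REXML::Document.new(response_xml).root
  ids = child_named(child_named(root, "AddInstanceGroupsResult"), "InstanceGroupIds")
  return [] unless ids
  ids.elements.select { |m| m.name == "member" }.map { |m| m.text.to_s }
end

p instance_group_ids(RESPONSE_XML) # ["ig-AAAA", "ig-BBBB"]
```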

#add_jobflow_steps(jobflow_id, steps_config) ⇒ Object

Add a step (or steps) to the specified job flow.

emr.add_jobflow_steps("j-123", {
  :steps => [
    {
      :action_on_failure => "TERMINATE_JOB_FLOW",
      :hadoop_jar_step => {
        :args => [
          "s3://elasticmapreduce/libs/pig/pig-script",
          "--base-path",
          "s3://elasticmapreduce/libs/pig/",
          "--install-pig"
        ],
        :jar => "s3://elasticmapreduce/libs/script-runner/script-runner.jar"
      },
      :name => "Setup Pig"
    }
  ]
})


# File 'lib/elasticity/emr.rb', line 78

def add_jobflow_steps(jobflow_id, steps_config)
  params = {
    :operation => "AddJobFlowSteps",
    :job_flow_id => jobflow_id
  }.merge!(steps_config)
  begin
    aws_result = @aws_request.aws_emr_request(EMR.convert_ruby_to_aws(params))
    yield aws_result if block_given?
  rescue RestClient::BadRequest => e
    raise ArgumentError, EMR.parse_error_response(e.http_body)
  end
end

#describe_jobflows(params = {}) {|aws_result| ... } ⇒ Object

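Lists all jobflows in all states and returns them as JobFlow objects. Any params given are converted with convert_ruby_to_aws and sent with the DescribeJobFlows request. If a block is given, it receives the raw response XML.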

Yields:

  • (aws_result)


# File 'lib/elasticity/emr.rb', line 10

def describe_jobflows(params = {})
  aws_result = @aws_request.aws_emr_request(EMR.convert_ruby_to_aws(
    params.merge({:operation => "DescribeJobFlows"}))
  )
  xml_doc = Nokogiri::XML(aws_result)
  xml_doc.remove_namespaces!
  yield aws_result if block_given?
  JobFlow.from_members_nodeset(xml_doc.xpath("/DescribeJobFlowsResponse/DescribeJobFlowsResult/JobFlows/member"))
end

#direct(params) ⇒ Object

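Pass the specified params hash directly through to the AWS request URL. Unlike the other methods, the keys are not converted with convert_ruby_to_aws, so they must already use AWS names (e.g. "Operation" => "DescribeJobFlows"). Use this if you want to perform an operation that hasn’t yet been wrapped by Elasticity or you just want to see the response XML for yourself :)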



# File 'lib/elasticity/emr.rb', line 224

def direct(params)
  @aws_request.aws_emr_request(params)
end

#modify_instance_groups(instance_group_config) ⇒ Object

Set the number of instances in the specified instance groups to the specified counts. Note that this modifies the requested count, which is not the same as the running count: you request instances and then wait for them to be created.

Takes a hash mapping instance group IDs to the desired instance counts:

{"ig-1" => 40, "ig-2" => 5, ...}


# File 'lib/elasticity/emr.rb', line 99

def modify_instance_groups(instance_group_config)
  params = {
    :operation => "ModifyInstanceGroups",
    :instance_groups => instance_group_config.map { |k, v| {:instance_group_id => k, :instance_count => v} }
  }
  begin
    aws_result = @aws_request.aws_emr_request(EMR.convert_ruby_to_aws(params))
    yield aws_result if block_given?
  rescue RestClient::BadRequest => e
    raise ArgumentError, EMR.parse_error_response(e.http_body)
  end
end
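
The only reshaping modify_instance_groups does before flattening is turning the {ID => count} hash into the array of hashes that the ModifyInstanceGroups operation takes. A standalone sketch of that step, using a hypothetical instance_group_params method:

```ruby
# Hypothetical method mirroring the params that modify_instance_groups builds.
def instance_group_params(instance_group_config)
  {
    :operation => "ModifyInstanceGroups",
    # Each "ig-..." => count pair becomes its own hash, one per instance group.
    :instance_groups => instance_group_config.map { |id, count|
      { :instance_group_id => id, :instance_count => count }
    }
  }
end

params = instance_group_params({ "ig-1" => 40, "ig-2" => 5 })
params[:instance_groups].each do |group|
  puts "#{group[:instance_group_id]} -> #{group[:instance_count]}"
end
# ig-1 -> 40
# ig-2 -> 5
```

With Elasticity, these params then pass through convert_ruby_to_aws, which turns each entry into InstanceGroups.member.N.InstanceGroupId and InstanceGroups.member.N.InstanceCount keys.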

#run_job_flow(job_flow_config) ⇒ Object

Start a job flow with the specified configuration and return the new job flow’s ID. This is a very thin wrapper around the AWS API, so to use it you’ll need the PDF API reference handy, which can be found here:

awsdocs.s3.amazonaws.com/ElasticMapReduce/20090331/emr-api-20090331.pdf

Here is a sample job flow configuration that should help. This job flow starts by installing Pig, then runs a Pig script. It is based on the Pig demo script from Amazon.

emr.run_job_flow({
  :name => "Elasticity Test Flow (EMR Pig Script)",
  :instances => {
    :ec2_key_name => "sharethrough-dev",
    :hadoop_version => "0.20",
    :instance_count => 2,
    :master_instance_type => "m1.small",
    :placement => {
      :availability_zone => "us-east-1a"
    },
    :slave_instance_type => "m1.small",
  },
  :steps => [
    {
      :action_on_failure => "TERMINATE_JOB_FLOW",
      :hadoop_jar_step => {
        :args => [
          "s3://elasticmapreduce/libs/pig/pig-script",
          "--base-path",
          "s3://elasticmapreduce/libs/pig/",
          "--install-pig"
        ],
        :jar => "s3://elasticmapreduce/libs/script-runner/script-runner.jar"
      },
      :name => "Setup Pig"
    },
    {
      :action_on_failure => "TERMINATE_JOB_FLOW",
      :hadoop_jar_step => {
        :args => [
          "s3://elasticmapreduce/libs/pig/pig-script",
          "--run-pig-script",
          "--args",
          "-p",
          "INPUT=s3n://elasticmapreduce/samples/pig-apache/input",
          "-p",
          "OUTPUT=s3n://slif-elasticity/pig-apache/output/2011-04-19",
          "s3n://elasticmapreduce/samples/pig-apache/do-reports.pig"
        ],
        :jar => "s3://elasticmapreduce/libs/script-runner/script-runner.jar"
      },
      :name => "Run Pig Script"
    }
  ]
})


# File 'lib/elasticity/emr.rb', line 167

def run_job_flow(job_flow_config)
  params = {
    :operation => "RunJobFlow",
  }.merge!(job_flow_config)
  begin
    aws_result = @aws_request.aws_emr_request(EMR.convert_ruby_to_aws(params))
    yield aws_result if block_given?
    xml_doc = Nokogiri::XML(aws_result)
    xml_doc.remove_namespaces!
    xml_doc.xpath("/RunJobFlowResponse/RunJobFlowResult/JobFlowId").text
  rescue RestClient::BadRequest => e
    raise ArgumentError, EMR.parse_error_response(e.http_body)
  end
end
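
Both steps in the sample configuration share the same failure action, jar, and first argument, so a small helper can cut the repetition. This is a sketch only: pig_step and the two constants are hypothetical names, and the paths are taken from the sample above.

```ruby
# Paths taken from the sample configuration; the constant names are hypothetical.
SCRIPT_RUNNER = "s3://elasticmapreduce/libs/script-runner/script-runner.jar"
PIG_SCRIPT = "s3://elasticmapreduce/libs/pig/pig-script"

# Hypothetical helper: builds one :steps entry that runs pig-script via script-runner,
# terminating the job flow if the step fails.
def pig_step(name, *args)
  {
    :action_on_failure => "TERMINATE_JOB_FLOW",
    :hadoop_jar_step => {
      :args => [PIG_SCRIPT, *args],
      :jar => SCRIPT_RUNNER
    },
    :name => name
  }
end

steps = [
  pig_step("Setup Pig", "--base-path", "s3://elasticmapreduce/libs/pig/", "--install-pig"),
  pig_step("Run Pig Script",
           "--run-pig-script", "--args",
           "-p", "INPUT=s3n://elasticmapreduce/samples/pig-apache/input",
           "-p", "OUTPUT=s3n://slif-elasticity/pig-apache/output/2011-04-19",
           "s3n://elasticmapreduce/samples/pig-apache/do-reports.pig")
]

steps.each { |step| puts "#{step[:name]}: #{step[:hadoop_jar_step][:args].length} args" }
# Setup Pig: 4 args
# Run Pig Script: 8 args
```

The resulting array has the same shape as the :steps value in the sample configuration, so it can be passed in its place.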

#set_termination_protection(jobflow_ids, protection_enabled = true) ⇒ Object

Enable or disable “termination protection” on the specified job flows. Termination protection prevents a job flow from being terminated by a user-initiated action, although the job flow will still terminate naturally.

Takes an array of job flow IDs:

["j-1B4D1XP0C0A35", "j-1YG2MYL0HVYS5", ...]


# File 'lib/elasticity/emr.rb', line 190

def set_termination_protection(jobflow_ids, protection_enabled=true)
  params = {
    :operation => "SetTerminationProtection",
    :termination_protected => protection_enabled,
    :job_flow_ids => jobflow_ids
  }
  begin
    aws_result = @aws_request.aws_emr_request(EMR.convert_ruby_to_aws(params))
    yield aws_result if block_given?
  rescue RestClient::BadRequest => e
    raise ArgumentError, EMR.parse_error_response(e.http_body)
  end
end

#terminate_jobflows(jobflow_id) ⇒ Object

Terminate the specified jobflow. Amazon does not define a return value for this operation, so you’ll need to poll #describe_jobflows to see the state of the jobflow. Raises ArgumentError if the specified job flow does not exist (any bad-request response from AWS is reported this way).



# File 'lib/elasticity/emr.rb', line 208

def terminate_jobflows(jobflow_id)
  params = {
    :operation => "TerminateJobFlows",
    :job_flow_ids => [jobflow_id]
  }
  begin
    aws_result = @aws_request.aws_emr_request(EMR.convert_ruby_to_aws(params))
    yield aws_result if block_given?
  rescue RestClient::BadRequest
    raise ArgumentError, "Job flow '#{jobflow_id}' does not exist."
  end
end
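
Since termination returns nothing useful, callers have to poll. A minimal polling loop is sketched below under a few assumptions: fetch_state is any caller-supplied callable that returns the job flow’s current state string (with Elasticity it would wrap #describe_jobflows, but the JobFlow attribute holding the state isn’t documented on this page), and COMPLETED, FAILED and TERMINATED are treated as terminal states. The sleeper parameter exists only so the example can run without waiting.

```ruby
# Assumed terminal job flow states; adjust to the states your EMR API version reports.
TERMINAL_STATES = %w[COMPLETED FAILED TERMINATED].freeze

# Polls fetch_state (a caller-supplied callable returning a state string) until it
# reports a terminal state or max_attempts is exhausted. Returns the last state seen.
def wait_for_termination(fetch_state, interval: 30, max_attempts: 60, sleeper: method(:sleep))
  state = nil
  max_attempts.times do |attempt|
    state = fetch_state.call
    return state if TERMINAL_STATES.include?(state)
    # Skip the pause after the final attempt.
    sleeper.call(interval) unless attempt == max_attempts - 1
  end
  state
end

# Stubbed state source standing in for a describe_jobflows lookup.
states = %w[RUNNING SHUTTING_DOWN TERMINATED].each
final = wait_for_termination(-> { states.next }, interval: 0, sleeper: ->(_) {})
puts final # TERMINATED
```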