Class: Elasticity::EMR
- Inherits: Object
- Defined in: lib/elasticity/emr.rb
Class Method Summary
- .camelize(lower_case_and_underscored_word, first_letter_in_uppercase = true) ⇒ Object
  (Used from Rails’ ActiveSupport).
- .convert_ruby_to_aws(params) ⇒ Object
  Since we use the same structure as AWS, we can generate AWS param names from the Ruby versions of those names (and the param nesting).
- .parse_error_response(error_xml) ⇒ Object
  AWS error responses all follow the same form.
Instance Method Summary
- #add_instance_groups(jobflow_id, instance_group_configs) ⇒ Object
  Adds a new group of instances to the specified jobflow.
- #add_jobflow_steps(jobflow_id, steps_config) ⇒ Object
  Add a step (or steps) to the specified job flow.
- #describe_jobflows(params = {}) {|aws_result| ... } ⇒ Object
  Lists all jobflows in all states.
- #direct(params) ⇒ Object
  Pass the specified params hash directly through to the AWS request URL.
- #initialize(aws_access_key_id, aws_secret_access_key, options = {}) ⇒ EMR (constructor)
  A new instance of EMR.
- #modify_instance_groups(instance_group_config) ⇒ Object
  Set the number of instances in the specified instance groups to the specified counts.
- #run_job_flow(job_flow_config) ⇒ Object
  Start a job flow with the specified configuration.
- #set_termination_protection(jobflow_ids, protection_enabled = true) ⇒ Object
  Enable or disable “termination protection” on the specified job flows.
- #terminate_jobflows(jobflow_id) ⇒ Object
  Terminate the specified jobflow.
Constructor Details
#initialize(aws_access_key_id, aws_secret_access_key, options = {}) ⇒ EMR
Returns a new instance of EMR.
# File 'lib/elasticity/emr.rb', line 5
def initialize(aws_access_key_id, aws_secret_access_key, options = {})
  @aws_request = Elasticity::AwsRequest.new(aws_access_key_id, aws_secret_access_key, options)
end
Class Method Details
.camelize(lower_case_and_underscored_word, first_letter_in_uppercase = true) ⇒ Object
(Used from Rails’ ActiveSupport)
# File 'lib/elasticity/emr.rb', line 270
def camelize(lower_case_and_underscored_word, first_letter_in_uppercase = true)
  if first_letter_in_uppercase
    lower_case_and_underscored_word.to_s.gsub(/\/(.?)/) { "::" + $1.upcase }.gsub(/(^|_)(.)/) { $2.upcase }
  else
    lower_case_and_underscored_word.first + camelize(lower_case_and_underscored_word)[1..-1]
  end
end
.convert_ruby_to_aws(params) ⇒ Object
Since we use the same structure as AWS, we can generate AWS param names from the Ruby versions of those names (and the param nesting).
# File 'lib/elasticity/emr.rb', line 242
def convert_ruby_to_aws(params)
  result = {}
  params.each do |key, value|
    case value
    when Array
      prefix = "#{camelize(key.to_s)}.member"
      value.each_with_index do |item, index|
        if item.is_a?(String)
          result["#{prefix}.#{index+1}"] = item
        else
          convert_ruby_to_aws(item).each do |nested_key, nested_value|
            result["#{prefix}.#{index+1}.#{nested_key}"] = nested_value
          end
        end
      end
    when Hash
      prefix = "#{camelize(key.to_s)}"
      convert_ruby_to_aws(value).each do |nested_key, nested_value|
        result["#{prefix}.#{nested_key}"] = nested_value
      end
    else
      result[camelize(key.to_s)] = value
    end
  end
  result
end
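To make the flattening concrete, here is a minimal standalone sketch of the same conversion. The `ParamSketch` module name is hypothetical and not part of Elasticity. Its `camelize` is a simplified copy covering only the default upper-case-first branch, and the job flow ID and step values are illustrative.

```ruby
# Standalone sketch of the Ruby-to-AWS parameter flattening.
# ParamSketch is a hypothetical name; it is not part of Elasticity.
module ParamSketch
  module_function

  # Simplified camelize: only the default (upper-case first letter) branch.
  def camelize(word)
    word.to_s.gsub(/\/(.?)/) { "::" + $1.upcase }.gsub(/(^|_)(.)/) { $2.upcase }
  end

  # Flattens nested hashes and arrays into dotted AWS-style keys.
  # Array entries are numbered from 1 under a ".member" segment.
  def convert_ruby_to_aws(params)
    result = {}
    params.each do |key, value|
      name = camelize(key)
      case value
      when Array
        value.each_with_index do |item, index|
          member = "#{name}.member.#{index + 1}"
          if item.is_a?(String)
            result[member] = item
          else
            convert_ruby_to_aws(item).each { |k, v| result["#{member}.#{k}"] = v }
          end
        end
      when Hash
        convert_ruby_to_aws(value).each { |k, v| result["#{name}.#{k}"] = v }
      else
        result[name] = value
      end
    end
    result
  end
end

flat = ParamSketch.convert_ruby_to_aws({
  :operation => "AddJobFlowSteps",
  :job_flow_id => "j-123",
  :steps => [
    {
      :name => "Setup Pig",
      :hadoop_jar_step => {
        :jar => "s3://elasticmapreduce/libs/script-runner/script-runner.jar",
        :args => ["--install-pig"]
      }
    }
  ]
})

# Prints one "key = value" line per flattened parameter.
flat.each { |key, value| puts "#{key} = #{value}" }
```

The nested step ends up under keys such as `Steps.member.1.Name` and `Steps.member.1.HadoopJarStep.Args.member.1`, which is why the Ruby hashes in this documentation mirror the nesting of the AWS API.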
.parse_error_response(error_xml) ⇒ Object
AWS error responses all follow the same form. Extract the message from the error document.
# File 'lib/elasticity/emr.rb', line 234
def parse_error_response(error_xml)
  xml_doc = Nokogiri::XML(error_xml)
  xml_doc.remove_namespaces!
  xml_doc.xpath("/ErrorResponse/Error/Message").text
end
Instance Method Details
#add_instance_groups(jobflow_id, instance_group_configs) ⇒ Object
Adds a new group of instances to the specified jobflow. Elasticity maps a more Ruby-like syntax to the Amazon options. An exhaustive hash follows although not all of these options are required (or valid!) at once. Please see the EMR docs for details although even then you’re going to need to experiment :)
instance_group_config = {
  :bid_price => 5,
  :instance_count => 1,
  :instance_role => "TASK",
  :market => "SPOT",
  :name => "Go Canucks Go!",
  :type => "m1.small"
}
add_instance_groups takes an array of these hashes. Returns an array of the instance group IDs that were created by the specified configs.
["ig-2GOVEN6HVJZID", "ig-1DU9M2UQMM051", "ig-3DZRW4Y2X4S", ...]
# File 'lib/elasticity/emr.rb', line 38
def add_instance_groups(jobflow_id, instance_group_configs)
  params = {
    :operation => "AddInstanceGroups",
    :job_flow_id => jobflow_id,
    :instance_groups => instance_group_configs
  }
  begin
    aws_result = @aws_request.aws_emr_request(EMR.convert_ruby_to_aws(params))
    xml_doc = Nokogiri::XML(aws_result)
    xml_doc.remove_namespaces!
    instance_group_ids = []
    xml_doc.xpath("/AddInstanceGroupsResponse/AddInstanceGroupsResult/InstanceGroupIds/member").each do |member|
      instance_group_ids << member.text
    end
    yield aws_result if block_given?
    instance_group_ids
  rescue RestClient::BadRequest => e
    raise ArgumentError, EMR.parse_error_response(e.http_body)
  end
end
#add_jobflow_steps(jobflow_id, steps_config) ⇒ Object
Add a step (or steps) to the specified job flow.
emr.add_jobflow_steps("j-123", {
  :steps => [
    {
      :action_on_failure => "TERMINATE_JOB_FLOW",
      :hadoop_jar_step => {
        :args => [
          "s3://elasticmapreduce/libs/pig/pig-script",
          "--base-path",
          "s3://elasticmapreduce/libs/pig/",
          "--install-pig"
        ],
        :jar => "s3://elasticmapreduce/libs/script-runner/script-runner.jar"
      },
      :name => "Setup Pig"
    }
  ]
})
# File 'lib/elasticity/emr.rb', line 78
def add_jobflow_steps(jobflow_id, steps_config)
  params = {
    :operation => "AddJobFlowSteps",
    :job_flow_id => jobflow_id
  }.merge!(steps_config)
  begin
    aws_result = @aws_request.aws_emr_request(EMR.convert_ruby_to_aws(params))
    yield aws_result if block_given?
  rescue RestClient::BadRequest => e
    raise ArgumentError, EMR.parse_error_response(e.http_body)
  end
end
#describe_jobflows(params = {}) {|aws_result| ... } ⇒ Object
Lists all jobflows in all states.
# File 'lib/elasticity/emr.rb', line 10
def describe_jobflows(params = {})
  aws_result = @aws_request.aws_emr_request(EMR.convert_ruby_to_aws(
    params.merge({:operation => "DescribeJobFlows"}))
  )
  xml_doc = Nokogiri::XML(aws_result)
  xml_doc.remove_namespaces!
  yield aws_result if block_given?
  JobFlow.from_members_nodeset(xml_doc.xpath("/DescribeJobFlowsResponse/DescribeJobFlowsResult/JobFlows/member"))
end
#direct(params) ⇒ Object
Pass the specified params hash directly through to the AWS request URL. Use this if you want to perform an operation that hasn’t yet been wrapped by Elasticity or you just want to see the response XML for yourself :)
# File 'lib/elasticity/emr.rb', line 224
def direct(params)
  @aws_request.aws_emr_request(params)
end
#modify_instance_groups(instance_group_config) ⇒ Object
Set the number of instances in the specified instance groups to the specified counts. Note that this modifies the requested count, which is not the same as the running count. That is, you request instances and then wait for them to be created.
Takes a hash of instance group IDs => desired instance counts.
{"ig-1" => 40, "ig-2" => 5, ...}
# File 'lib/elasticity/emr.rb', line 99
def modify_instance_groups(instance_group_config)
  params = {
    :operation => "ModifyInstanceGroups",
    :instance_groups => instance_group_config.map { |k, v| {:instance_group_id => k, :instance_count => v} }
  }
  begin
    aws_result = @aws_request.aws_emr_request(EMR.convert_ruby_to_aws(params))
    yield aws_result if block_given?
  rescue RestClient::BadRequest => e
    raise ArgumentError, EMR.parse_error_response(e.http_body)
  end
end
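The reshaping step in this method can be tried on its own. The following is a small sketch of how the "ID => count" hash becomes the list of per-group hashes that is sent on; the instance group IDs and counts are made-up sample values, and no AWS request is made.

```ruby
# Sketch of the reshaping step inside modify_instance_groups.
# The instance group IDs and counts below are made-up sample values.
requested_counts = { "ig-1" => 40, "ig-2" => 5 }

# Each "ID => count" pair becomes one hash per instance group.
instance_groups = requested_counts.map do |id, count|
  { :instance_group_id => id, :instance_count => count }
end

# This mirrors the params hash the method builds before conversion.
params = {
  :operation => "ModifyInstanceGroups",
  :instance_groups => instance_groups
}

params[:instance_groups].each { |group| puts group.inspect }
```

Once passed through convert_ruby_to_aws, the first entry would appear under keys such as `InstanceGroups.member.1.InstanceGroupId` and `InstanceGroups.member.1.InstanceCount`.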
#run_job_flow(job_flow_config) ⇒ Object
Start a job flow with the specified configuration. This is a very thin wrapper around the AWS API, so in order to use it directly you’ll need to have the PDF API reference handy, which can be found here:
awsdocs.s3.amazonaws.com/ElasticMapReduce/20090331/emr-api-20090331.pdf
Here is a sample job flow configuration that should help. This job flow starts by installing Pig and then runs a Pig script. It is based on the Pig demo script from Amazon.
emr.run_job_flow({
  :name => "Elasticity Test Flow (EMR Pig Script)",
  :instances => {
    :ec2_key_name => "sharethrough-dev",
    :hadoop_version => "0.20",
    :instance_count => 2,
    :master_instance_type => "m1.small",
    :placement => {
      :availability_zone => "us-east-1a"
    },
    :slave_instance_type => "m1.small",
  },
  :steps => [
    {
      :action_on_failure => "TERMINATE_JOB_FLOW",
      :hadoop_jar_step => {
        :args => [
          "s3://elasticmapreduce/libs/pig/pig-script",
          "--base-path",
          "s3://elasticmapreduce/libs/pig/",
          "--install-pig"
        ],
        :jar => "s3://elasticmapreduce/libs/script-runner/script-runner.jar"
      },
      :name => "Setup Pig"
    },
    {
      :action_on_failure => "TERMINATE_JOB_FLOW",
      :hadoop_jar_step => {
        :args => [
          "s3://elasticmapreduce/libs/pig/pig-script",
          "--run-pig-script",
          "--args",
          "-p",
          "INPUT=s3n://elasticmapreduce/samples/pig-apache/input",
          "-p",
          "OUTPUT=s3n://slif-elasticity/pig-apache/output/2011-04-19",
          "s3n://elasticmapreduce/samples/pig-apache/do-reports.pig"
        ],
        :jar => "s3://elasticmapreduce/libs/script-runner/script-runner.jar"
      },
      :name => "Run Pig Script"
    }
  ]
})
# File 'lib/elasticity/emr.rb', line 167
def run_job_flow(job_flow_config)
  params = {
    :operation => "RunJobFlow",
  }.merge!(job_flow_config)
  begin
    aws_result = @aws_request.aws_emr_request(EMR.convert_ruby_to_aws(params))
    yield aws_result if block_given?
    xml_doc = Nokogiri::XML(aws_result)
    xml_doc.remove_namespaces!
    xml_doc.xpath("/RunJobFlowResponse/RunJobFlowResult/JobFlowId").text
  rescue RestClient::BadRequest => e
    raise ArgumentError, EMR.parse_error_response(e.http_body)
  end
end
#set_termination_protection(jobflow_ids, protection_enabled = true) ⇒ Object
Enable or disable “termination protection” on the specified job flows. Termination protection prevents a job flow from being terminated by a user-initiated action, although the job flow will still terminate naturally.
Takes an array of job flow IDs.
["j-1B4D1XP0C0A35", "j-1YG2MYL0HVYS5", ...]
# File 'lib/elasticity/emr.rb', line 190
def set_termination_protection(jobflow_ids, protection_enabled=true)
  params = {
    :operation => "SetTerminationProtection",
    :termination_protected => protection_enabled,
    :job_flow_ids => jobflow_ids
  }
  begin
    aws_result = @aws_request.aws_emr_request(EMR.convert_ruby_to_aws(params))
    yield aws_result if block_given?
  rescue RestClient::BadRequest => e
    raise ArgumentError, EMR.parse_error_response(e.http_body)
  end
end
#terminate_jobflows(jobflow_id) ⇒ Object
Terminate the specified jobflow. Amazon does not define a return value for this operation, so you’ll need to poll #describe_jobflows to see the state of the jobflow. Raises ArgumentError if the specified job flow does not exist.
# File 'lib/elasticity/emr.rb', line 208
def terminate_jobflows(jobflow_id)
  params = {
    :operation => "TerminateJobFlows",
    :job_flow_ids => [jobflow_id]
  }
  begin
    aws_result = @aws_request.aws_emr_request(EMR.convert_ruby_to_aws(params))
    yield aws_result if block_given?
  rescue RestClient::BadRequest
    raise ArgumentError, "Job flow '#{jobflow_id}' does not exist."
  end
end
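Since termination has to be confirmed by polling #describe_jobflows, a polling loop might look roughly like the sketch below. `FakeJobFlow` and `FakeEmr` are hypothetical stand-ins so the example runs without AWS. The `jobflow_id` and `state` accessors on the returned job flows, the helper name `wait_until_finished`, and the list of finished states are assumptions rather than anything this page confirms.

```ruby
# Sketch of polling for termination. FakeJobFlow and FakeEmr are hypothetical
# stand-ins so the example runs without AWS; the jobflow_id and state accessors
# are assumed, not confirmed by this page.
FakeJobFlow = Struct.new(:jobflow_id, :state)

class FakeEmr
  def initialize(states)
    @states = states
  end

  # Reports the next scripted state on each call, repeating the last one.
  def describe_jobflows(params = {})
    state = @states.size > 1 ? @states.shift : @states.first
    [FakeJobFlow.new("j-123", state)]
  end
end

# States assumed to be final, i.e. the job flow will not change again.
FINISHED_STATES = %w[COMPLETED FAILED TERMINATED].freeze

# Polls describe_jobflows until the job flow reaches a finished state or
# max_attempts is exhausted. Returns the last state seen (nil if never found).
def wait_until_finished(emr, jobflow_id, max_attempts: 10, interval: 0)
  state = nil
  max_attempts.times do
    jobflow = emr.describe_jobflows.find { |jf| jf.jobflow_id == jobflow_id }
    state = jobflow && jobflow.state
    break if FINISHED_STATES.include?(state)
    sleep(interval)
  end
  state
end

emr = FakeEmr.new(%w[RUNNING SHUTTING_DOWN TERMINATED])
final_state = wait_until_finished(emr, "j-123")
puts final_state
```

Against a real client, the interval would normally be several seconds rather than 0, and the attempt cap keeps the loop from running forever if the job flow never reaches a finished state.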