Class: Elasticity::EMR
- Inherits: Object
- Defined in: lib/elasticity/emr.rb
Instance Attribute Summary
- #aws_request ⇒ Object (readonly)
  Returns the value of attribute aws_request.
Instance Method Summary
- #==(other) ⇒ Object
- #add_instance_groups(jobflow_id, instance_group_configs) {|aws_result| ... } ⇒ Object
  Adds a new group of instances to the specified jobflow.
- #add_jobflow_steps(jobflow_id, steps_config) {|aws_result| ... } ⇒ Object
  Add a step (or steps) to the specified job flow.
- #add_tags(jobflow_id, tags) {|aws_result| ... } ⇒ Object
  Sets the specified tags on all instances in the specified jobflow.
- #describe_cluster(jobflow_id) {|aws_result| ... } ⇒ Object
  Provides details about the specified jobflow.
- #describe_step(jobflow_id, step_id) {|aws_result| ... } ⇒ Object
  Provides details about the specified step within an existing jobflow.
- #direct(params) ⇒ Object
  Pass the specified params hash directly through to the AWS request URL.
- #initialize(options = {}) ⇒ EMR (constructor)
  A new instance of EMR.
- #list_bootstrap_actions(jobflow_id) {|aws_result| ... } ⇒ Object
  List the bootstrap actions in the specified jobflow.
- #list_clusters(options = {}) {|aws_result| ... } ⇒ Object
  List the clusters, given the specified filtering.
- #list_instance_groups(jobflow_id) {|aws_result| ... } ⇒ Object
  List the instance groups in the specified jobflow.
- #list_instances(jobflow_id, options = {}) {|aws_result| ... } ⇒ Object
  List the instances in a cluster, given the specified filtering.
- #list_steps(jobflow_id, options = {}) {|aws_result| ... } ⇒ Object
  List the steps in a job flow, given the specified filtering.
- #modify_instance_groups(instance_group_config) {|aws_result| ... } ⇒ Object
  Set the number of instances in the specified instance groups to the specified counts.
- #remove_tags(jobflow_id, keys) {|aws_result| ... } ⇒ Object
  Remove the specified tags from all instances in the specified jobflow.
- #run_job_flow(job_flow_config) {|aws_result| ... } ⇒ Object
  Start a job flow with the specified configuration.
- #set_termination_protection(jobflow_ids, protection_enabled = true) {|aws_result| ... } ⇒ Object
  Enable or disable “termination protection” on the specified job flows.
- #set_visible_to_all_users(jobflow_ids, visible = true) {|aws_result| ... } ⇒ Object
  Set whether all IAM users in this account can access the specified job flows.
- #terminate_jobflows(jobflow_ids) {|aws_result| ... } ⇒ Object
  Terminate the specified jobflows.
Constructor Details
#initialize(options = {}) ⇒ EMR
Returns a new instance of EMR.
# File 'lib/elasticity/emr.rb', line 7
def initialize(options = {})
  @aws_request = Elasticity::AwsSession.new(options)
end
Instance Attribute Details
#aws_request ⇒ Object (readonly)
Returns the value of attribute aws_request.
# File 'lib/elasticity/emr.rb', line 5
def aws_request
  @aws_request
end
Instance Method Details
#==(other) ⇒ Object
# File 'lib/elasticity/emr.rb', line 364
def ==(other)
  return false unless other.is_a? EMR
  return false unless @aws_request == other.aws_request
  true
end
#add_instance_groups(jobflow_id, instance_group_configs) {|aws_result| ... } ⇒ Object
Adds a new group of instances to the specified jobflow. Elasticity maps a more Ruby-like syntax onto the Amazon options. An exhaustive hash follows, although not all of these options are required (or valid!) at once. See the EMR docs for details, although even then you’re going to need to experiment :)
instance_group_config = {
:bid_price => 5,
:market => "SPOT",
:name => "Go Canucks Go!",
:instance_count => 1,
:instance_role => "TASK",
:instance_type => "m1.small"
}
add_instance_groups takes an array of these hashes and returns an array of the instance group IDs created by the specified configs:
["ig-2GOVEN6HVJZID", "ig-1DU9M2UQMM051", "ig-3DZRW4Y2X4S", ...]
# File 'lib/elasticity/emr.rb', line 29
def add_instance_groups(jobflow_id, instance_group_configs)
  params = {
    :operation => 'AddInstanceGroups',
    :job_flow_id => jobflow_id,
    :instance_groups => instance_group_configs
  }
  aws_result = @aws_request.submit(params)
  yield aws_result if block_given?
  JSON.parse(aws_result)['InstanceGroupIds']
end
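Since each config is just a hash, a small builder can keep them consistent. The sketch below uses a hypothetical `instance_group_config` helper (not part of Elasticity) that fills in the keys from the example above, choosing `"SPOT"` when a bid price is given and `"ON_DEMAND"` otherwise (the `ON_DEMAND` market value is an assumption taken from the EMR API). The actual `add_instance_groups` call is left as a comment because it needs a configured client and AWS credentials.

```ruby
# Hypothetical helper, not part of Elasticity: builds one instance group
# config using the keys shown in the example above.
def instance_group_config(name:, role:, type:, count:, bid_price: nil)
  config = {
    :name => name,
    :instance_role => role,
    :instance_type => type,
    :instance_count => count,
    # A bid price implies a spot group; without one, request on-demand instances.
    :market => bid_price ? "SPOT" : "ON_DEMAND"
  }
  config[:bid_price] = bid_price if bid_price
  config
end

instance_group_configs = [
  instance_group_config(name: "Go Canucks Go!", role: "TASK", type: "m1.small", count: 1, bid_price: 5),
  instance_group_config(name: "On-demand tasks", role: "TASK", type: "m1.small", count: 2)
]

# With a configured client (requires AWS credentials):
#   ig_ids = emr.add_instance_groups("j-123", instance_group_configs)
#   # ig_ids is an array of the new instance group IDs
```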
#add_jobflow_steps(jobflow_id, steps_config) {|aws_result| ... } ⇒ Object
Add a step (or steps) to the specified job flow.
emr.add_jobflow_steps("j-123", [
{
:action_on_failure => "TERMINATE_JOB_FLOW",
:hadoop_jar_step => {
:args => [
"s3://elasticmapreduce/libs/pig/pig-script",
"--base-path",
"s3://elasticmapreduce/libs/pig/",
"--install-pig"
],
:jar => "s3://elasticmapreduce/libs/script-runner/script-runner.jar"
},
:name => "Setup Pig"
}
])
# File 'lib/elasticity/emr.rb', line 57
def add_jobflow_steps(jobflow_id, steps_config)
  params = {
    :operation => 'AddJobFlowSteps',
    :job_flow_id => jobflow_id,
    :steps => steps_config
  }
  aws_result = @aws_request.submit(params)
  yield aws_result if block_given?
  JSON.parse(aws_result)
end
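The script-runner steps in these examples share the same jar and failure action, so a small builder can cut the repetition. `script_runner_step` below is a hypothetical helper, not an Elasticity method; it returns a hash in the same shape as the "Setup Pig" step above, which you would then pass to `add_jobflow_steps`.

```ruby
# Jar used by the script-runner steps in the examples above.
SCRIPT_RUNNER_JAR = "s3://elasticmapreduce/libs/script-runner/script-runner.jar"

# Hypothetical helper: builds a step hash in the shape add_jobflow_steps expects.
def script_runner_step(name, args, action_on_failure: "TERMINATE_JOB_FLOW")
  {
    :action_on_failure => action_on_failure,
    :hadoop_jar_step => {
      :args => args,
      :jar => SCRIPT_RUNNER_JAR
    },
    :name => name
  }
end

setup_pig = script_runner_step("Setup Pig", [
  "s3://elasticmapreduce/libs/pig/pig-script",
  "--base-path",
  "s3://elasticmapreduce/libs/pig/",
  "--install-pig"
])

# With a configured client:
#   emr.add_jobflow_steps("j-123", [setup_pig])
```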
#add_tags(jobflow_id, tags) {|aws_result| ... } ⇒ Object
Sets the specified tags on all instances in the specified jobflow:
emr.add_tags('j-123', [{:key => 'key1', :value => 'value1'}, {:key => 'key_only2'}])
See docs.aws.amazon.com/ElasticMapReduce/latest/API/API_AddTags.html
# File 'lib/elasticity/emr.rb', line 73
def add_tags(jobflow_id, tags)
  params = {
    :operation => 'AddTags',
    :resource_id => jobflow_id,
    :tags => tags
  }
  aws_result = @aws_request.submit(params)
  yield aws_result if block_given?
end
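The tag list format above (an array of `:key`/`:value` hashes, with `:value` optional) is easy to derive from an ordinary Ruby hash. This is a minimal sketch with a hypothetical `to_tag_list` helper, assuming a `nil` value should produce a key-only tag like `'key_only2'` above.

```ruby
# Hypothetical helper: converts {"key" => "value"} pairs into the array of
# tag hashes passed to add_tags. A nil value yields a key-only tag.
def to_tag_list(tags)
  tags.map do |key, value|
    value.nil? ? { :key => key } : { :key => key, :value => value }
  end
end

tag_list = to_tag_list('key1' => 'value1', 'key_only2' => nil)
# tag_list == [{:key => 'key1', :value => 'value1'}, {:key => 'key_only2'}]

# With a configured client:
#   emr.add_tags('j-123', tag_list)
```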
#describe_cluster(jobflow_id) {|aws_result| ... } ⇒ Object
Provides details about the specified jobflow:
emr.describe_cluster('j-123')
docs.aws.amazon.com/ElasticMapReduce/latest/API/API_DescribeCluster.html
# File 'lib/elasticity/emr.rb', line 88
def describe_cluster(jobflow_id)
  params = {
    :operation => 'DescribeCluster',
    :cluster_id => jobflow_id,
  }
  aws_result = @aws_request.submit(params)
  yield aws_result if block_given?
  JSON.parse(aws_result)
end
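`describe_cluster` returns the parsed JSON, so reading the cluster state is a nested lookup. The sketch assumes the response shape documented for the DescribeCluster API (`Cluster`, then `Status`, then `State`); the sample body is made up and trimmed to those fields, and `cluster_state` is a hypothetical helper. Passing a block instead gives you the raw JSON string before it is parsed.

```ruby
require 'json'

# Made-up DescribeCluster body, trimmed to the fields used below.
sample_body = '{"Cluster":{"Id":"j-123","Name":"demo","Status":{"State":"WAITING"}}}'

# Hypothetical helper: pulls the state out of a parsed DescribeCluster response.
# Returns nil when a level is missing.
def cluster_state(description)
  description.dig('Cluster', 'Status', 'State')
end

state = cluster_state(JSON.parse(sample_body))

# With a configured client:
#   state = cluster_state(emr.describe_cluster('j-123'))
#   emr.describe_cluster('j-123') { |raw_json| puts raw_json }
```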
#describe_step(jobflow_id, step_id) {|aws_result| ... } ⇒ Object
Provides details about the specified step within an existing jobflow:
emr.describe_step('j-123', 'step-456')
docs.aws.amazon.com/ElasticMapReduce/latest/API/API_DescribeStep.html
# File 'lib/elasticity/emr.rb', line 103
def describe_step(jobflow_id, step_id)
  params = {
    :operation => 'DescribeStep',
    :cluster_id => jobflow_id,
    :step_id => step_id
  }
  aws_result = @aws_request.submit(params)
  yield aws_result if block_given?
  JSON.parse(aws_result)
end
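A common use of `describe_step` is checking whether a step has finished. This sketch assumes the DescribeStep response nests the state under `Step`, then `Status`, then `State`, and treats `COMPLETED`, `CANCELLED`, `FAILED` and `INTERRUPTED` as terminal, per the EMR API's step states. `step_finished?` is a hypothetical helper and the sample hashes are made up.

```ruby
# Step states after which a step will not change again (per the EMR API).
TERMINAL_STEP_STATES = %w[COMPLETED CANCELLED FAILED INTERRUPTED].freeze

# Hypothetical helper: true once a parsed DescribeStep response reports a terminal state.
def step_finished?(description)
  TERMINAL_STEP_STATES.include?(description.dig('Step', 'Status', 'State'))
end

running = { 'Step' => { 'Id' => 'step-456', 'Status' => { 'State' => 'RUNNING' } } }
failed  = { 'Step' => { 'Id' => 'step-456', 'Status' => { 'State' => 'FAILED' } } }

# With a configured client:
#   step_finished?(emr.describe_step('j-123', 'step-456'))
```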
#direct(params) ⇒ Object
Pass the specified params hash directly through to the AWS request URL. Use this if you want to perform an operation that hasn’t yet been wrapped by Elasticity, or if you just want to see the raw JSON response for yourself :)
# File 'lib/elasticity/emr.rb', line 360
def direct(params)
  @aws_request.submit(params)
end
#list_bootstrap_actions(jobflow_id) {|aws_result| ... } ⇒ Object
List the bootstrap actions in the specified jobflow:
emr.list_bootstrap_actions('j-123')
docs.aws.amazon.com/ElasticMapReduce/latest/API/API_ListBootstrapActions.html
# File 'lib/elasticity/emr.rb', line 178
def list_bootstrap_actions(jobflow_id)
  params = {
    :operation => 'ListBootstrapActions',
    :cluster_id => jobflow_id,
  }
  aws_result = @aws_request.submit(params)
  yield aws_result if block_given?
  JSON.parse(aws_result)
end
#list_clusters(options = {}) {|aws_result| ... } ⇒ Object
List the clusters, given the specified filtering:
emr.list_clusters({
:states => ['status1', 'status2', ...],
:created_before => Time, # Amazon times are in UTC
:created_after => Time, # Amazon times are in UTC
:marker => 'marker' # Retrieve from a prior call to list_clusters
})
docs.aws.amazon.com/ElasticMapReduce/latest/API/API_ListClusters.html
# File 'lib/elasticity/emr.rb', line 124
def list_clusters(options={})
  params = {
    :operation => 'ListClusters'
  }
  params.merge!(:cluster_states => options[:states]) if options[:states]
  params.merge!(:created_before => options[:created_before].to_i) if options[:created_before]
  params.merge!(:created_after => options[:created_after].to_i) if options[:created_after]
  params.merge!(:marker => options[:marker]) if options[:marker]
  aws_result = @aws_request.submit(params)
  yield aws_result if block_given?
  JSON.parse(aws_result)
end
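Results may be split across pages; when more are available, the response carries a `Marker` that you pass back as `:marker`. The sketch below collects every page. It assumes the parsed ListClusters response has `Clusters` and `Marker` keys (as in the EMR API), takes the page fetcher as a callable so it runs without AWS, and uses made-up pages in place of `emr.list_clusters`; `all_clusters` is a hypothetical helper. As the source above shows, `:created_before` and `:created_after` accept a `Time` and are converted with `to_i`.

```ruby
# Hypothetical helper: follows Marker tokens until the last page.
# fetch_page is any callable taking an options hash and returning a parsed
# ListClusters response, e.g. ->(opts) { emr.list_clusters(opts) }.
def all_clusters(fetch_page, base_options = {})
  clusters = []
  marker = nil
  loop do
    options = marker ? base_options.merge(:marker => marker) : base_options
    page = fetch_page.call(options)
    clusters.concat(page.fetch('Clusters', []))
    marker = page['Marker']
    break if marker.nil? || marker.empty?
  end
  clusters
end

# Made-up pages keyed by the marker that requests them (nil = first page).
pages = {
  nil   => { 'Clusters' => [{ 'Id' => 'j-1' }, { 'Id' => 'j-2' }], 'Marker' => 'm-1' },
  'm-1' => { 'Clusters' => [{ 'Id' => 'j-3' }] }
}
fake_list_clusters = ->(opts) { pages[opts[:marker]] }

ids = all_clusters(fake_list_clusters, :states => ['WAITING']).map { |c| c['Id'] }

# With a configured client:
#   all_clusters(->(opts) { emr.list_clusters(opts) }, :created_after => Time.now - 86_400)
```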
#list_instance_groups(jobflow_id) {|aws_result| ... } ⇒ Object
List the instance groups in the specified jobflow:
emr.list_instance_groups('j-123')
docs.aws.amazon.com/ElasticMapReduce/latest/API/API_ListInstanceGroups.html
# File 'lib/elasticity/emr.rb', line 163
def list_instance_groups(jobflow_id)
  params = {
    :operation => 'ListInstanceGroups',
    :cluster_id => jobflow_id,
  }
  aws_result = @aws_request.submit(params)
  yield aws_result if block_given?
  JSON.parse(aws_result)
end
#list_instances(jobflow_id, options = {}) {|aws_result| ... } ⇒ Object
List the instances in a cluster, given the specified filtering:
emr.list_instances('j-123', {
:instance_group_types => ['MASTER', 'CORE', 'TASK'],
:marker => 'marker' # Retrieve from a prior call to list_instances
})
docs.aws.amazon.com/ElasticMapReduce/latest/API/API_ListInstances.html
# File 'lib/elasticity/emr.rb', line 145
def list_instances(jobflow_id, options={})
  params = {
    :operation => 'ListInstances',
    :cluster_id => jobflow_id
  }
  params.merge!(:instance_group_id => options[:instance_group_id]) if options[:instance_group_id]
  params.merge!(:instance_group_types => options[:instance_group_types]) if options[:instance_group_types]
  params.merge!(:marker => options[:marker]) if options[:marker]
  aws_result = @aws_request.submit(params)
  yield aws_result if block_given?
  JSON.parse(aws_result)
end
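The parsed ListInstances response lists each instance with its status, which makes it easy to pick out, say, the private IPs of running nodes. This sketch assumes the `Instances`, `PrivateIpAddress` and `Status`/`State` fields from the EMR API; `running_private_ips` is a hypothetical helper and the sample page is made up. It handles a single page only; follow `Marker` to fetch more.

```ruby
# Hypothetical helper: private IPs of instances reported as RUNNING on one
# page of a parsed ListInstances response.
def running_private_ips(page)
  page.fetch('Instances', [])
      .select { |i| i.dig('Status', 'State') == 'RUNNING' }
      .map { |i| i['PrivateIpAddress'] }
end

sample_page = {
  'Instances' => [
    { 'Id' => 'ci-1', 'PrivateIpAddress' => '10.0.0.1', 'Status' => { 'State' => 'RUNNING' } },
    { 'Id' => 'ci-2', 'PrivateIpAddress' => '10.0.0.2', 'Status' => { 'State' => 'TERMINATED' } }
  ]
}

ips = running_private_ips(sample_page)

# With a configured client:
#   running_private_ips(emr.list_instances('j-123', :instance_group_types => ['CORE']))
```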
#list_steps(jobflow_id, options = {}) {|aws_result| ... } ⇒ Object
List the steps in a job flow, given the specified filtering:
emr.list_steps('j-123', {
:step_ids => ['ID-1', 'ID-2'],
:step_states => ['PENDING', 'RUNNING', ...],
:marker => 'marker' # Retrieve from a prior call to list_steps
})
docs.aws.amazon.com/ElasticMapReduce/latest/API/API_ListSteps.html
# File 'lib/elasticity/emr.rb', line 198
def list_steps(jobflow_id, options={})
  params = {
    :operation => 'ListSteps',
    :cluster_id => jobflow_id,
  }
  params.merge!(:step_ids => options[:step_ids]) if options[:step_ids]
  params.merge!(:step_states => options[:step_states]) if options[:step_states]
  params.merge!(:marker => options[:marker]) if options[:marker]
  aws_result = @aws_request.submit(params)
  yield aws_result if block_given?
  JSON.parse(aws_result)
end
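To summarize a job flow's progress, you can count steps by state. This assumes the parsed ListSteps response has a `Steps` array whose entries carry `Status`/`State`, as in the EMR API; `step_state_counts` is a hypothetical helper and the sample is made up. For server-side filtering, the `:step_states` option above does the selection for you.

```ruby
# Hypothetical helper: tallies steps by state for one page of a parsed
# ListSteps response.
def step_state_counts(page)
  page.fetch('Steps', []).each_with_object(Hash.new(0)) do |step, counts|
    counts[step.dig('Status', 'State')] += 1
  end
end

sample_page = {
  'Steps' => [
    { 'Id' => 's-1', 'Name' => 'Setup Pig',      'Status' => { 'State' => 'COMPLETED' } },
    { 'Id' => 's-2', 'Name' => 'Run Pig Script', 'Status' => { 'State' => 'RUNNING' } },
    { 'Id' => 's-3', 'Name' => 'Report',         'Status' => { 'State' => 'PENDING' } }
  ]
}

counts = step_state_counts(sample_page)

# With a configured client:
#   step_state_counts(emr.list_steps('j-123'))
```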
#modify_instance_groups(instance_group_config) {|aws_result| ... } ⇒ Object
Set the number of instances in the specified instance groups to the specified counts. Note that this modifies the requested count, which is not the same as the running count: you request instances and then wait for them to be created.
Takes a hash mapping instance group IDs to the desired instance counts:
{"ig-1" => 40, "ig-2" => 5, ...}
# File 'lib/elasticity/emr.rb', line 219
def modify_instance_groups(instance_group_config)
  params = {
    :operation => 'ModifyInstanceGroups',
    :instance_groups => instance_group_config.map { |k, v| {:instance_group_id => k, :instance_count => v} }
  }
  aws_result = @aws_request.submit(params)
  yield aws_result if block_given?
end
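Because `modify_instance_groups` sets absolute counts, scaling "by N" means reading the current requested counts first. This sketch assumes the parsed ListInstanceGroups response has an `InstanceGroups` array with `Id`, `InstanceGroupType` and `RequestedInstanceCount` fields, as in the EMR API; `scaled_task_groups` is a hypothetical helper and the sample data is made up. It only touches TASK groups and clamps counts at zero.

```ruby
# Hypothetical helper: builds the {instance group ID => count} hash that
# modify_instance_groups takes, adding delta to each TASK group's requested count.
def scaled_task_groups(groups_response, delta)
  groups_response.fetch('InstanceGroups', [])
                 .select { |g| g['InstanceGroupType'] == 'TASK' }
                 .each_with_object({}) { |g, counts| counts[g['Id']] = [g['RequestedInstanceCount'] + delta, 0].max }
end

sample = {
  'InstanceGroups' => [
    { 'Id' => 'ig-1', 'InstanceGroupType' => 'MASTER', 'RequestedInstanceCount' => 1 },
    { 'Id' => 'ig-2', 'InstanceGroupType' => 'CORE',   'RequestedInstanceCount' => 2 },
    { 'Id' => 'ig-3', 'InstanceGroupType' => 'TASK',   'RequestedInstanceCount' => 4 }
  ]
}

new_counts = scaled_task_groups(sample, 2)

# modify_instance_groups turns that hash into the request's :instance_groups list:
payload = new_counts.map { |k, v| {:instance_group_id => k, :instance_count => v} }

# With a configured client:
#   emr.modify_instance_groups(scaled_task_groups(emr.list_instance_groups('j-123'), 2))
```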
#remove_tags(jobflow_id, keys) {|aws_result| ... } ⇒ Object
Remove the specified tags from all instances in the specified jobflow:
emr.remove_tags('j-123', ['key1','key_only2'])
See docs.aws.amazon.com/ElasticMapReduce/latest/API/API_RemoveTags.html
# File 'lib/elasticity/emr.rb', line 233
def remove_tags(jobflow_id, keys)
  params = {
    :operation => 'RemoveTags',
    :resource_id => jobflow_id,
    :tag_keys => keys
  }
  aws_result = @aws_request.submit(params)
  yield aws_result if block_given?
end
#run_job_flow(job_flow_config) {|aws_result| ... } ⇒ Object
Start a job flow with the specified configuration. This is a very thin wrapper around the AWS API, so in order to use it directly you’ll need to have the PDF API reference handy, which can be found here:
awsdocs.s3.amazonaws.com/ElasticMapReduce/20090331/emr-api-20090331.pdf
Here is a sample job flow configuration that should help. This job flow starts by installing Pig, then runs a Pig script. It is based on the Pig demo script from Amazon.
emr.run_job_flow({
:name => "Elasticity Test Flow (EMR Pig Script)",
:instances => {
:ec2_key_name => "sharethrough-dev",
:hadoop_version => "0.20",
:instance_count => 2,
:master_instance_type => "m1.small",
:placement => {
:availability_zone => "us-east-1a"
},
:slave_instance_type => "m1.small",
},
:steps => [
{
:action_on_failure => "TERMINATE_JOB_FLOW",
:hadoop_jar_step => {
:args => [
"s3://elasticmapreduce/libs/pig/pig-script",
"--base-path",
"s3://elasticmapreduce/libs/pig/",
"--install-pig"
],
:jar => "s3://elasticmapreduce/libs/script-runner/script-runner.jar"
},
:name => "Setup Pig"
},
{
:action_on_failure => "TERMINATE_JOB_FLOW",
:hadoop_jar_step => {
:args => [
"s3://elasticmapreduce/libs/pig/pig-script",
"--run-pig-script",
"--args",
"-p",
"INPUT=s3n://elasticmapreduce/samples/pig-apache/input",
"-p",
"OUTPUT=s3n://slif-elasticity/pig-apache/output/2011-04-19",
"s3n://elasticmapreduce/samples/pig-apache/do-reports.pig"
],
:jar => "s3://elasticmapreduce/libs/script-runner/script-runner.jar"
},
:name => "Run Pig Script"
}
]
})
# File 'lib/elasticity/emr.rb', line 298
def run_job_flow(job_flow_config)
  params = {
    :operation => 'RunJobFlow',
  }.merge!(job_flow_config)
  aws_result = @aws_request.submit(params)
  yield aws_result if block_given?
  JSON.parse(aws_result)['JobFlowId']
end
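The "Run Pig Script" step's argument list follows a fixed pattern: the pig-script runner, `--run-pig-script --args`, one `-p NAME=value` pair per parameter, then the script. A hypothetical builder (not an Elasticity API) can generate it and, for example, date-stamp the output path instead of hard-coding `2011-04-19`. The sketch reproduces the arguments from the configuration above; `run_job_flow` itself returns the new `JobFlowId`.

```ruby
require 'date'

PIG_SCRIPT_RUNNER = "s3://elasticmapreduce/libs/pig/pig-script"

# Hypothetical helper: builds script-runner args that run a Pig script with
# one "-p NAME=value" pair per entry in params.
def pig_script_args(script, params)
  args = [PIG_SCRIPT_RUNNER, "--run-pig-script", "--args"]
  params.each { |name, value| args.push("-p", "#{name}=#{value}") }
  args << script
end

run_date = Date.new(2011, 4, 19) # use Date.today for a fresh output path per run
args = pig_script_args(
  "s3n://elasticmapreduce/samples/pig-apache/do-reports.pig",
  "INPUT"  => "s3n://elasticmapreduce/samples/pig-apache/input",
  "OUTPUT" => "s3n://slif-elasticity/pig-apache/output/#{run_date.iso8601}"
)

# With a configured client, place args under the step's :hadoop_jar_step and call
#   job_flow_id = emr.run_job_flow(config)
```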
#set_termination_protection(jobflow_ids, protection_enabled = true) {|aws_result| ... } ⇒ Object
Enable or disable “termination protection” on the specified job flows. Termination protection prevents a job flow from being terminated by a user-initiated action, although the job flow will still terminate naturally.
Takes an array of job flow IDs:
["j-1B4D1XP0C0A35", "j-1YG2MYL0HVYS5", ...]
# File 'lib/elasticity/emr.rb', line 315
def set_termination_protection(jobflow_ids, protection_enabled=true)
  params = {
    :operation => 'SetTerminationProtection',
    :termination_protected => protection_enabled,
    :job_flow_ids => jobflow_ids
  }
  aws_result = @aws_request.submit(params)
  yield aws_result if block_given?
end
#set_visible_to_all_users(jobflow_ids, visible = true) {|aws_result| ... } ⇒ Object
Set whether all IAM users in this account can access the specified job flows.
Takes an array of job flow IDs:
["j-1B4D1XP0C0A35", "j-1YG2MYL0HVYS5", ...]
docs.aws.amazon.com/ElasticMapReduce/latest/API/API_SetVisibleToAllUsers.html
# File 'lib/elasticity/emr.rb', line 332
def set_visible_to_all_users(jobflow_ids, visible=true)
  params = {
    :operation => 'SetVisibleToAllUsers',
    :visible_to_all_users => visible,
    :job_flow_ids => jobflow_ids
  }
  aws_result = @aws_request.submit(params)
  yield aws_result if block_given?
end
#terminate_jobflows(jobflow_ids) {|aws_result| ... } ⇒ Object
Terminate the specified jobflows. Amazon does not define a return value for this operation, so you’ll need to poll to see the state of the jobflow.
Takes an array of job flow IDs:
["j-1B4D1XP0C0A35", "j-1YG2MYL0HVYS5", ...]
# File 'lib/elasticity/emr.rb', line 348
def terminate_jobflows(jobflow_ids)
  params = {
    :operation => 'TerminateJobFlows',
    :job_flow_ids => jobflow_ids
  }
  aws_result = @aws_request.submit(params)
  yield aws_result if block_given?
end
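Since `terminate_jobflows` returns nothing useful, a caller typically polls until the cluster reaches a terminal state. This minimal sketch assumes the state comes from `describe_cluster` (`Cluster`, then `Status`, then `State`) and that `TERMINATED` and `TERMINATED_WITH_ERRORS` are the terminal cluster states, per the EMR API. `wait_for_termination` is hypothetical; the state fetcher and the sleep are injected so the example runs without AWS, and the interval and attempt limit are arbitrary choices.

```ruby
# Cluster states after which a cluster will not change again (per the EMR API).
TERMINAL_CLUSTER_STATES = %w[TERMINATED TERMINATED_WITH_ERRORS].freeze

# Hypothetical helper: polls fetch_state until the job flow reaches a terminal
# state, sleeping between checks. Raises if it has not terminated after max_attempts.
def wait_for_termination(jobflow_id, fetch_state, interval: 30, max_attempts: 60,
                         sleeper: ->(seconds) { sleep(seconds) })
  max_attempts.times do
    state = fetch_state.call(jobflow_id)
    return state if TERMINAL_CLUSTER_STATES.include?(state)
    sleeper.call(interval)
  end
  raise "#{jobflow_id} did not terminate after #{max_attempts} checks"
end

# Made-up sequence of states standing in for repeated describe_cluster calls.
states = %w[RUNNING TERMINATING TERMINATED]
slept = []
final_state = wait_for_termination('j-1B4D1XP0C0A35', ->(_id) { states.shift },
                                   sleeper: ->(seconds) { slept << seconds })

# With a configured client:
#   emr.terminate_jobflows(['j-1B4D1XP0C0A35'])
#   wait_for_termination('j-1B4D1XP0C0A35',
#                        ->(id) { emr.describe_cluster(id).dig('Cluster', 'Status', 'State') })
```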