Class: Elasticity::EMR

Inherits:
Object
  • Object
show all
Defined in:
lib/elasticity/emr.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(options = {}) ⇒ EMR

Returns a new instance of EMR.



7
8
9
# File 'lib/elasticity/emr.rb', line 7

# Builds an EMR client. Every request made by this client goes through the
# Elasticity::AwsSession constructed here.
#
# @param options [Hash] forwarded unchanged to Elasticity::AwsSession.new
def initialize(options = {})
  session = Elasticity::AwsSession.new(options)
  @aws_request = session
end

Instance Attribute Details

#aws_request ⇒ Object (readonly)

Returns the value of attribute aws_request.



5
6
7
# File 'lib/elasticity/emr.rb', line 5

# Read-only access to the session object assigned in #initialize; every
# EMR call is submitted through it.
def aws_request
  session = @aws_request
  session
end

Instance Method Details

#==(other) ⇒ Object



364
365
366
367
368
# File 'lib/elasticity/emr.rb', line 364

# Two EMR clients are equal when the other value is an EMR whose
# aws_request compares equal to ours.
#
# @param other [Object] value to compare against
# @return [Boolean] always exactly true or false
def ==(other)
  if other.is_a?(EMR) && @aws_request == other.aws_request
    true
  else
    false
  end
end

#add_instance_groups(jobflow_id, instance_group_configs) {|aws_result| ... } ⇒ Object

Adds a new group of instances to the specified jobflow. Elasticity maps a more Ruby-like syntax to the Amazon options. An exhaustive hash follows although not all of these options are required (or valid!) at once. Please see the EMR docs for details although even then you’re going to need to experiment :)

instance_group_config = {
  :bid_price => 5,
  :market => "SPOT",
  :name => "Go Canucks Go!",
  :instance_count => 1,
  :instance_role => "TASK",
  :instance_type => "m1.small"
}

add_instance_groups takes an array of these hashes and returns an array of the IDs of the instance groups created from them:

["ig-2GOVEN6HVJZID", "ig-1DU9M2UQMM051", "ig-3DZRW4Y2X4S", ...]

Yields:

  • (aws_result)


29
30
31
32
33
34
35
36
37
38
# File 'lib/elasticity/emr.rb', line 29

# Adds new instance groups to an existing jobflow.
#
# @param jobflow_id [String] the jobflow to extend, e.g. 'j-123'
# @param instance_group_configs [Array<Hash>] one Hash per group to create
# @yield [aws_result] the raw, unparsed response string
# @return [Object] the 'InstanceGroupIds' entry of the parsed JSON response
def add_instance_groups(jobflow_id, instance_group_configs)
  request = {
    operation: 'AddInstanceGroups',
    job_flow_id: jobflow_id,
    instance_groups: instance_group_configs
  }
  raw = @aws_request.submit(request)
  yield raw if block_given?
  parsed = JSON.parse(raw)
  parsed['InstanceGroupIds']
end

#add_jobflow_steps(jobflow_id, steps_config) {|aws_result| ... } ⇒ Object

Add a step (or steps) to the specified job flow.

emr.add_jobflow_steps("j-123", [
  {
    :action_on_failure => "TERMINATE_JOB_FLOW",
    :hadoop_jar_step => {
      :args => [
        "s3://elasticmapreduce/libs/pig/pig-script",
          "--base-path",
          "s3://elasticmapreduce/libs/pig/",
          "--install-pig"
      ],
      :jar => "s3://elasticmapreduce/libs/script-runner/script-runner.jar"
    },
    :name => "Setup Pig"
  }
])

Yields:

  • (aws_result)


57
58
59
60
61
62
63
64
65
66
# File 'lib/elasticity/emr.rb', line 57

# Appends one or more steps to an existing jobflow.
#
# @param jobflow_id [String] the target jobflow, e.g. 'j-123'
# @param steps_config [Array<Hash>] step definitions, sent as :steps
# @yield [aws_result] the raw, unparsed response string
# @return [Object] the whole parsed JSON response
def add_jobflow_steps(jobflow_id, steps_config)
  request = {
    operation: 'AddJobFlowSteps',
    job_flow_id: jobflow_id,
    steps: steps_config
  }
  raw = @aws_request.submit(request)
  yield raw if block_given?
  JSON.parse(raw)
end

#add_tags(jobflow_id, tags) {|aws_result| ... } ⇒ Object

Sets the specified tags on all instances in the specified jobflow

emr.add_tags('j-123', [{:key => 'key1', :value => 'value1'}, {:key => 'key_only2'}])

See docs.aws.amazon.com/ElasticMapReduce/latest/API/API_AddTags.html

Yields:

  • (aws_result)


73
74
75
76
77
78
79
80
81
# File 'lib/elasticity/emr.rb', line 73

# Sends an AddTags request using the jobflow ID as the resource ID.
#
# @param jobflow_id [String] e.g. 'j-123'
# @param tags [Array<Hash>] entries such as {:key => 'k', :value => 'v'}
# @yield [aws_result] the raw response string
# @return [nil, Object] nil when no block is given, otherwise the block's value
def add_tags(jobflow_id, tags)
  request = {
    operation: 'AddTags',
    resource_id: jobflow_id,
    tags: tags
  }
  response = @aws_request.submit(request)
  return unless block_given?

  yield response
end

#describe_cluster(jobflow_id) {|aws_result| ... } ⇒ Object

Provides details about the specified jobflow

emr.describe_cluster('j-123')

docs.aws.amazon.com/ElasticMapReduce/latest/API/API_DescribeCluster.html

Yields:

  • (aws_result)


88
89
90
91
92
93
94
95
96
# File 'lib/elasticity/emr.rb', line 88

# Fetches details about a single cluster.
#
# @param jobflow_id [String] sent as :cluster_id, e.g. 'j-123'
# @yield [aws_result] the raw, unparsed response string
# @return [Object] the parsed JSON response
def describe_cluster(jobflow_id)
  raw = @aws_request.submit({ operation: 'DescribeCluster', cluster_id: jobflow_id })
  yield raw if block_given?
  JSON.parse(raw)
end

#describe_step(jobflow_id, step_id) {|aws_result| ... } ⇒ Object

Provides details about the specified step within an existing jobflow

emr.describe_step('j-123', 'step-456')

docs.aws.amazon.com/ElasticMapReduce/latest/API/API_DescribeStep.html

Yields:

  • (aws_result)


103
104
105
106
107
108
109
110
111
112
# File 'lib/elasticity/emr.rb', line 103

# Fetches details about one step of a cluster.
#
# @param jobflow_id [String] sent as :cluster_id, e.g. 'j-123'
# @param step_id [String] the step to describe
# @yield [aws_result] the raw, unparsed response string
# @return [Object] the parsed JSON response
def describe_step(jobflow_id, step_id)
  request = { operation: 'DescribeStep', cluster_id: jobflow_id, step_id: step_id }
  raw = @aws_request.submit(request)
  yield raw if block_given?
  JSON.parse(raw)
end

#direct(params) ⇒ Object

Pass the specified params hash directly through to the AWS request URL. Use this if you want to perform an operation that hasn’t yet been wrapped by Elasticity, or if you just want to see the raw response for yourself :)



360
361
362
# File 'lib/elasticity/emr.rb', line 360

# Submits +params+ exactly as given, with no operation name added and no
# response parsing. This is an escape hatch for calls that have no wrapper.
#
# @param params [Hash] the full request, including :operation
# @return [Object] whatever the session's submit returns, unmodified
def direct(params)
  session = @aws_request
  session.submit(params)
end

#list_bootstrap_actions(jobflow_id) {|aws_result| ... } ⇒ Object

List the bootstrap actions in the specified jobflow

emr.list_bootstrap_actions('j-123')

docs.aws.amazon.com/ElasticMapReduce/latest/API/API_ListBootstrapActions.html

Yields:

  • (aws_result)


178
179
180
181
182
183
184
185
186
# File 'lib/elasticity/emr.rb', line 178

# Lists the bootstrap actions configured on a cluster.
#
# @param jobflow_id [String] sent as :cluster_id, e.g. 'j-123'
# @yield [aws_result] the raw, unparsed response string
# @return [Object] the parsed JSON response
def list_bootstrap_actions(jobflow_id)
  raw = @aws_request.submit({ operation: 'ListBootstrapActions', cluster_id: jobflow_id })
  yield raw if block_given?
  JSON.parse(raw)
end

#list_clusters(options = {}) {|aws_result| ... } ⇒ Object

List the clusters given specified filtering

emr.list_clusters({
  :states => ['status1', 'status2', ...],
  :created_before => Time,                # Amazon times are in UTC
  :created_after => Time,                 # Amazon times are in UTC
  :marker => 'marker'                     # Retrieve from a prior call to list_clusters
})

docs.aws.amazon.com/ElasticMapReduce/latest/API/API_ListClusters.html

Yields:

  • (aws_result)


124
125
126
127
128
129
130
131
132
133
134
135
# File 'lib/elasticity/emr.rb', line 124

# Lists clusters, optionally filtered.
#
# @param options [Hash] optional filters; only truthy values are sent:
#   :states         - sent as :cluster_states
#   :created_before - converted with #to_i (epoch seconds for a Time)
#   :created_after  - converted with #to_i (epoch seconds for a Time)
#   :marker         - value obtained from a prior list_clusters call
# @yield [aws_result] the raw, unparsed response string
# @return [Object] the parsed JSON response
def list_clusters(options = {})
  request = { operation: 'ListClusters' }
  request[:cluster_states] = options[:states] if options[:states]
  %i[created_before created_after].each do |time_key|
    request[time_key] = options[time_key].to_i if options[time_key]
  end
  request[:marker] = options[:marker] if options[:marker]

  raw = @aws_request.submit(request)
  yield raw if block_given?
  JSON.parse(raw)
end

#list_instance_groups(jobflow_id) {|aws_result| ... } ⇒ Object

List the instance groups in the specified jobflow

emr.list_instance_groups('j-123')

docs.aws.amazon.com/ElasticMapReduce/latest/API/API_ListInstanceGroups.html

Yields:

  • (aws_result)


163
164
165
166
167
168
169
170
171
# File 'lib/elasticity/emr.rb', line 163

# Lists the instance groups of a cluster.
#
# @param jobflow_id [String] sent as :cluster_id, e.g. 'j-123'
# @yield [aws_result] the raw, unparsed response string
# @return [Object] the parsed JSON response
def list_instance_groups(jobflow_id)
  raw = @aws_request.submit({ operation: 'ListInstanceGroups', cluster_id: jobflow_id })
  yield raw if block_given?
  JSON.parse(raw)
end

#list_instances(jobflow_id, options = {}) {|aws_result| ... } ⇒ Object

List the instances in a cluster given specified filtering

emr.list_instances('j-123', {
  :instance_group_types => ['MASTER', 'CORE', 'TASK'],
  :marker => 'marker'                     # Retrieve from a prior call to list_instances
})

docs.aws.amazon.com/ElasticMapReduce/latest/API/API_ListInstances.html

Yields:

  • (aws_result)


145
146
147
148
149
150
151
152
153
154
155
156
# File 'lib/elasticity/emr.rb', line 145

# Lists the instances in a cluster, optionally filtered.
#
# @param jobflow_id [String] sent as :cluster_id, e.g. 'j-123'
# @param options [Hash] optional filters; only truthy values are sent:
#   :instance_group_id    - restrict the listing to one instance group
#   :instance_group_types - e.g. ['MASTER', 'CORE', 'TASK']
#   :types                - short alias for :instance_group_types (mirrors the
#                           :states shorthand accepted by list_clusters); ignored
#                           when :instance_group_types is also given
#   :marker               - value obtained from a prior list_instances call
# @yield [aws_result] the raw, unparsed response string
# @return [Object] the parsed JSON response
def list_instances(jobflow_id, options={})
  params = {
    :operation => 'ListInstances',
    :cluster_id => jobflow_id
  }
  # The documented examples used :types; previously it was silently dropped.
  group_types = options[:instance_group_types] || options[:types]
  params.merge!(:instance_group_id => options[:instance_group_id]) if options[:instance_group_id]
  params.merge!(:instance_group_types => group_types) if group_types
  params.merge!(:marker => options[:marker]) if options[:marker]
  aws_result = @aws_request.submit(params)
  yield aws_result if block_given?
  JSON.parse(aws_result)
end

#list_steps(jobflow_id, options = {}) {|aws_result| ... } ⇒ Object

List the steps in a job flow given specified filtering

emr.list_steps('j-123', {
  :step_ids => ['ID-1', 'ID-2'],
  :step_states => ['PENDING', 'RUNNING', ...],
  :marker => 'marker'                         # Retrieve from a prior call to list_steps
})

docs.aws.amazon.com/ElasticMapReduce/latest/API/API_ListSteps.html

Yields:

  • (aws_result)


198
199
200
201
202
203
204
205
206
207
208
209
# File 'lib/elasticity/emr.rb', line 198

# Lists the steps of a cluster, optionally filtered.
#
# @param jobflow_id [String] sent as :cluster_id, e.g. 'j-123'
# @param options [Hash] optional filters, forwarded under the same key when
#   truthy: :step_ids, :step_states, :marker (from a prior list_steps call)
# @yield [aws_result] the raw, unparsed response string
# @return [Object] the parsed JSON response
def list_steps(jobflow_id, options = {})
  request = { operation: 'ListSteps', cluster_id: jobflow_id }
  %i[step_ids step_states marker].each do |filter|
    request[filter] = options[filter] if options[filter]
  end

  raw = @aws_request.submit(request)
  yield raw if block_given?
  JSON.parse(raw)
end

#modify_instance_groups(instance_group_config) {|aws_result| ... } ⇒ Object

Set the number of instances in the specified instance groups to the specified counts. Note that this modifies the request count, which is not the same as the running count. I.e. you request instances and then wait for them to be created.

Takes a {} of instance group IDs => desired instance count.

{"ig-1" => 40, "ig-2" => 5, ...}

Yields:

  • (aws_result)


219
220
221
222
223
224
225
226
# File 'lib/elasticity/emr.rb', line 219

# Sets the requested instance count for each given instance group.
#
# @param instance_group_config [Hash] instance group ID => desired count,
#   e.g. {'ig-1' => 40}
# @yield [aws_result] the raw response string
# @return [nil, Object] nil when no block is given, otherwise the block's value
def modify_instance_groups(instance_group_config)
  groups = instance_group_config.map do |group_id, count|
    { instance_group_id: group_id, instance_count: count }
  end
  response = @aws_request.submit({ operation: 'ModifyInstanceGroups', instance_groups: groups })
  return unless block_given?

  yield response
end

#remove_tags(jobflow_id, keys) {|aws_result| ... } ⇒ Object

Remove the specified tags on all instances in the specified jobflow

emr.remove_tags('j-123', ['key1','key_only2'])

See docs.aws.amazon.com/ElasticMapReduce/latest/API/API_RemoveTags.html

Yields:

  • (aws_result)


233
234
235
236
237
238
239
240
241
# File 'lib/elasticity/emr.rb', line 233

# Sends a RemoveTags request using the jobflow ID as the resource ID.
#
# @param jobflow_id [String] e.g. 'j-123'
# @param keys [Array<String>] tag keys to remove
# @yield [aws_result] the raw response string
# @return [nil, Object] nil when no block is given, otherwise the block's value
def remove_tags(jobflow_id, keys)
  request = {
    operation: 'RemoveTags',
    resource_id: jobflow_id,
    tag_keys: keys
  }
  response = @aws_request.submit(request)
  return unless block_given?

  yield response
end

#run_job_flow(job_flow_config) {|aws_result| ... } ⇒ Object

Start a job flow with the specified configuration. This is a very thin wrapper around the AWS API, so in order to use it directly you’ll need to have the PDF API reference handy, which can be found here:

awsdocs.s3.amazonaws.com/ElasticMapReduce/20090331/emr-api-20090331.pdf

Here is a sample job flow configuration that should help. This job flow starts by installing Pig and then runs a Pig script. It is based on the Pig demo script from Amazon.

emr.run_job_flow({
  :name => "Elasticity Test Flow (EMR Pig Script)",
  :instances => {
    :ec2_key_name => "sharethrough-dev",
    :hadoop_version => "0.20",
    :instance_count => 2,
    :master_instance_type => "m1.small",
    :placement => {
      :availability_zone => "us-east-1a"
    },
    :slave_instance_type => "m1.small",
  },
  :steps => [
    {
      :action_on_failure => "TERMINATE_JOB_FLOW",
      :hadoop_jar_step => {
        :args => [
          "s3://elasticmapreduce/libs/pig/pig-script",
            "--base-path",
            "s3://elasticmapreduce/libs/pig/",
            "--install-pig"
        ],
        :jar => "s3://elasticmapreduce/libs/script-runner/script-runner.jar"
      },
      :name => "Setup Pig"
    },
      {
        :action_on_failure => "TERMINATE_JOB_FLOW",
        :hadoop_jar_step => {
          :args => [
            "s3://elasticmapreduce/libs/pig/pig-script",
              "--run-pig-script",
              "--args",
              "-p",
              "INPUT=s3n://elasticmapreduce/samples/pig-apache/input",
              "-p",
              "OUTPUT=s3n://slif-elasticity/pig-apache/output/2011-04-19",
              "s3n://elasticmapreduce/samples/pig-apache/do-reports.pig"
          ],
          :jar => "s3://elasticmapreduce/libs/script-runner/script-runner.jar"
        },
        :name => "Run Pig Script"
      }
  ]
})

Yields:

  • (aws_result)


298
299
300
301
302
303
304
305
# File 'lib/elasticity/emr.rb', line 298

# Starts a new job flow. +job_flow_config+ is merged over the operation name
# and sent as-is; keys in the config win on conflict.
#
# @param job_flow_config [Hash] the RunJobFlow request (not mutated)
# @yield [aws_result] the raw, unparsed response string
# @return [Object] the 'JobFlowId' entry of the parsed JSON response
def run_job_flow(job_flow_config)
  request = { operation: 'RunJobFlow' }
  request.update(job_flow_config)
  raw = @aws_request.submit(request)
  yield raw if block_given?
  JSON.parse(raw)['JobFlowId']
end

#set_termination_protection(jobflow_ids, protection_enabled = true) {|aws_result| ... } ⇒ Object

Enable or disable “termination protection” on the specified job flows. Termination protection prevents a job flow from being terminated by a user-initiated action, although the job flow will still terminate naturally.

Takes an [] of job flow IDs.

["j-1B4D1XP0C0A35", "j-1YG2MYL0HVYS5", ...]

Yields:

  • (aws_result)


315
316
317
318
319
320
321
322
323
# File 'lib/elasticity/emr.rb', line 315

# Turns termination protection on or off for the given job flows.
#
# @param jobflow_ids [Array<String>] e.g. ['j-1B4D1XP0C0A35']
# @param protection_enabled [Boolean] sent as :termination_protected
# @yield [aws_result] the raw response string
# @return [nil, Object] nil when no block is given, otherwise the block's value
def set_termination_protection(jobflow_ids, protection_enabled = true)
  request = {
    operation: 'SetTerminationProtection',
    termination_protected: protection_enabled,
    job_flow_ids: jobflow_ids
  }
  response = @aws_request.submit(request)
  return unless block_given?

  yield response
end

#set_visible_to_all_users(jobflow_ids, visible = true) {|aws_result| ... } ⇒ Object

Sets whether all IAM users in this account can access the specified job flows.

Takes an [] of job flow IDs.

["j-1B4D1XP0C0A35", "j-1YG2MYL0HVYS5", ...]

docs.aws.amazon.com/ElasticMapReduce/latest/API/API_SetVisibleToAllUsers.html

Yields:

  • (aws_result)


332
333
334
335
336
337
338
339
340
# File 'lib/elasticity/emr.rb', line 332

# Sets whether all IAM users in the account can access the given job flows.
#
# @param jobflow_ids [Array<String>] e.g. ['j-1B4D1XP0C0A35']
# @param visible [Boolean] sent as :visible_to_all_users
# @yield [aws_result] the raw response string
# @return [nil, Object] nil when no block is given, otherwise the block's value
def set_visible_to_all_users(jobflow_ids, visible = true)
  request = {
    operation: 'SetVisibleToAllUsers',
    visible_to_all_users: visible,
    job_flow_ids: jobflow_ids
  }
  response = @aws_request.submit(request)
  return unless block_given?

  yield response
end

#terminate_jobflows(jobflow_ids) {|aws_result| ... } ⇒ Object

Terminate the specified jobflows. Amazon does not define a return value for this operation, so you’ll need to poll to see the state of the jobflow.

Takes an [] of job flow IDs.

["j-1B4D1XP0C0A35", "j-1YG2MYL0HVYS5", ...]

Yields:

  • (aws_result)


348
349
350
351
352
353
354
355
# File 'lib/elasticity/emr.rb', line 348

# Requests termination of the given job flows. Nothing is parsed from the
# response; poll the job flow state to observe the result.
#
# @param jobflow_ids [Array<String>] e.g. ['j-1B4D1XP0C0A35']
# @yield [aws_result] the raw response string
# @return [nil, Object] nil when no block is given, otherwise the block's value
def terminate_jobflows(jobflow_ids)
  response = @aws_request.submit({ operation: 'TerminateJobFlows', job_flow_ids: jobflow_ids })
  return unless block_given?

  yield response
end