Class: Awscli::Emr::EMR

Inherits:
Object
  • Object
show all
Defined in:
lib/awscli/emr.rb

Instance Method Summary collapse

Constructor Details

#initialize(connection) ⇒ EMR

Returns a new instance of EMR.



4
5
6
# File 'lib/awscli/emr.rb', line 4

# Stores the connection object that every other method in this class uses
# to issue its requests (it is only ever read back through @conn).
#
# @param connection [Object] client object kept in @conn
def initialize(connection)
  @conn = connection
end

Instance Method Details

#add_instance_group(options) ⇒ Object



145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
# File 'lib/awscli/emr.rb', line 145

# Adds a single instance group to an existing job flow.
#
# Aborts the process when options[:job_flow_id] is not among the ids
# reported by describe_job_flows, or when options[:instance_type] is not
# listed in Awscli::Instances::INSTANCE_SIZES.
#
# @param options [Hash] reads :job_flow_id, :instance_count, :instance_type,
#   :instance_role, :name and :bid_price; any remaining keys other than
#   job_flow_id and region are forwarded unchanged in the group definition
# @return [nil]
def add_instance_group(options)
  # Work on a deep copy so the caller's options hash is never mutated.
  group = Marshal.load(Marshal.dump(options))
  # job_flow_id and region address the request, they are not attributes of
  # the group. Compare as strings so both symbol- and string-keyed hashes
  # are cleaned (the rest of this method reads the options by symbol).
  group.reject! { |key, _value| %w[job_flow_id region].include?(key.to_s) }

  job_flow_id = options[:job_flow_id]
  known_ids = @conn.describe_job_flows.body['JobFlows'].map { |job| job['JobFlowId'] }
  abort 'invalid job id' unless known_ids.include?(job_flow_id)
  abort 'invalid instance type' unless Awscli::Instances::INSTANCE_SIZES.include?(options[:instance_type])

  # Rename the option keys to the parameter names used in the request.
  {
    :instance_count => 'InstanceCount',
    :instance_type  => 'InstanceType',
    :instance_role  => 'InstanceRole',
    :name           => 'Name'
  }.each do |option, api_key|
    value = group.delete(option)
    group[api_key] = value if value
  end

  # A bid price marks the group as SPOT; without one it is ON_DEMAND.
  bid_price = group.delete(:bid_price)
  if bid_price
    group['BidPrice'] = bid_price
    group['MarketType'] = 'SPOT'
  else
    group['MarketType'] = 'ON_DEMAND'
  end

  @conn.add_instance_groups(job_flow_id, 'InstanceGroups' => [group])
  puts "Added instance group to job flow(with id): #{job_flow_id}"
end

#add_instance_groups(job_flow_id, groups) ⇒ Object



207
208
209
210
211
# File 'lib/awscli/emr.rb', line 207

# Adds several instance groups to a job flow with one request.
#
# @param job_flow_id [Object] handed to validate_job_ids before anything is sent
# @param groups [Object] raw group definitions, converted by parse_instance_groups
# @return [Object] whatever @conn.add_instance_groups returns
def add_instance_groups(job_flow_id, groups)
  validate_job_ids(job_flow_id)
  @conn.add_instance_groups(job_flow_id, 'InstanceGroups' => parse_instance_groups(groups))
end

#add_steps(job_flow_id, job_steps) ⇒ Object



174
175
176
177
178
# File 'lib/awscli/emr.rb', line 174

# Appends steps to a job flow and prints a confirmation line.
#
# @param job_flow_id [Object] handed to validate_job_ids before anything is sent
# @param job_steps [Object] step description, converted by parse_custom_jar
# @return [nil]
def add_steps(job_flow_id, job_steps)
  validate_job_ids(job_flow_id)
  parsed_steps = parse_custom_jar(job_steps)
  @conn.add_job_flow_steps(job_flow_id, 'Steps' => parsed_steps)
  puts "Added step to job flow id: #{job_flow_id}"
end

#create_job_flow(options) ⇒ Object



41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
# File 'lib/awscli/emr.rb', line 41

# Assembles a job flow definition (bootstrap actions, steps, instance
# settings) from the given options and submits it with @conn.run_job_flow.
#
# Aborts the process when HBase is requested together with a Hadoop version
# missing from Awscli::EMR::HBASE_SUPPORTED_HADOOP, or with an instance type
# that is_valid_instance_type? rejects.
#
# @param options [Hash] reads :name, :hadoop_version, :alive, :log_uri,
#   :bootstrap_actions, the *_steps lists, :hive_interactive,
#   :pig_interactive, the :hbase_* switches and the instance settings
# @return [nil]
def create_job_flow(options)
  hadoop_version = options[:hadoop_version]
  interactive_hive = options[:hive_interactive]
  interactive_pig = options[:pig_interactive]
  with_hbase = options[:hbase_install]
  # In any of these modes the job flow should not be terminated automatically.
  long_running = interactive_hive || interactive_pig || with_hbase

  # => BOOTSTRAP ACTIONS
  boot_strap_actions = (options[:bootstrap_actions] || []).map { |action| parse_boot_strap_actions(action) }

  # => STEPS
  steps = (options[:custom_jar_steps] || []).map { |jar_step| parse_custom_jar(jar_step) }
  steps << hive_install(hadoop_version) if interactive_hive
  steps << pig_install if interactive_pig
  if options[:hive_steps]
    # Install hive once only: the interactive branch above may have done it.
    steps << hive_install(hadoop_version) unless interactive_hive
    steps.concat(options[:hive_steps].map { |hive_step| parse_hive_steps(hive_step) })
  end
  if options[:pig_steps]
    steps << pig_install unless interactive_pig
    steps.concat(options[:pig_steps].map { |pig_step| parse_pig_steps(pig_step) })
  end
  steps.concat((options[:streaming_steps] || []).map { |streaming_step| parse_streaming_steps(streaming_step) })

  if with_hbase
    boot_strap_actions << hbase_install_boot_strap
    steps << hbase_install_steps

    # Validate hadoop version and instance size
    supported_versions = Awscli::EMR::HBASE_SUPPORTED_HADOOP
    unless supported_versions.include?(hadoop_version)
      abort "Invalid hadoop version #{hadoop_version}, supported Hadoop Versions for HBase are: #{supported_versions.join(',')}"
    end

    # Instance group types are checked first, then master, then slave.
    group_types = []
    if options[:instance_groups]
      group_types = parse_instance_groups(options[:instance_groups]).map { |group| group['InstanceType'] }
    end
    node_types = [options[:master_instance_type], options[:slave_instance_type]].select { |type| type }
    (group_types + node_types).each do |type|
      next if is_valid_instance_type?(type)
      abort "Instance type #{type} is not compatible with HBase, instance size should be equal or greater than m1.large"
    end

    # => HBase backups
    if options[:hbase_backup_schedule]
      # Backup
      schedule = options[:hbase_backup_schedule]
      steps << (options[:hbase_consistent_backup] ? parse_hbase_backup(schedule, true) : parse_hbase_backup(schedule))
    elsif options[:hbase_backup_restore]
      # Restore
      steps << parse_hbase_restore(options[:hbase_backup_restore])
    end
  end

  # => INSTANCES
  instances = {
    'HadoopVersion' => hadoop_version,
    'KeepJobFlowAliveWhenNoSteps' => long_running ? true : options[:alive]
  }
  # Optional attributes are only sent when the matching option was given.
  [
    ['Ec2KeyName', :instance_ec2_key_name],
    ['InstanceCount', :instance_count],
    ['MasterInstanceType', :master_instance_type],
    ['SlaveInstanceType', :slave_instance_type],
    ['TerminationProtected', :termination_protection]
  ].each do |api_key, option|
    instances[api_key] = options[option] if options[option]
  end
  # => Instance Groups
  instances['InstanceGroups'] = parse_instance_groups(options[:instance_groups]) if options[:instance_groups]

  # => Build final request
  job_flow = { 'AmiVersion' => Awscli::EMR::HADOOP_AMI_MAPPING[hadoop_version] }
  job_flow['LogUri'] = options[:log_uri] if options[:log_uri]
  job_flow['BootstrapActions'] = boot_strap_actions if options[:bootstrap_actions] || with_hbase
  job_flow['Instances'] = instances
  job_flow['Steps'] = steps

  job_name = if options[:alive] || long_running
               "#{options[:name]} (requires manual termination)"
             else
               options[:name]
             end
  @conn.run_job_flow(job_name, job_flow)
  puts "Create JobFlow '#{options[:name]}' Successfully!"
end

#delete(job_ids) ⇒ Object



213
214
215
216
217
# File 'lib/awscli/emr.rb', line 213

# Terminates the given job flows and prints the ids that were sent.
#
# @param job_ids [Array] job flow ids; handed to validate_job_ids first and
#   joined with commas for the confirmation line
# @return [nil]
def delete(job_ids)
  validate_job_ids(job_ids)
  @conn.terminate_job_flows('JobFlowIds' => job_ids)
  terminated = job_ids.join(',')
  puts "Terminated Job Flows: #{terminated}"
end

#list(options) ⇒ Object



8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
# File 'lib/awscli/emr.rb', line 8

# Prints the job flows returned by describe_job_flows, either as a summary
# table (options[:table] truthy) or as a full YAML dump.
#
# @param options [Hash] reads :job_flow_ids (validated with
#   validate_job_ids when present), :job_flow_status and :table; every
#   other key is forwarded to describe_job_flows unchanged
# @return [nil]
def list(options)
  validate_job_ids options[:job_flow_ids] if options[:job_flow_ids]

  # Work on a deep copy so the caller's options hash is never mutated.
  filters = Marshal.load(Marshal.dump(options))
  # 'table' only selects the output format and must never reach the request.
  # Drop it whatever its value, and compare as a string so both symbol- and
  # string-keyed hashes are cleaned.
  filters.reject! { |key, _value| key.to_s == 'table' }

  # Rename the option keys to the parameter names used in the request.
  { :job_flow_ids => 'JobFlowIds', :job_flow_status => 'JobFlowStates' }.each do |option, api_key|
    value = filters.delete(option)
    filters[api_key] = value if value
  end

  if options[:table]
    puts 'For detailed information, dont pass --table option'
    job_flows = @conn.describe_job_flows(filters).body['JobFlows']
    table_data = job_flows.map do |job_flow|
      instances = job_flow['Instances']
      {
        :job_flow_id => job_flow['JobFlowId'],
        :name => job_flow['Name'],
        :instance_count => instances['InstanceCount'],
        :master_dns => instances['MasterPublicDnsName'],
        :ec2_key_name => instances['Ec2KeyName'],
        :state => job_flow['ExecutionStatusDetail']['State']
      }
    end
    Formatador.display_table(table_data, [:job_flow_id, :name, :state, :instance_count, :master_dns, :ec2_key_name])
  else
    puts 'For less information, pass --table option'
    puts @conn.describe_job_flows(filters).body['JobFlows'].to_yaml
  end
end

#modify_instance_group(options) ⇒ Object



180
181
182
183
184
185
186
187
188
189
190
191
192
# File 'lib/awscli/emr.rb', line 180

# Changes the instance count of one instance group.
#
# Aborts the process when validate_instance_group_id? rejects the id. An
# Excon::Errors::BadRequest raised anywhere in the body is printed rather
# than propagated.
#
# @param options [Hash] reads :instance_group_id and :instance_count
# @return [nil]
def modify_instance_group(options)
  group_id = options[:instance_group_id]
  abort "Invalid instance group id: #{group_id}" unless validate_instance_group_id?(group_id)
  resized_group = {
    'InstanceCount' => options[:instance_count],
    'InstanceGroupId' => group_id
  }
  @conn.modify_instance_groups('InstanceGroups' => [resized_group])
rescue Excon::Errors::BadRequest => error
  puts "[Error]: #{error}"
else
  # Only reached when the request went through without raising.
  puts "Modified instance group #{group_id} size to #{options[:instance_count]}"
end

#set_termination_protection(job_flow_ids, terminate_protection) ⇒ Object



194
195
196
197
198
199
200
201
202
203
204
205
# File 'lib/awscli/emr.rb', line 194

# Turns termination protection on or off for the given job flows and prints
# which of the two happened.
#
# @param job_flow_ids [Array] job flow ids; handed to validate_job_ids first
#   and joined with commas for the confirmation line
# @param terminate_protection [Object] flag passed through to the
#   connection; its truthiness selects the printed message
# @return [nil]
def set_termination_protection(job_flow_ids, terminate_protection)
  validate_job_ids(job_flow_ids)
  target = { 'JobFlowIds' => job_flow_ids }
  @conn.set_termination_protection(terminate_protection, target)

  joined_ids = job_flow_ids.join(',')
  if terminate_protection
    puts "Termination protection flag added to job_flows: #{joined_ids}"
  else
    puts "Termination protection flag removed from job_flows: #{joined_ids}"
  end
end