Class: Amazon

Inherits:
Infrastructure show all
Defined in:
lib/cluster/infrastructures/amazon.rb

Constant Summary collapse

EMPTY_ARRAY_ELEMENT =
'sdb_empty_array_element_&:&#!-'

Class Method Summary collapse

Instance Method Summary collapse

Methods inherited from Infrastructure

cluster_name, connect, #credentials, #credentials?, current, dns, in_cluster?, #initialize, #machines, #names, #services, sizes, #to_credentials

Methods included from Cluster::Logging

#logger

Constructor Details

This class inherits a constructor from Infrastructure

Class Method Details

.from_sdb_results(res) ⇒ Object



610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
# File 'lib/cluster/infrastructures/amazon.rb', line 610

# Turns a SimpleDB select result into an array of plain attribute hashes.
#
# Every entry of res[:items] is read as a one-pair hash of
# item name => { attribute => [values] }.  The item name is stored under
# 'aws_id'.  Value arrays are unwrapped: an empty array becomes nil, a single
# value becomes that value (or [] when it is the empty-array marker), and
# longer arrays are kept with the marker filtered out.
def from_sdb_results(res)
  res[:items].map do |item|
    item_name = item.keys.first

    # Cheap stand-in for HashWithIndifferentAccess: a miss on a String key is
    # retried with the Symbol form, and any other key with its String form.
    record = {'aws_id' => item_name}
    record.default_proc = proc do |store, wanted|
      twin = String === wanted ? wanted.to_sym : wanted.to_s
      store[twin] if store.key?(twin)
    end

    item[item_name].each do |name, values|
      record[name] =
        if values.empty?
          nil
        elsif values.length == 1
          only = values.first
          only.eql?(self::EMPTY_ARRAY_ELEMENT) ? [] : only
        else
          values.reject { |entry| entry.eql?(self::EMPTY_ARRAY_ELEMENT) }
        end
    end

    record
  end
end

.to_sdb_attributes(args) ⇒ Object



585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
# File 'lib/cluster/infrastructures/amazon.rb', line 585

# Builds the attribute hash handed to SimpleDB, where every value is an array.
#
# nil/false values are skipped entirely.  An empty array is stored as a
# one-element array holding the empty-array marker so it can be recognised
# when read back.  Time and Date values are written as HTTP-date strings in
# UTC; everything else goes through Array().
def to_sdb_attributes(args)
  args.each_with_object({}) do |(name, value), attrs|
    next unless value

    attrs[name] =
      case value
      when Array
        # An empty array is represented by the single specialized element.
        value.empty? ? [self::EMPTY_ARRAY_ELEMENT] : Array(value)
      when Time
        # FIXME we probably should have some extra indicator of a
        # time field so that we can unmarshal it in the #from_sdb_results
        [value.utc.httpdate]
      when Date
        [value.to_time.utc.httpdate]
      else
        Array(value)
      end
  end
end

Instance Method Details

#alter_instances!(*iss) ⇒ Object



230
231
232
233
234
235
236
237
238
239
240
241
242
# File 'lib/cluster/infrastructures/amazon.rb', line 230

# Writes instance attributes back to the SimpleDB domain.
#
# Works on the given instances (nested arrays are flattened) or, when none
# are given, on everything returned by #instances.  Each instance is yielded
# to the optional block first so the caller can modify it.  Attributes whose
# value is empty are removed from the attribute hash and deleted from the
# stored item before the remaining attributes are written with :replace.
def alter_instances!(*iss)
  targets = iss.empty? ? instances : iss

  targets.flatten.each do |machine|
    yield machine if block_given?

    current = machine.attributes
    # keys is a snapshot, so deleting from the hash while selecting is safe
    blanks = current.keys.select { |name| current[name].empty? and current.delete(name) }
    sdb.delete_attributes(domain, machine.aws_id, blanks) unless blanks.empty?
    sdb.put_attributes(domain, machine.aws_id, machine.attributes, :replace)
  end
end

#authorize(ips) ⇒ Object



568
569
570
571
572
# File 'lib/cluster/infrastructures/amazon.rb', line 568

# Opens TCP port 22 in the 'access' security group for each given address.
# Returns one entry per input: the address itself when the EC2 call returned
# a truthy value, otherwise whatever falsy value the call returned.
def authorize(ips)
  ips.map do |ip|
    granted = ecc.authorize_security_group_IP_ingress('access', 22, 22, 'tcp', ip)
    granted && ip
  end
end

#balanceObject



370
371
372
373
374
# File 'lib/cluster/infrastructures/amazon.rb', line 370

# Registers the instance this process is running on with the load balancer
# named by @options.elb_name and returns the result of that call.
def balance
  instance_id = current_instance.ec2_id
  elb.register_instances_with_load_balancer(@options.elb_name, instance_id)
end

#bucketObject



516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
# File 'lib/cluster/infrastructures/amazon.rb', line 516

# Returns the (memoised) S3 bucket object for the configured cluster bucket.
# Prints a message and exits with status 2 when no bucket name is configured
# or when the S3 interface does not hand back a bucket.
def bucket
  bucket_name = @options.cluster_bucket
  if !bucket_name
    puts "#{Cluster::NAME} has not been configured with a bucket for client materials."
    exit 2
  end

  @bucket ||= sss.bucket(bucket_name, true)
  return @bucket if @bucket

  puts "#{Cluster::NAME} bucket named #{@options.cluster_bucket} cannot be created or accessed."
  exit 2
end

#clear_instancesObject



107
108
109
# File 'lib/cluster/infrastructures/amazon.rb', line 107

# Drops the cached instance list held in the @@instances class variable so
# the next call to #instances loads it again.
def clear_instances
  @@instances = nil
end

#configureObject



5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
# File 'lib/cluster/infrastructures/amazon.rb', line 5

# Populates @options for the Amazon infrastructure.
#
# Precedence, lowest to highest: the 'amazon' section of the loaded
# credentials, environment variables / built-in defaults (only for values
# still unset), and finally the command line options parsed from @arguments.
# Exits with status 2 when no access key/secret or no cluster domain could
# be determined.
def configure
  super
  @@in_cluster = in_cluster?
  @@instances = nil

  if credentials? and credentials.include? 'amazon'
    creds = credentials['amazon']
    @options.key = creds['key']
    @options.secret = creds['secret']
    @options.owner = creds['owner']
    @options.cluster_bucket = creds['cluster_bucket']
    @options.bucket_key = creds['bucket_key']
    @options.cluster_domain = creds['cluster_domain']
    @options.zones = creds['zones']
    @options.volumes = creds['volumes']
    # The 'web' section is optional; without this guard a credentials file
    # lacking it raised NoMethodError on nil.
    web = creds['web']
    @options.elb_name = web && web['elb_name']
  end

  @options.key ||= ENV['AWS_ACCESS_KEY']
  @options.secret ||= ENV['AWS_SECRET_KEY']
  @options.owner ||= ENV['AMAZON_OWNER_ID']
  @options.cluster_bucket ||= ENV['CLUSTER_BUCKET']
  @options.bucket_key ||= 'cluster_credentials.yml'
  # zones stays nil when neither the credentials nor AMAZON_ZONE supply one
  @options.zones ||= if !ENV['AMAZON_ZONE'] or ENV['AMAZON_ZONE'].empty?
                       nil
                     else
                       Array(ENV['AMAZON_ZONE'])
                     end

  @options.cluster_domain ||= ENV['CLUSTER_DOMAIN'] || self.class.cluster_name
  @options.elb_name ||= ENV['INGAMER_LOAD_BALANCER']
  @options.volumes ||= {}

  @options.role = (ENV['CLUSTER_ROLE'] or ENV['RAILS_ENV'] or 'production')

  @options.cluster_image_key = 'cluster_images.yml'
  @options.spot_instances = false
  @options.price = false

  OptionParser.new {|o|
    o.banner = "Amazon Infrastructure Options"

    o.on('-k', '--key VAL', "Amazon Access Key ID") do |v|
      @options.key = v
    end

    o.on('-s', '--secret VAL', 'Amazon Access Secret') do |v|
      @options.secret = v
    end

    # '--owenr' was the original (misspelt) long option; it is kept as an
    # alias so existing invocations keep working.
    o.on('-o', '--owner VAL', '--owenr VAL', 'Amazon User Code') do |v|
      @options.owner = v
    end

    o.on('-d', '--domain VAL', 'Amazon Domain to use') do |v|
      @options.cluster_domain = v
    end

    o.on('-b', '--bucket VAL', 'Cluster Bucket') do |v|
      @options.cluster_bucket = v
    end

    o.on('-f', '--bucket-credentials-file VAL', 'Cluster credentials file location on the bucket.') do |v|
      @options.bucket_key = v
    end

    # Accepted but currently ignored: the handler stores nothing.
    o.on('--source-bucket VAL', 'Bucket that has the host data.') do |v|
    end

    o.on('-r', '--role VAL', 'Role in which to operate.') do |v|
      @options.role = v
    end

    o.on('-z', '--zone VAL', "Availability Zone") do |v|
      @options.zones = Array(v)
    end

    o.on('--spot', "Use Spot Instances") do |v|
      @options.spot_instances = true
    end

    o.on('--price=VAL', 'Maximum price for the spot instance') do |v|
      @options.price = v
    end

  }.parse(@arguments)

  unless @options.key and @options.secret
    $stderr.puts "Amazon Infrastructure cannot communicate without secret and key."
    exit 2
  end

  unless @options.cluster_domain
    $stderr.puts "Amazon Infrastructure needs to know what domain to connect to."
    exit 2
  end
end

#connect_to_active_sdb(o = nil) ⇒ Object



384
385
386
387
388
# File 'lib/cluster/infrastructures/amazon.rb', line 384

# Establishes the RightAws::ActiveSdb connection using the configured key and
# secret.  The :region entry is removed from the connection parameters when
# it starts with "us-east".
def connect_to_active_sdb(o = nil)
  settings = connection_params(o)
  settings.delete(:region) if settings[:region] =~ /^us-east/
  RightAws::ActiveSdb.establish_connection(key, secret, settings)
end

#connection_params(o = nil) ⇒ Object



445
446
447
448
449
450
451
452
453
# File 'lib/cluster/infrastructures/amazon.rb', line 445

# Builds the parameter hash passed to the RightAws interfaces.
#
# o - optional hash of parameters; it is cloned, never modified.  When it is
#     nil or empty the result starts from {:multi_thread => true}.
#
# :logger defaults to #logger and :region to the first configured zone.
# Returns the new hash.
def connection_params(o = nil)
  params = o ? o.clone : {}
  params = {:multi_thread => true} if params.empty?

  params[:logger] ||= logger
  # #configure leaves @options.zones nil when no zone is supplied anywhere;
  # Array() turns that (or a bare string) into something #first works on
  # instead of raising NoMethodError.
  params[:region] ||= Array(@options.zones).first

  params
end

#cost(sizes) ⇒ Object



111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
# File 'lib/cluster/infrastructures/amazon.rb', line 111

# Reports the average spot price over the last seven days for each size.
#
# sizes - list of machine size names understood by #size_to_instance_type.
#
# Returns an array of [size, price] pairs where price is the average
# formatted with three decimals, or nil when the price history for that size
# is empty.  Exits with status 3 unless spot instances are enabled.
def cost(sizes)
  unless @options.spot_instances
    puts "Cost only works for spot instances currently (ie. supply infrastructure argument of --spot"
    exit 3
  end

  now = Time.now
  window = {:start_time => (now - (7 * 24 * 60 * 60)),
            :end_time => now,
            :product_description => "Linux/UNIX"}

  sizes.map do |size|
    types = [size_to_instance_type(size)]
    prices = ecc.describe_spot_price_history(window.merge(:instance_types => types))

    if prices.empty?
      # No history: avoid dividing by zero and report "unknown".
      [size, nil]
    else
      # Accumulate as Float so integer prices are not truncated by division.
      total = prices.inject(0.0) { |sum, entry| sum + entry[:spot_price].to_f }
      [size, "%0.3f" % (total / prices.length)]
    end
  end
end

#create_data_store(name) ⇒ Object



131
132
133
# File 'lib/cluster/infrastructures/amazon.rb', line 131

# Creates the SimpleDB domain called +name+ through the #sdb interface and
# returns the result of that call.
def create_data_store(name)
  sdb.create_domain(name)
end

#create_file_store(name) ⇒ Object



127
128
129
# File 'lib/cluster/infrastructures/amazon.rb', line 127

# Fetches the S3 bucket called +name+ through the #sss interface.
# NOTE(review): the second argument (true) presumably asks RightAws to create
# the bucket when it does not exist -- confirm against the RightAws S3 docs.
def create_file_store(name)
  sss.bucket(name, true)
end

#credentials_url(seconds = 1200) ⇒ Object



473
474
475
476
# File 'lib/cluster/infrastructures/amazon.rb', line 473

# Asks RightAws::S3Generator for a GET link to the credentials file
# (@options.bucket_key) in the cluster bucket.
#
# seconds - value handed to the generator's #get (default 1200).
def credentials_url(seconds = 1200)
  RightAws::S3Generator::Key.new(bucket, @options.bucket_key).get(seconds)
end

#current_instanceObject



316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
# File 'lib/cluster/infrastructures/amazon.rb', line 316

# Returns the instance record for the machine this process runs on.
#
# The EC2 instance id is read from the metadata service and matched against
# #instances.  Exits with status 3 when #in_cluster? is false and with
# status 2 when no matching instance is found.
#
# When @options.current_instance_id names a different stored record, that
# record's services, disabled services, spot price and friendly name are
# folded into the current instance, the other record is deleted and the
# merged attributes are written back.
def current_instance
  unless in_cluster?
    puts "Are we in the cluster?"
    exit 3
  end

  require 'open-uri'
  metadata_url = 'http://169.254.169.254/latest/meta-data/instance-id'
  # Kernel#open stopped handling URLs in Ruby 3.0; URI.open is the supported
  # entry point wherever open-uri provides it.  Older rubies fall back to the
  # open-uri-patched Kernel#open.
  ec2_id = if URI.respond_to?(:open)
             URI.open(metadata_url) {|f| f.read }
           else
             open(metadata_url) {|f| f.read }
           end
  ins = self.instances.detect {|i| i.ec2_id.eql? ec2_id }

  unless ins
    puts "#{Cluster::NAME} cannot determine the current instance." 
    exit 2
  end

  if @options.current_instance_id
    aws = self.instances.detect {|i| i.aws_id.eql? @options.current_instance_id }
    if aws and aws.aws_id != ins.aws_id
      ins.services = (ins.services + aws.services).uniq
      ins.disabled_services = (ins.disabled_services + aws.disabled_services).uniq
      ins.spot_price = aws.spot_price unless ins.spot_price
      ins.friendly_name = aws.friendly_name unless ins.friendly_name
      sdb.delete_attributes domain, aws.aws_id
      sdb.put_attributes domain, ins.aws_id, ins.attributes, :replace
    end
  end

  ins
end

#domainObject



366
367
368
# File 'lib/cluster/infrastructures/amazon.rb', line 366

# Name of the SimpleDB domain in use, as stored in @options.cluster_domain.
def domain
  @options.cluster_domain
end

#domains(reload = false) ⇒ Object



418
419
420
421
422
423
424
# File 'lib/cluster/infrastructures/amazon.rb', line 418

# Returns the :domains entry of sdb.list_domains, cached in @sdb_domains.
#
# reload - pass true to query again even when a cached list exists.
def domains(reload = false)
  @sdb_domains = sdb.list_domains[:domains] if reload || !@sdb_domains
  @sdb_domains
end

#ecc(o = nil) ⇒ Object



412
413
414
415
416
# File 'lib/cluster/infrastructures/amazon.rb', line 412

# Returns a RightAws::Ec2 interface for the region in the connection
# parameters, creating it on first use and caching one per region in @ec2.
def ecc(o = nil)
  settings = connection_params(o)
  region = settings[:region]
  (@ec2 ||= {})[region] ||= RightAws::Ec2.new(key, secret, settings)
end

#elb(o = nil) ⇒ Object



406
407
408
409
410
# File 'lib/cluster/infrastructures/amazon.rb', line 406

# Returns a RightAws::ElbInterface for the region in the connection
# parameters, creating it on first use and caching one per region in @elb.
def elb(o = nil)
  settings = connection_params(o)
  region = settings[:region]
  (@elb ||= {})[region] ||= RightAws::ElbInterface.new(key, secret, settings)
end

#fetch_monitorObject



309
310
311
312
313
314
# File 'lib/cluster/infrastructures/amazon.rb', line 309

# Looks up the 'monitor' entry in the SimpleDB domain and fetches the object
# stored under its 'key' attribute from the cluster bucket.
# Returns nil when no monitor entry exists.
def fetch_monitor
  found = sdb.select("select * from #{domain} where entry = 'monitor'")
  if found[:items].empty?
    nil
  else
    record = self.class.from_sdb_results(found).first
    bucket.get(record['key'])
  end
end

#get_image(bits) ⇒ Object



537
538
539
540
541
542
543
544
545
# File 'lib/cluster/infrastructures/amazon.rb', line 537

# Returns the image entry for the given word size from the (memoised) image
# file: 'thirtytwo' for 32 and 'sixtyfour' for 64.  Any other value gives nil.
def get_image(bits)
  @image_file ||= get_image_file
  slot = [[32, 'thirtytwo'], [64, 'sixtyfour']].find { |width, _label| width === bits }
  slot && @image_file[slot.last]
end

#get_image_fileObject



532
533
534
535
# File 'lib/cluster/infrastructures/amazon.rb', line 532

# Loads and parses the YAML document located at Cluster::IMAGES.
#
# Kernel#open no longer opens URLs since Ruby 3.0, so URI.open is used
# wherever open-uri provides it; older rubies fall back to the
# open-uri-patched Kernel#open.
def get_image_file
  require 'open-uri'
  # NOTE(review): YAML.load on remotely fetched data can instantiate
  # arbitrary objects on older Psych versions -- consider YAML.safe_load.
  parse = proc {|f| YAML::load(f) }
  if URI.respond_to?(:open)
    URI.open(Cluster::IMAGES, &parse)
  else
    open(Cluster::IMAGES, &parse)
  end
end

#in_cluster?Boolean

Returns:

  • (Boolean)


244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
# File 'lib/cluster/infrastructures/amazon.rb', line 244

# Reports whether this process runs inside the cluster by trying to open a
# TCP connection to 169.254.169.254:80 within one second.
#
# A positive answer is remembered in @cluster_check so later calls skip the
# probe; the original code tested @cluster_check but never assigned it, so
# every call paid for the probe again.  The result is also mirrored into
# @@in_cluster.
def in_cluster?
  return @cluster_check if @cluster_check

  check = false
  begin
    Timeout::timeout(1) do
      begin
        s = TCPSocket.new('169.254.169.254', 80)
        s.close
        check = true
      rescue Errno::ECONNREFUSED, Errno::EHOSTUNREACH, Errno::ENETUNREACH
        # Not reachable (including "network is unreachable" on an offline
        # machine): we are not in the cluster.
      end
    end
  rescue Timeout::Error
    # No answer within the time limit: treat as not in the cluster.
  end

  @cluster_check = @@in_cluster = check
end

#instance_listObject

def add_instance(*services)

  update_machines
  ins = current_instance
  ins.services = services
  sdb.put_attributes domain, ins.aws_id, ins.attributes, :replace
end
alias :add_machine :add_instance


163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
# File 'lib/cluster/infrastructures/amazon.rb', line 163

# Asks EC2 for the instances in every configured zone.
#
# Returns a pair: the AmazonInstance objects built from every description
# whose state is neither 'terminated' nor 'shutting-down', and the instance
# ids of those that are.
def instance_list
  dead_ids = []

  live = @options.zones.map do |zone|
    survivors = ecc(:region => zone).describe_instances.reject do |desc|
      gone = %w(terminated shutting-down).include?(desc[:aws_state])
      dead_ids << desc[:aws_instance_id] if gone
      gone
    end
    survivors.map { |desc| AmazonInstance.new(desc) }
  end

  [live.flatten, dead_ids]
end

#instancesObject



103
104
105
# File 'lib/cluster/infrastructures/amazon.rb', line 103

# Returns the instance list, loading it through #load_instances on first use
# and caching it in the @@instances class variable afterwards.
def instances
  @@instances ||= load_instances
end

#keyObject

The methods below here are typically only used internally, but could also be called by anything that would like to access the Amazon tools directly.



354
355
356
# File 'lib/cluster/infrastructures/amazon.rb', line 354

# The Amazon access key id held in @options.key.
def key
  @options.key
end

#load_instancesObject



135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
# File 'lib/cluster/infrastructures/amazon.rb', line 135

# Loads every 'machine' entry from the SimpleDB domain and wraps each one in
# an AmazonInstance.
#
# When the select fails and the domain does not exist yet, the domain is
# created and the select is retried once.  Any other failure -- the domain
# already exists, or the retry fails too -- is re-raised.  Previously such
# errors were swallowed (ending in a NoMethodError on nil) or retried
# without bound.
def load_instances
  domain_created = false
  begin
    res = sdb.select "select * from #{domain} where entry='machine'"
  rescue RightAws::AwsError
    # Reload the domain list so the decision is not based on a stale cache.
    raise if domain_created || domains(true).include?(domain)
    sdb.create_domain domain
    domain_created = true
    retry
  end
  sdbs = self.class.from_sdb_results res

  sdbs.map {|sd|
    ins = AmazonInstance.new
    ins.set_sdb_attributes sd
    ins
  }
end

#new_instance(size, services) ⇒ Object



272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
# File 'lib/cluster/infrastructures/amazon.rb', line 272

# Creates a new AmazonInstance of the given size running the given services,
# prints its inspect output and returns it.
#
# When spot instances are enabled the configured price is passed along as
# :spot_price; without a price the method prints a hint and exits with
# status 3.
def new_instance(size, services)
  use_spot = @options.spot_instances
  if use_spot && !@options.price
    puts "Amazon Spot instances need a '--price=??' argument"
    exit 3
  end

  args = { :services => services, :size => size }
  args[:spot_price] = @options.price if use_spot

  created = AmazonInstance.create(args)
  puts created.inspect
  created
end

#optionsObject



547
548
549
# File 'lib/cluster/infrastructures/amazon.rb', line 547

# Exposes the @options object that holds this infrastructure's settings.
def options
  @options
end

#ownerObject



362
363
364
# File 'lib/cluster/infrastructures/amazon.rb', line 362

# The Amazon owner (account) id held in @options.owner.
def owner
  @options.owner
end

#period(args) ⇒ Object



478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
# File 'lib/cluster/infrastructures/amazon.rb', line 478

# Deletes old snapshots so that each volume keeps only its newest ones.
#
# Only snapshots owned by @options.owner with status 'completed' are
# considered.  The number kept per volume is the 'snaps' setting from
# @options.volumes (a matching 'stores' entry layered over 'defaults'), and
# is never less than 2.
#
# args - unused; kept for interface compatibility.
#
# Returns the list of volume ids that were examined.
def period(args)
  shots = {}
  ecc.describe_snapshots.each do |shot|
    next unless shot[:aws_owner].to_s == @options.owner.to_s
    next unless shot[:aws_status] == 'completed'
    shot[:started_at] = Time.parse shot[:aws_started_at]
    (shots[shot[:aws_volume_id]] ||= []).push shot
  end

  stores = @options.volumes['stores'] || []
  # A present-but-nil 'defaults' entry is treated like a missing one.
  defaults = @options.volumes['defaults'] || {}

  shots.keys.each do |volume_id|
    store = stores.detect {|s| s['aws_id'] == volume_id }
    opts = store ? defaults.merge(store) : defaults
    snap_count = opts['snaps'] || 2
    snap_count = 2 unless snap_count > 1

    ordered = shots[volume_id].sort {|a,b| a[:started_at] <=> b[:started_at] }.reverse
    # Array#slice(start, len) returns nil when start is past the end, which
    # crashed for volumes with fewer snapshots than snap_count; #drop always
    # returns an array.
    ordered.drop(snap_count).each do |shot|
      ecc.delete_snapshot shot[:aws_id]
    end
  end
end

#query(qry) ⇒ Object



574
575
576
577
578
579
580
581
# File 'lib/cluster/infrastructures/amazon.rb', line 574

# Runs the given select expression against SimpleDB and converts the result
# with from_sdb_results.  Returns nil when the select returns nothing truthy.
def query(qry)
  found = sdb.select(qry)
  return nil unless found

  self.class.from_sdb_results(found)
end

#release_classObject



346
347
348
# File 'lib/cluster/infrastructures/amazon.rb', line 346

# Returns the AmazonRelease constant.
# NOTE(review): presumably the release implementation paired with this
# infrastructure -- confirm against the callers.
def release_class
  AmazonRelease
end

#retrieve(key) ⇒ Object



459
460
461
# File 'lib/cluster/infrastructures/amazon.rb', line 459

# Fetches the object stored under +key+ from the cluster bucket.
def retrieve(key)
  bucket.get key
end

#revoke(ips) ⇒ Object



562
563
564
565
566
# File 'lib/cluster/infrastructures/amazon.rb', line 562

# Closes TCP port 22 in the 'access' security group for each given address.
# Returns one entry per input: the address itself when the EC2 call returned
# a truthy value, otherwise whatever falsy value the call returned.
def revoke(ips)
  ips.map do |ip|
    removed = ecc.revoke_security_group_IP_ingress('access', 22, 22, 'tcp', ip)
    removed && ip
  end
end

#save_credentials(creds, key = nil) ⇒ Object



463
464
465
466
# File 'lib/cluster/infrastructures/amazon.rb', line 463

# Stores +creds+ in the cluster bucket.
#
# key - object key to write to; falls back to @options.bucket_key when nil.
def save_credentials(creds, key = nil)
  target = key || @options.bucket_key
  bucket.put(target, creds)
end

#save_images(input) ⇒ Object



468
469
470
471
# File 'lib/cluster/infrastructures/amazon.rb', line 468

# Stores +input+ in the cluster bucket under @options.cluster_image_key,
# passing empty headers and the 'public-read' permission to the bucket's put.
def save_images(input)
  image_key = @options.cluster_image_key
  bucket.put(image_key, input, {}, 'public-read')
end

#save_monitor(io, key) ⇒ Object



292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
# File 'lib/cluster/infrastructures/amazon.rb', line 292

# Uploads the monitor data to the cluster bucket and records where it lives
# in the SimpleDB 'monitor' entry.
#
# io  - data handed to the bucket's put.
# key - object key to store it under.
#
# An existing monitor entry is updated; otherwise a new one is created with a
# generated UUID as its id.  Returns the monitor hash, or nil when the upload
# did not succeed.
def save_monitor(io, key)
  return nil unless bucket.put(key, io)

  old = sdb.select "select * from #{domain} where entry = 'monitor'"
  monitor = if old[:items].empty?
              { 'aws_id' => UUIDTools::UUID.timestamp_create.to_s,
                'entry' => 'monitor' }
            else
              self.class.from_sdb_results(old).first
            end

  # The bucket name lives in @options.cluster_bucket; the @cluster_bucket
  # instance variable read here before is never assigned, so the attribute
  # was always nil and silently dropped by to_sdb_attributes.
  monitor.merge! 'updated_at' => Time.now.xmlschema(3),
    'key' => key,
    'bucket' => @options.cluster_bucket
  sdb.put_attributes domain, monitor['aws_id'], self.class.to_sdb_attributes(monitor), :replace
  monitor
end

#sdb(o = nil) ⇒ Object



376
377
378
379
380
381
382
# File 'lib/cluster/infrastructures/amazon.rb', line 376

# Returns a RightAws::SdbInterface for the region in the connection
# parameters, creating it on first use and caching one per region in @sdb.
# The :region parameter is not passed on when it starts with "us-east".
def sdb(o = nil)
  settings = connection_params(o)
  region = settings[:region]
  settings.delete(:region) if region =~ /^us-east/
  (@sdb ||= {})[region] ||= RightAws::SdbInterface.new(key, secret, settings)
end

#secretObject



358
359
360
# File 'lib/cluster/infrastructures/amazon.rb', line 358

# The Amazon access secret held in @options.secret.
def secret
  @options.secret
end

#security(groups) ⇒ Object



551
552
553
554
555
556
557
558
559
560
# File 'lib/cluster/infrastructures/amazon.rb', line 551

# Describes the given security groups as a hash of
# group name => [[cidr_ips, from_port..to_port], ...].
# Permissions without a :cidr_ips entry are left out.
def security(groups)
  summary = {}

  ecc.describe_security_groups(groups).each do |group|
    rules = []
    group[:aws_perms].each do |perm|
      cidr = perm[:cidr_ips]
      rules << [cidr, perm[:from_port]..perm[:to_port]] if cidr
    end
    summary[group[:aws_group_name]] = rules
  end

  summary
end

#size_to_instance_type(size) ⇒ Object



264
265
266
267
268
269
270
# File 'lib/cluster/infrastructures/amazon.rb', line 264

# Maps a machine size name to an instance type via
# AmazonInstance.size_to_type.  The lower-cased name must be one of the
# class's known sizes; otherwise the available sizes are printed and the
# process exits with status 2.
def size_to_instance_type(size)
  return AmazonInstance.size_to_type(size) if self.class.sizes.include?(size.downcase)

  puts "#{Cluster::NAME} does not have a machine size of #{size}\n\tAvailable Sizes: (#{self.class.sizes.join(', ')})"
  exit 2
end

#sqs(o = nil) ⇒ Object



398
399
400
401
402
403
404
# File 'lib/cluster/infrastructures/amazon.rb', line 398

# Returns a RightAws::SqsGen2 interface for the region in the connection
# parameters, creating it on first use and caching one per region in @sqs.
# The :region parameter is not passed on when it starts with "us-east".
def sqs(o = nil)
  settings = connection_params(o)
  region = settings[:region]
  settings.delete(:region) if region =~ /^us-east/
  (@sqs ||= {})[region] ||= RightAws::SqsGen2.new(key, secret, settings)
end

#sss(o = nil) ⇒ Object



390
391
392
393
394
395
396
# File 'lib/cluster/infrastructures/amazon.rb', line 390

# Returns a RightAws::S3 interface for the region in the connection
# parameters, creating it on first use and caching one per region in @sss.
# The :region parameter is not passed on when it starts with "us-east".
def sss(o = nil)
  settings = connection_params(o)
  region = settings[:region]
  settings.delete(:region) if region =~ /^us-east/
  (@sss ||= {})[region] ||= RightAws::S3.new(key, secret, settings)
end

#store(key, io) ⇒ Object



455
456
457
# File 'lib/cluster/infrastructures/amazon.rb', line 455

# Writes +io+ to the cluster bucket under +key+ and returns the result of the
# bucket's put.
def store(key, io)
  bucket.put(key, io)
end

#update_instancesObject Also known as: update_machines



180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
# File 'lib/cluster/infrastructures/amazon.rb', line 180

# Reconciles the instances reported by EC2 with the 'machine' entries stored
# in SimpleDB and writes every surviving instance back.
#
# * Entries without an ec2_id are kept (as new AmazonInstance objects) while
#   younger than twelve hours, otherwise deleted.
# * Entries matching a live instance are merged into it.
# * Entries belonging to terminated instances are deleted.
# * Entries matching nothing are reported on stderr and left alone.
#
# When the select fails and the domain does not exist yet, the domain is
# created and the select retried once; any other failure is re-raised.
# Previously such errors were swallowed (ending in a NoMethodError on nil) or
# retried without bound.
#
# Returns the list of instances.
def update_instances
  iss, terminated = instance_list

  domain_created = false
  begin
    res = sdb.select "select * from #{domain} where entry='machine'"
  rescue RightAws::AwsError
    # Reload the domain list so the decision is not based on a stale cache.
    raise if domain_created || domains(true).include?(domain)
    sdb.create_domain domain
    domain_created = true
    retry
  end
  sdbs = self.class.from_sdb_results res

  sdbs.each do |sd|
    aid = sd[:aws_id]
    iid = sd['ec2_id']
    if !iid
      # No EC2 id recorded yet: tolerate the entry for twelve hours.
      started = sd['start_time_sorted'] && Time.parse(sd['start_time_sorted'])
      diff = started && (Time.now - started)
      if diff and diff < (12 * 3600)
        ins = AmazonInstance.new
        ins.set_sdb_attributes sd
        iss.push ins
      else
        $stderr.puts "Cannot find machine #{aid} -- old entry being removed."
        sdb.delete_attributes domain, aid
      end
    else
      if ins = iss.detect {|i| i.id.eql? iid }
        ins.set_sdb_attributes sd
      elsif terminated.include? iid
        $stderr.puts "Removing terminated entry #{iid}"
        sdb.delete_attributes domain, aid
      else
        $stderr.puts "Orphaned cluster record of #{aid}.  (Just started?) [#{sd.inspect}]"
      end
    end
  end

  iss.each do |ins|
    if ins.no_sdb?
      puts "Cannot find cluster registration for #{ins.ec2_id} -- creating."
    end
    sdb.put_attributes domain, ins.aws_id, ins.attributes, :replace
  end

  iss
end