Class: Wakame::Service::ServiceCluster

Inherits:
Wakame::StatusDB::Model show all
Includes:
ThreadImmutable
Defined in:
lib/wakame/service.rb

Constant Summary collapse

STATUS_OFFLINE =
0
STATUS_ONLINE =
1
STATUS_PARTIAL_ONLINE =
2

Constants included from AttributeHelper

AttributeHelper::CLASS_TYPE_KEY, AttributeHelper::CONVERT_CLASSES, AttributeHelper::PRIMITIVE_CLASSES

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Methods included from ThreadImmutable

#bind_thread, included, #target_thread, #target_thread?, #thread_check

Methods inherited from Wakame::StatusDB::Model

#delete, #dirty?, inherited, #new_record?, #on_after_delete, #on_after_load, #on_before_delete, #on_before_load, #reload, #save

Methods included from AttributeHelper

#dump_attrs, #retrieve_attr_attribute

Instance Attribute Details

#trigger_setObject (readonly)

Returns the value of attribute trigger_set.



347
348
349
# File 'lib/wakame/service.rb', line 347

def trigger_set
  @trigger_set
end

Class Method Details

.id(name) ⇒ Object



349
350
351
352
# File 'lib/wakame/service.rb', line 349

def self.id(name)
  require 'digest/sha1'
  Digest::SHA1.hexdigest(name)
end

Instance Method Details

#add_cloud_host(&blk) ⇒ Object



559
560
561
562
563
564
565
566
567
568
569
570
# File 'lib/wakame/service.rb', line 559

def add_cloud_host(&blk)
  h = CloudHost.new
  h.cluster_id = self.id
  self.cloud_hosts[h.id]=1

  blk.call(h) if blk

  h.save
  self.save

  h
end

#add_resource(resource, &blk) ⇒ Object



409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
# File 'lib/wakame/service.rb', line 409

def add_resource(resource, &blk)
  if resource.is_a?(Class) && resource <= Resource
    resource = resource.new
  elsif resource.is_a? Resource
  else
    raise ArgumentError
  end
  raise "Duplicate resource registration: #{resource.class}" if self.resources.has_key? resource.id

  blk.call(resource) if blk

  resources[resource.id]=1
  self.dg.add_object(resource)

  resource.save
  self.save
  self.dg.save
  resource
end

#agentsObject



387
388
389
390
391
392
393
394
# File 'lib/wakame/service.rb', line 387

def agents
  res={}
  cloud_hosts.keys.collect { |cloud_host_id|
    h = CloudHost.find(cloud_host_id)
    res[cloud_host_id]=h.agent_id if h.mapped?
  }
  res
end

#define_triggers(&blk) ⇒ Object



359
360
361
362
# File 'lib/wakame/service.rb', line 359

def define_triggers(&blk)
  @trigger_set ||= TriggerSet.new(self.id)
  blk.call(@trigger_set)
end

#destroy(svc_id) ⇒ Object

Create service instance objects which will be equivalent with the number min_instance. The agents are not assigned at this point. def launch

self.resources.keys.each { |res_id|
  res = Resource.find(res_id)
  count = instance_count(res.class)
  if res.min_instances > count
    (res.min_instances - count).times {
      propagate(res.class)
    }
  end
}

end thread_immutable_methods :launch



472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
# File 'lib/wakame/service.rb', line 472

def destroy(svc_id)
  raise("Unknown service instance : #{svc_id}") unless self.services.has_key?(svc_id)
  svc = ServiceInstance.find(svc_id)
  svc.unbind_cluster
  self.services.delete(svc.id)
  old_host = svc.unbind_cloud_host

  if old_host
    Wakame.log.debug("#{svc.resource.class}(#{svc.id}) has been destroied from Host #{old_host.inspect}")
  else
    Wakame.log.debug("#{svc.resource.class}(#{svc.id}) has been destroied.")
  end

  svc.delete
  self.save
end

#dgObject



396
397
398
399
400
401
402
403
404
405
406
# File 'lib/wakame/service.rb', line 396

def dg
  unless self.dg_id.nil?
    graph = DependencyGraph.find(self.dg_id)
  else
    graph = DependencyGraph.new
    graph.save
    self.dg_id = graph.id
    self.save
  end
  graph
end

#each_instance(filter_resource = nil, &blk) ⇒ Object

Iterate the service instances in this cluster

The first argument is used for filtering only specified resource instances. Iterated instance objects are passed to the block when it is given. The return value is an array contanins registered service instance objects (filtered).



601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
# File 'lib/wakame/service.rb', line 601

def each_instance(filter_resource=nil, &blk)
  filter_resource = case filter_resource 
                    when Resource
                      filter_resource.class
                    when String
                      Util.build_const(filter_resource)
                    when Module, NilClass
                      filter_resource
                    else
                      raise ArgumentError, "The first argument has to be in form of NilClass, Resource, String or Module: #{filter_resource.class}"
                    end

  filter_ids = []

  unless filter_resource.nil?
    filter_ids = self.resources.keys.find_all { |resid|
      Resource.find(resid).kind_of?(filter_resource)
    }
    return [] if filter_ids.empty?
  end
  
  ary = self.services.keys.collect {|k| ServiceInstance.find(k) }
  if filter_resource.nil?
  else
    ary = ary.find_all{|v| filter_ids.member?(v.resource.id) }
  end

  ary.each {|v| blk.call(v) } if block_given?
  ary
end

#find_service(svc_id) ⇒ Object



582
583
584
585
# File 'lib/wakame/service.rb', line 582

def find_service(svc_id)
  raise "The service ID #{svc_id} is not registered to this cluster \"#{self.name}\"" unless self.services.has_key? svc_id
  ServiceInstance.find(svc_id) || raise("The service ID #{svc_id} is registered. but not in the database.")
end

#has_instance?(svc_id) ⇒ Boolean

Returns:

  • (Boolean)


449
450
451
# File 'lib/wakame/service.rb', line 449

def has_instance?(svc_id)
  self.services.has_key? svc_id
end

#idObject



354
355
356
357
# File 'lib/wakame/service.rb', line 354

def id
  raise "Cluster name is not set yes" if self.name.nil?
  self.class.id(self.name)
end

#instance_count(resource = nil) ⇒ Object



587
588
589
590
591
592
593
594
595
# File 'lib/wakame/service.rb', line 587

def instance_count(resource=nil)
  return self.services.size if resource.nil?

  c = 0
  each_instance(resource) { |svc|
    c += 1
  }
  c
end

#mapped_agent?(agent_id) ⇒ Boolean

Returns:

  • (Boolean)


380
381
382
383
384
385
# File 'lib/wakame/service.rb', line 380

def mapped_agent?(agent_id)
  cloud_hosts.keys.any? { |cloud_host_id|
    h = CloudHost.find(cloud_host_id)
    h.mapped? && h.agent_id == agent_id
  }
end

#propagate_resource(resource, cloud_host_id = nil, force = false) ⇒ Object Also known as: propagate

def propagate(resource, force=false)

Raises:

  • (ArgumentError)


492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
# File 'lib/wakame/service.rb', line 492

def propagate_resource(resource, cloud_host_id=nil, force=false)
  res_id = Resource.id(resource)
  res_obj = (self.resources.has_key?(res_id) && Resource.find(res_id)) || raise("Unregistered resource: #{resource.to_s}")
  raise ArgumentError if res_obj.require_agent && cloud_host_id.nil?

  if force == false
    instnum = instance_count(res_obj)
    if instnum >= res_obj.max_instances
      raise ServicePropagationError, "#{res_obj.class} has been reached to max_instance limit: max=#{res_obj.max_instances}" 
    end
  end
  
  svc = ServiceInstance.new
  svc.bind_cluster(self)
  svc.bind_resource(res_obj)

  # cloud_host_id must be set when the resource is placed on agent.
  if res_obj.require_agent
    host = CloudHost.find(cloud_host_id) || raise("#{self.class}: Unknown CloudHost ID: #{cloud_host_id}")
    svc.bind_cloud_host(host)
  end

  self.services[svc.id]=1

  svc.save
  self.save

  svc
end

#propagate_service(svc_id, cloud_host_id = nil, force = false) ⇒ Object



524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
# File 'lib/wakame/service.rb', line 524

def propagate_service(svc_id, cloud_host_id=nil, force=false)
  src_svc = (self.services.has_key?(svc_id) && ServiceInstance.find(svc_id)) || raise("Unregistered service: #{svc_id.to_s}")
  res_obj = src_svc.resource

  if force == false
    instnum = instance_count(res_obj)
    if instnum >= res_obj.max_instances
      raise ServicePropagationError, "#{res_obj.class} has been reached to max_instance limit: max=#{res_obj.max_instances}" 
    end
  end
  
  svc = ServiceInstance.new
  svc.bind_cluster(self)
  svc.bind_resource(res_obj)

  if res_obj.require_agent
    if cloud_host_id
      host = CloudHost.find(cloud_host_id) || raise("#{self.class}: Unknown Host ID: #{cloud_host_id}")
    else
      host = add_cloud_host { |h|
        h.vm_attr = src_svc.cloud_host.vm_attr.dup
      }
    end
    svc.bind_cloud_host(host)
  end

  self.services[svc.id]=1

  svc.save
  self.save

  svc
end

#propertiesObject



648
649
650
# File 'lib/wakame/service.rb', line 648

def properties
  self.resources
end

#remove_cloud_host(cloud_host_id) ⇒ Object



572
573
574
575
576
577
578
579
580
# File 'lib/wakame/service.rb', line 572

def remove_cloud_host(cloud_host_id)
  if self.cloud_hosts.has_key?(cloud_host_id)
    self.cloud_hosts.delete(cloud_host_id)
  
    self.save
  end

  CloudHost.delete(cloud_host_id) rescue nil
end

#resetObject



370
371
372
373
374
375
376
377
378
# File 'lib/wakame/service.rb', line 370

def reset
  services.clear
  resources.clear
  cloud_hosts.clear
  template_vm_attr.clear
  advertised_amqp_servers = nil
  @status = self.class.attr_attributes[:status][:default]
  @status_changed_at = Time.now
end

#set_dependency(res_name1, res_name2) ⇒ Object

Set dependency between two resources.



431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
# File 'lib/wakame/service.rb', line 431

def set_dependency(res_name1, res_name2)
  validate_arg = proc {|o|
    o = Util.build_const(o) if o.is_a? String
    raise ArgumentError unless o.is_a?(Class) && o <= Resource
    raise "This is not a member of this cluster \"#{self.class}\": #{o}" unless resources.member?(o.id)
    raise "Unknown resource object: #{o}" unless Resource.exists?(o.id)
    o
  }
  
  res_name1 = validate_arg.call(res_name1)
  res_name2 = validate_arg.call(res_name2)
  
  return if res_name1.id == res_name2.id

  self.dg.set_dependency(res_name1, res_name2)
end

#shutdownObject



453
454
# File 'lib/wakame/service.rb', line 453

def shutdown
end

#sizeObject



644
645
646
# File 'lib/wakame/service.rb', line 644

def size
  self.dg.size
end

#template_vm_specObject



364
365
366
367
368
# File 'lib/wakame/service.rb', line 364

def template_vm_spec
  spec = VmSpec.current
  spec.table = self.template_vm_attr
  spec
end

#update_cluster_statusObject

private



656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
# File 'lib/wakame/service.rb', line 656

def update_cluster_status
  onlines = []
  all_offline = false

  onlines = self.each_instance.select { |i|
    i.status == Service::STATUS_ONLINE
  }
  all_offline = self.each_instance.all? { |i|
    i.status == Service::STATUS_OFFLINE
  }
  #Wakame.log.debug "online instances: #{onlines.size}, assigned instances: #{self.instances.size}"

  prev_status = self.status
  if self.instances.size == 0 || all_offline
    self.update_status(Service::ServiceCluster::STATUS_OFFLINE)
  elsif onlines.size == self.instances.size
    self.update_status(Service::ServiceCluster::STATUS_ONLINE)
  elsif onlines.size > 0
    self.update_status(Service::ServiceCluster::STATUS_PARTIAL_ONLINE)
  end

end

#update_status(new_status) ⇒ Object



632
633
634
635
636
637
638
639
640
641
# File 'lib/wakame/service.rb', line 632

def update_status(new_status)
  if @status != new_status
    @status = new_status
    @status_changed_at = Time.now

    self.save

    ED.fire_event(Event::ClusterStatusChanged.new(id, new_status))
  end
end