Class: Wakame::Actions::PropagateInstances

Inherits:
Wakame::Action show all
Defined in:
lib/wakame/actions/propagate_instances.rb

Constant Summary

Constants included from AttributeHelper

AttributeHelper::CONVERT_CLASSES, AttributeHelper::PRIMITIVE_CLASSES

Instance Attribute Summary

Attributes inherited from Wakame::Action

#trigger

Instance Method Summary collapse

Methods inherited from Wakame::Action

#acquire_lock, #actor_request, #agent_monitor, #all_subactions_complete?, #bind_trigger, #flush_subactions, #master, #notes, #notify, #notify_queue, #on_canceled, #on_failed, #service_cluster, #status=, #subactions, #sync_actor_request, #trigger_action, #walk_subactions

Methods included from ThreadImmutable

#bind_thread, included, #target_thread, #target_thread?, #thread_check

Methods included from AttributeHelper

#dump_attrs

Constructor Details

#initialize(svc_prop, propagate_num = 0) ⇒ PropagateInstances

Returns a new instance of PropagateInstances.

Raises:

  • (ArgumentError)


5
6
7
8
9
# File 'lib/wakame/actions/propagate_instances.rb', line 5

# Creates the action for one service resource.
#
# @param svc_prop [Wakame::Service::Resource] resource whose instances this
#   action works on
# @param propagate_num [Integer] number of times #run calls
#   service_cluster.propagate(svc_prop) before collecting instances
#   (defaults to 0)
# @raise [ArgumentError] if svc_prop is not a Wakame::Service::Resource
def initialize(svc_prop, propagate_num=0)
  unless svc_prop.is_a?(Wakame::Service::Resource)
    # Name both the expected and the received type so the caller can see
    # what went wrong without digging through a backtrace.
    raise ArgumentError, "svc_prop must be a Wakame::Service::Resource, got #{svc_prop.class}"
  end

  @svc_prop = svc_prop
  @propagate_num = propagate_num
end

Instance Method Details

#run ⇒ Object



11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
# File 'lib/wakame/actions/propagate_instances.rb', line 11

# Starts every instance of the resource's type that is not already online or
# starting, creating extra instances first when the cluster expects more.
#
# Steps, in order:
# 1. Inside EM.barrier: call service_cluster.propagate(@svc_prop)
#    @propagate_num times, sort the existing instances into "running" and
#    "to start", then propagate again until the number of known instances
#    reaches service_cluster.instance_count(@svc_prop).
# 2. Pass the resource class of every instance about to start to acquire_lock.
# 3. For each instance that requires an agent and has none, pick an online
#    agent that does not host this service type and satisfies the VM spec.
#    An agent picked for one instance is not offered to the next one.
# 4. Trigger a StartService subaction per instance and flush the subactions.
#
# @return [Object] whatever flush_subactions returns
def run
  svc_to_start = []

  EM.barrier {
    # NOTE(review): propagate receives the resource object here but the
    # resource class further down - confirm both forms are accepted.
    @propagate_num.times {
      service_cluster.propagate(@svc_prop)
    }

    # Instances that are ONLINE or STARTING are left alone; every other
    # existing instance is scheduled to be started.
    online_svc = []
    service_cluster.each_instance(@svc_prop.class) { |svc_inst|
      if svc_inst.status == Service::STATUS_ONLINE || svc_inst.status == Service::STATUS_STARTING
        online_svc << svc_inst
      else
        svc_to_start << svc_inst
      end
    }

    # When the cluster expects more instances than currently exist, create
    # the missing ones and schedule them to start as well.
    svc_count = service_cluster.instance_count(@svc_prop)
    shortfall = svc_count - (online_svc.size + svc_to_start.size)
    if shortfall > 0
      Wakame.log.debug("#{self.class}: @svc_prop.instance_count - online_svc.size=#{svc_count - online_svc.size}")
      shortfall.times {
        svc_to_start << service_cluster.propagate(@svc_prop.class)
      }
    end
  }

  acquire_lock { |ary|
    svc_to_start.each { |svc|
      ary << svc.resource.class
    }
  }

  # agent_id of every agent already handed out in this run. The subactions
  # are only triggered here, so has_service_type? alone may not yet reflect
  # an earlier pick; without this list the same agent could be chosen for
  # several instances.
  reserved_agent_ids = []

  svc_to_start.each { |svc|
    target_agent = nil
    # NOTE(review): reads svc.property here while svc.resource is used
    # elsewhere in this method - confirm both accessors exist.
    if svc.property.require_agent
      # Try to arrange agent from existing agent pool.
      if svc.agent.nil?
        EM.barrier {
          agent_monitor.each_online { |ag|
            next if reserved_agent_ids.include?(ag.agent_id)

            if !ag.has_service_type?(@svc_prop.class) && @svc_prop.vm_spec.current.satisfy?(ag)
              target_agent = ag
              break
            end
          }
        }
        reserved_agent_ids << target_agent.agent_id if target_agent
      end

      Wakame.log.debug("#{self.class}: arranged agent for #{svc.resource.class}: #{target_agent ? target_agent.agent_id : nil}")
    end

    # target_agent stays nil when no agent is required, the instance already
    # has one, or no suitable agent was found.
    trigger_action(StartService.new(svc, target_agent))
  }
  flush_subactions
end