Class: Wakame::Master

Inherits:
Object
  • Object
show all
Includes:
AMQPClient, QueueDeclare
Defined in:
lib/wakame/master.rb

Instance Attribute Summary collapse

Attributes included from AMQPClient

#amqp_client, #mq, #queue_subscribers

Class Method Summary collapse

Instance Method Summary collapse

Methods included from QueueDeclare

included

Methods included from AMQPClient

#add_subscriber, #amq, #amqp_server_uri, #close, #connect, #connected?, #define_queue, included, #publish_to

Constructor Details

#initialize(opts = {}) ⇒ Master

Returns a new instance of Master.



21
22
23
# File 'lib/wakame/master.rb', line 21

def initialize(opts={})
  pre_setup
end

Instance Attribute Details

#action_managerObject (readonly)

Returns the value of attribute action_manager.



18
19
20
# File 'lib/wakame/master.rb', line 18

def action_manager
  @action_manager
end

#agent_monitorObject (readonly)

Returns the value of attribute agent_monitor.



18
19
20
# File 'lib/wakame/master.rb', line 18

def agent_monitor
  @agent_monitor
end

#cluster_managerObject (readonly)

Returns the value of attribute cluster_manager.



18
19
20
# File 'lib/wakame/master.rb', line 18

def cluster_manager
  @cluster_manager
end

#command_queueObject (readonly)

Returns the value of attribute command_queue.



18
19
20
# File 'lib/wakame/master.rb', line 18

def command_queue
  @command_queue
end

#managersObject (readonly)

Returns the value of attribute managers.



19
20
21
# File 'lib/wakame/master.rb', line 19

def managers
  @managers
end

#started_atObject (readonly)

Returns the value of attribute started_at.



18
19
20
# File 'lib/wakame/master.rb', line 18

def started_at
  @started_at
end

Class Method Details

.ec2_fetch_local_attrsObject



70
71
72
73
74
75
76
77
78
# File 'lib/wakame/master.rb', line 70

def self.ec2_fetch_local_attrs
  attrs = {}
  %w[instance-id instance-type local-ipv4 local-hostname public-hostname public-ipv4 ami-id].each{|key|
    rkey = key.tr('-', '_')
    attrs[rkey.to_sym]=(key)
  }
  attrs[:availability_zone]=('placement/availability-zone')
  attrs
end

.ec2_query_metadata_uri(key) ⇒ Object



63
64
65
66
67
68
# File 'lib/wakame/master.rb', line 63

def self.(key)
  require 'open-uri'
  open("http://169.254.169.254/2008-02-01/meta-data/#{key}"){|f|
    return f.readline
  }
end

Instance Method Details

#actor_request(agent_id, path, *args) ⇒ Object



26
27
28
29
# File 'lib/wakame/master.rb', line 26

def actor_request(agent_id, path, *args)
  request = Wakame::Packets::ActorRequest.new(agent_id, Util.gen_id, path, *args)
  ActorRequest.new(self, request)
end

#cleanupObject



32
33
34
35
# File 'lib/wakame/master.rb', line 32

def cleanup
  @managers.each { |m| m.terminate }
  @command_queue.shutdown
end

#initObject

post_setup



45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
# File 'lib/wakame/master.rb', line 45

def init
  raise 'has to be put in EM.run context' unless EM.reactor_running?
  @command_queue = register_manager(MasterManagers::CommandQueue.new)

  # WorkerThread has to run earlier than other managers.
  @agent_monitor = register_manager(MasterManagers::AgentMonitor.new)
  @cluster_manager = register_manager(MasterManagers::ClusterManager.new)
  @action_manager = register_manager(MasterManagers::ActionManager.new)

  @managers.each {|m|
    Wakame.log.debug("Initializing Manager Module: #{m.class}")
    m.init
  }

  Wakame.log.info("Started master process : AMQP Server=#{amqp_server_uri.to_s} WAKAME_ROOT=#{Wakame.config.root_path} WAKAME_ENV=#{Wakame.config.environment} WAKAME_CLUSTER_ENV=#{Wakame.config.cluster_env}")
end

#register_manager(manager) ⇒ Object

Raises:

  • (ArgumentError)


37
38
39
40
41
42
# File 'lib/wakame/master.rb', line 37

def register_manager(manager)
  raise ArgumentError unless manager.kind_of? MasterManager
  manager.master = self
  @managers << manager
  manager
end