Class: Wakame::Agent

Inherits:
Object
  • Object
show all
Includes:
AMQPClient, QueueDeclare
Defined in:
lib/wakame/agent.rb

Instance Attribute Summary collapse

Attributes included from AMQPClient

#amqp_client, #mq, #queue_subscribers

Class Method Summary collapse

Instance Method Summary collapse

Methods included from QueueDeclare

included

Methods included from AMQPClient

#add_subscriber, #amq, #amqp_server_uri, #close, #connect, #connected?, #define_queue, included, #publish_to

Constructor Details

#initialize(opts = {}) ⇒ Agent

Returns a new instance of Agent.



30
31
32
33
34
# File 'lib/wakame/agent.rb', line 30

def initialize(opts={})
  determine_agent_id
  @actor_registry = ActorRegistry.new
  @monitor_registry = MonitorRegistry.new
end

Instance Attribute Details

#actor_registryObject (readonly)

Returns the value of attribute actor_registry.



24
25
26
# File 'lib/wakame/agent.rb', line 24

def actor_registry
  @actor_registry
end

#monitor_registryObject (readonly)

Returns the value of attribute monitor_registry.



24
25
26
# File 'lib/wakame/agent.rb', line 24

def monitor_registry
  @monitor_registry
end

Class Method Details

.ec2_fetch_local_attrsObject



129
130
131
132
133
134
135
136
137
# File 'lib/wakame/agent.rb', line 129

def self.ec2_fetch_local_attrs
  attrs = {}
  %w[instance-id instance-type local-ipv4 local-hostname public-hostname public-ipv4 ami-id].each { |key|
    rkey = key.tr('-', '_')
    attrs[rkey.to_sym]=(key)
  }
  attrs[:availability_zone] = ('placement/availability-zone')
  attrs
end

.ec2_query_metadata_uri(key) ⇒ Object



122
123
124
125
126
127
# File 'lib/wakame/agent.rb', line 122

def self.(key)
  require 'open-uri'
  open("http://169.254.169.254/2008-02-01/meta-data/#{key}") { |f|
    return f.readline
  }
end

Instance Method Details

#agent_idObject



26
27
28
# File 'lib/wakame/agent.rb', line 26

def agent_id
  @agent_id
end

#cleanupObject

def send_event_response(event)

  Wakame.log.debug("Sending event to master : #{event.class}")
  publish_to('agent_event', Marshal.dump(Packets::EventResponse.new(self, event)))
end


56
57
58
59
# File 'lib/wakame/agent.rb', line 56

def cleanup
  publish_to('registry', Packets::UnRegister.new(self).marshal)
  #@cmd_t.kill
end

#determine_agent_idObject



61
62
63
64
65
66
67
# File 'lib/wakame/agent.rb', line 61

def determine_agent_id
  if Wakame.config.environment == :EC2
    @agent_id = self.class.('instance-id')
  else
    @agent_id = '__STAND_ALONE__'
  end
end

#initObject

post_setup



37
38
39
40
41
42
43
44
45
46
47
48
49
# File 'lib/wakame/agent.rb', line 37

def init
  setup_monitors
  setup_actors
  setup_dispatcher

  if Wakame.config.environment == :EC2
    attrs = self.class.ec2_fetch_local_attrs
  else
    attrs = {}
  end
  publish_to('registry', Packets::Register.new(self, Wakame.config.root_path.to_s, attrs).marshal)
  Wakame.log.info("Started agent process : WAKAME_ROOT=#{Wakame.config.root_path} WAKAME_ENV=#{Wakame.config.environment}, attrs=#{attrs.inspect}")
end

#load_actorsObject



100
101
102
103
104
105
# File 'lib/wakame/agent.rb', line 100

def load_actors
  require 'wakame/actor/service_monitor'
  require 'wakame/actor/daemon'
  require 'wakame/actor/system'
  require 'wakame/actor/mysql'
end

#load_monitorsObject



82
83
84
85
# File 'lib/wakame/agent.rb', line 82

def load_monitors
  require 'wakame/monitor/agent'
  require 'wakame/monitor/service'
end

#setup_actorsObject



87
88
89
90
91
92
93
94
95
96
97
98
# File 'lib/wakame/agent.rb', line 87

def setup_actors
  load_actors

  @actor_registry.register(Actor::ServiceMonitor.new, '/service_monitor')
  @actor_registry.register(Actor::Daemon.new, '/daemon')
  @actor_registry.register(Actor::System.new, '/system')
  @actor_registry.register(Actor::MySQL.new, '/mysql')
  @actor_registry.actors.each { |path, actor|
#        actor.setup(path)
    actor.agent = self
  }
end

#setup_dispatcherObject



108
109
110
111
112
113
114
115
116
117
118
119
120
# File 'lib/wakame/agent.rb', line 108

def setup_dispatcher
  @dispatcher = Dispatcher.new(self)
  
  add_subscriber("agent_actor.#{agent_id}") { |data|
    begin 
      request = eval(data)
      @dispatcher.handle_request(request)
    rescue => e
      Wakame.log.error(e)
      agent.publish_to('agent_event', Packets::ActorResponse.new(self, request[:token], Actor::STATUS_FAILED).marshal)
    end
  }
end

#setup_monitorsObject



70
71
72
73
74
75
76
77
78
79
80
# File 'lib/wakame/agent.rb', line 70

def setup_monitors
  load_monitors

  @monitor_registry.register(Monitor::Agent.new, '/agent')
  @monitor_registry.register(Monitor::Service.new, '/service')
  
  @monitor_registry.monitors.each { |path, mon|
    mon.agent = self
    mon.setup(path)
  }
end