Class: Wakame::AgentMonitor
- Inherits: Object
  - Object
    - Wakame::AgentMonitor
- Includes:
- ThreadImmutable
- Defined in:
- lib/wakame/master.rb
Instance Attribute Summary
-
#gc_period ⇒ Object
readonly
Returns the value of attribute gc_period.
-
#master ⇒ Object
readonly
Returns the value of attribute master.
-
#registered_agents ⇒ Object
readonly
Returns the value of attribute registered_agents.
-
#unregistered_agents ⇒ Object
readonly
Returns the value of attribute unregistered_agents.
Instance Method Summary
- #agent(agent_id) ⇒ Object
- #dump_status ⇒ Object
- #each_online(&blk) ⇒ Object
-
#initialize(master) ⇒ AgentMonitor
constructor
A new instance of AgentMonitor.
-
#master_local ⇒ Object
Returns the master local agent object.
- #register_agent(data) ⇒ Object
- #unregister_agent(agent_id) ⇒ Object
Methods included from ThreadImmutable
#bind_thread, included, #target_thread, #target_thread?, #thread_check
Constructor Details
#initialize(master) ⇒ AgentMonitor
Returns a new instance of AgentMonitor.
18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 |
# File 'lib/wakame/master.rb', line 18
# Creates the agent monitor for +master+.
#
# Sets up the registered/unregistered agent tables and starts a periodic GC
# timer. The timer marks silent agents as timed out and drops them from the
# registered table after twice the timeout. It also subscribes to the
# 'registry', 'ping' and 'agent_event' channels on +master+.
#
# @param master the master node; used here via #add_subscriber,
#   #started_at and #service_cluster.
def initialize(master)
  bind_thread
  @master = master
  @registered_agents = {}
  @unregistered_agents = {}
  @agent_timeout = 31.to_f
  @agent_kill_timeout = @agent_timeout * 2
  @gc_period = 20.to_f

  # GC event trigger for agent timer & status
  calc_agent_timeout = proc {
    #Wakame.log.debug("Started agent GC : agents.size=#{@registered_agents.size}")
    kill_list = []
    registered_agents.each { |agent_id, agent|
      next if agent.status == Service::Agent::STATUS_OFFLINE
      # NOTE(review): assumes last_ping_at is always set for registered
      # agents; a nil value would raise TypeError here -- confirm.
      diff_time = Time.now - agent.last_ping_at
      #Wakame.log.debug "AgentMonitor GC : #{agent_id}: #{diff_time}"
      agent.status = Service::Agent::STATUS_TIMEOUT if diff_time > @agent_timeout
      kill_list << agent_id if diff_time > @agent_kill_timeout
    }

    # Collect first, delete afterwards, so the table is not mutated while
    # it is being iterated above.
    kill_list.each { |agent_id|
      # This used to call @agents.delete(...). @agents is never assigned,
      # so it raised NoMethodError on nil as soon as an agent exceeded the
      # kill timeout.
      agent = @registered_agents.delete(agent_id)
      ED.fire_event(Event::AgentUnMonitored.new(agent)) unless agent.nil?
    }
    #Wakame.log.debug("Finished agent GC")
  }
  @agent_timeout_timer = EventMachine::PeriodicTimer.new(@gc_period, calc_agent_timeout)

  master.add_subscriber('registry') { |data|
    # SECURITY NOTE(review): eval executes the raw message payload as Ruby
    # code. Anyone able to publish on this channel can run arbitrary code on
    # the master. Consider a data-only serialization format.
    data = eval(data)
    agent_id = data[:agent_id]

    case data[:type]
    when 'Wakame::Packets::Register'
      register_agent(data)
    when 'Wakame::Packets::UnRegister'
      unregister_agent(agent_id)
    end
  }

  master.add_subscriber('ping') { |data|
    # SECURITY NOTE(review): eval on the raw payload, same concern as the
    # 'registry' subscriber in this method.
    ping = eval(data)
    responded_at = Time.parse(ping[:responded_at])
    # Skip the old ping responses before starting master node.
    next if responded_at < master.started_at

    # Variable update function for the common members
    set_report_values = proc { |agent|
      agent.status = Service::Agent::STATUS_ONLINE
      agent.uptime = 0
      agent.last_ping_at = responded_at
      agent.attr = ping[:attrs]
      agent.services.clear
      # This used to call ping.services. Every other field of ping is read
      # with [], so the service list is read the same way. A missing list
      # leaves the agent with no services instead of raising.
      (ping[:services] || []).each { |svc_id, _|
        agent.services[svc_id] = master.service_cluster.instances[svc_id]
      }
    }

    agent_id = ping[:agent_id]
    pinged = agent(agent_id)
    if pinged.nil?
      # Unknown agent: keep it in the unregistered table until it registers.
      pinged = Service::Agent.new(agent_id)
      set_report_values.call(pinged)
      unregistered_agents[agent_id] = pinged
    else
      set_report_values.call(pinged)
    end

    ED.fire_event(Event::AgentPong.new(pinged))
  }

  master.add_subscriber('agent_event') { |data|
    # SECURITY NOTE(review): eval on the raw payload, same concern as the
    # 'registry' subscriber in this method.
    response = eval(data)

    case response[:type]
    when 'Wakame::Packets::ServiceStatusChanged'
      svc_inst = Service::ServiceInstance.instance_collection[response[:svc_id]]
      if svc_inst
        response_time = Time.parse(response[:responded_at])
        svc_inst.update_status(response[:new_status], response_time, response[:fail_message])

        # tmp_event = Event::ServiceStatusChanged.new(response[:svc_id], svc_inst.property, response[:status], response[:previous_status])
        # tmp_event.time = response_time
        # ED.fire_event(tmp_event)
        # if response[:previous_status] != Service::STATUS_ONLINE && response[:new_status] == Service::STATUS_ONLINE
        #   tmp_event = Event::ServiceOnline.new(tmp_event.instance_id, svc_inst.property)
        #   tmp_event.time = response_time
        #   ED.fire_event(tmp_event)
        # elsif response[:previous_status] != Service::STATUS_OFFLINE && response[:new_status] == Service::STATUS_OFFLINE
        #   tmp_event = Event::ServiceOffline.new(tmp_event.instance_id, svc_inst.property)
        #   tmp_event.time = response_time
        #   ED.fire_event(tmp_event)
        # elsif response[:previous_status] != Service::STATUS_FAIL && response[:new_status] == Service::STATUS_FAIL
        #   tmp_event = Event::ServiceFailed.new(tmp_event.instance_id, svc_inst.property, response[:fail_message])
        #   tmp_event.time = response_time
        #   ED.fire_event(tmp_event)
        # end
      end
    when 'Wakame::Packets::ActorResponse'
      case response[:status]
      when Actor::STATUS_RUNNING
        ED.fire_event(Event::ActorProgress.new(response[:agent_id], response[:token], 0))
      else
        ED.fire_event(Event::ActorComplete.new(response[:agent_id], response[:token], response[:status]))
      end
    else
      Wakame.log.warn("#{self.class}: Unhandled agent response: #{response[:type]}")
    end
  }
end
Instance Attribute Details
#gc_period ⇒ Object (readonly)
Returns the value of attribute gc_period.
15 16 17 |
# File 'lib/wakame/master.rb', line 15
# Reader for the agent GC interval, i.e. the value handed to
# EventMachine::PeriodicTimer by the constructor.
def gc_period
  @gc_period
end
#master ⇒ Object (readonly)
Returns the value of attribute master.
15 16 17 |
# File 'lib/wakame/master.rb', line 15
# Reader for the master node this monitor was constructed with.
def master
  @master
end
#registered_agents ⇒ Object (readonly)
Returns the value of attribute registered_agents.
15 16 17 |
# File 'lib/wakame/master.rb', line 15
# Reader for the table of registered agents, keyed by agent id.
def registered_agents
  @registered_agents
end
#unregistered_agents ⇒ Object (readonly)
Returns the value of attribute unregistered_agents.
15 16 17 |
# File 'lib/wakame/master.rb', line 15
# Reader for the table of agents known from pings or unregistration but not
# currently registered, keyed by agent id.
def unregistered_agents
  @unregistered_agents
end
Instance Method Details
#agent(agent_id) ⇒ Object
142 143 144 |
# File 'lib/wakame/master.rb', line 142
# Looks up an agent by id, preferring the registered table and falling back
# to the unregistered one.
#
# @param agent_id key into the agent tables
# @return the agent object, or nil when neither table knows the id
def agent(agent_id)
  found = registered_agents[agent_id]
  found = unregistered_agents[agent_id] unless found
  found
end
#dump_status ⇒ Object
207 208 209 210 211 212 213 214 215 216 217 218 |
# File 'lib/wakame/master.rb', line 207
# Snapshot of every known agent's status.
#
# @return [Hash] {:registered => [...], :unregistered => [...]}, each array
#   holding the #dump_status result of the agents in that table, in the
#   table's insertion order.
def dump_status
  {
    :registered => @registered_agents.values.map { |a| a.dump_status },
    :unregistered => @unregistered_agents.values.map { |a| a.dump_status }
  }
end
#each_online(&blk) ⇒ Object
200 201 202 203 204 205 |
# File 'lib/wakame/master.rb', line 200
# Calls +blk+ with every registered agent whose status is
# Service::Agent::STATUS_ONLINE.
#
# @return the registered agents table
def each_online(&blk)
  registered_agents.each_value { |candidate|
    blk.call(candidate) if candidate.status == Service::Agent::STATUS_ONLINE
  }
end
#master_local ⇒ Object
Returns the master local agent object.
193 194 195 196 197 198 |
# File 'lib/wakame/master.rb', line 193
# Returns the master local agent object, i.e. the registered agent whose id
# is @master.master_local_agent_id.
#
# @raise [RuntimeError] if that agent is not in the registered table yet
def master_local
  agent = registered_agents[@master.master_local_agent_id]
  # Was a bare `puts` that wrote to stdout on every call; route the trace
  # through the project logger like the rest of this class.
  Wakame.log.debug("#{agent} = registered_agents[#{@master.master_local_agent_id}]")
  raise "Master does not identify the master local agent yet." if agent.nil?
  agent
end
#register_agent(data) ⇒ Object
146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 |
# File 'lib/wakame/master.rb', line 146
# Registers the agent described by +data+ (a decoded Register packet).
#
# An agent previously seen only via ping is moved from the unregistered
# table to the registered table; an unknown id gets a fresh Service::Agent.
# In both of those cases an AgentMonitored event is fired. An agent that is
# already registered only has its root_path refreshed.
#
# @param data [Hash] packet providing :agent_id and :root_path
# @return the assigned root_path value
def register_agent(data)
  agent_id = data[:agent_id]
  agent = registered_agents[agent_id]

  if agent.nil?
    agent = unregistered_agents[agent_id]
    if agent.nil?
      # The agent is going to be registered at first time.
      agent = Service::Agent.new(agent_id)
    else
      # Move the reference from unregistered group to the registered group.
      unregistered_agents.delete(agent_id)
    end
    registered_agents[agent_id] = agent

    Wakame.log.debug("The Agent has been registered: #{data.inspect}")
    ED.fire_event(Event::AgentMonitored.new(agent))
  end

  agent.root_path = data[:root_path]
end
#unregister_agent(agent_id) ⇒ Object
167 168 169 170 171 172 173 174 |
# File 'lib/wakame/master.rb', line 167
# Moves a registered agent into the unregistered table and fires an
# AgentUnMonitored event. Unknown or already-unregistered ids are ignored.
#
# @param agent_id key into the registered agents table
# @return the result of ED.fire_event, or nil when nothing was moved
def unregister_agent(agent_id)
  agent = registered_agents[agent_id]
  return unless agent

  unregistered_agents[agent_id] = agent
  registered_agents.delete(agent_id)
  ED.fire_event(Event::AgentUnMonitored.new(agent))
end