Class: Wakame::AgentMonitor

Inherits:
Object
  • Object
show all
Includes:
ThreadImmutable
Defined in:
lib/wakame/master.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods included from ThreadImmutable

#bind_thread, included, #target_thread, #target_thread?, #thread_check

Constructor Details

#initialize(master) ⇒ AgentMonitor

Returns a new instance of AgentMonitor.



18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
# File 'lib/wakame/master.rb', line 18

# Sets up agent tracking for +master+.
#
# Calls bind_thread (from ThreadImmutable), initialises the registered /
# unregistered agent tables and the timeout settings, starts a periodic
# EventMachine timer that ages registered agents, and subscribes to the
# 'registry', 'ping' and 'agent_event' topics on +master+.
#
# NOTE(review): every subscriber below runs +eval+ on the received payload,
# which executes arbitrary Ruby. That is only acceptable if the message bus
# is fully trusted; consider a data-only wire format.
#
# @param master [Object] used here via #add_subscriber, #started_at and
#   #service_cluster
def initialize(master)
  bind_thread
  @master = master
  @registered_agents = {}
  @unregistered_agents = {}
  @agent_timeout = 31.to_f
  @agent_kill_timeout = @agent_timeout * 2
  @gc_period = 20.to_f

  # GC event trigger for agent timer & status
  calc_agent_timeout = proc {
    kill_list = []
    registered_agents.each { |agent_id, agent|
      next if agent.status == Service::Agent::STATUS_OFFLINE
      diff_time = Time.now - agent.last_ping_at

      agent.status = Service::Agent::STATUS_TIMEOUT if diff_time > @agent_timeout.to_f
      kill_list << agent_id if diff_time > @agent_kill_timeout.to_f
    }

    # Removal happens in a second pass so the table is not modified while
    # it is being iterated. The table that holds these ids is
    # @registered_agents (there is no @agents variable in this class).
    kill_list.each { |agent_id|
      agent = @registered_agents.delete(agent_id)
      ED.fire_event(Event::AgentUnMonitored.new(agent)) unless agent.nil?
    }
  }

  @agent_timeout_timer = EventMachine::PeriodicTimer.new(@gc_period, calc_agent_timeout)

  master.add_subscriber('registry') { |data|
    data = eval(data)

    agent_id = data[:agent_id]
    case data[:type]
    when 'Wakame::Packets::Register'
      register_agent(data)
    when 'Wakame::Packets::UnRegister'
      unregister_agent(agent_id)
    end
  }

  master.add_subscriber('ping') { |data|
    ping = eval(data)
    responded_at = Time.parse(ping[:responded_at])
    # Skip the old ping responses before starting master node.
    next if responded_at < master.started_at

    # Variable update function for the common members
    set_report_values = proc { |agent|
      agent.status = Service::Agent::STATUS_ONLINE
      agent.uptime = 0
      agent.last_ping_at = responded_at

      agent.attr = ping[:attrs]

      agent.services.clear
      # The payload is accessed by key everywhere else; a method-style
      # call (ping.services) is not available on the evaluated payload.
      ping[:services].each { |svc_id, _info|
        agent.services[svc_id] = master.service_cluster.instances[svc_id]
      }
    }

    pinged = agent(ping[:agent_id])
    if pinged.nil?
      # First contact from an agent that has not registered yet: track it
      # in the unregistered table until a Register packet arrives.
      pinged = Service::Agent.new(ping[:agent_id])
      set_report_values.call(pinged)
      unregistered_agents[ping[:agent_id]] = pinged
    else
      set_report_values.call(pinged)
    end

    ED.fire_event(Event::AgentPong.new(pinged))
  }

  master.add_subscriber('agent_event') { |data|
    response = eval(data)
    case response[:type]
    when 'Wakame::Packets::ServiceStatusChanged'
      svc_inst = Service::ServiceInstance.instance_collection[response[:svc_id]]
      if svc_inst
        response_time = Time.parse(response[:responded_at])
        svc_inst.update_status(response[:new_status], response_time, response[:fail_message])

        # The per-status event dispatch below is currently disabled.
#             tmp_event = Event::ServiceStatusChanged.new(response[:svc_id], svc_inst.property, response[:status], response[:previous_status])
#             tmp_event.time = response_time
#             ED.fire_event(tmp_event)

#             if response[:previous_status] != Service::STATUS_ONLINE && response[:new_status] == Service::STATUS_ONLINE
#               tmp_event = Event::ServiceOnline.new(tmp_event.instance_id, svc_inst.property)
#               tmp_event.time = response_time
#               ED.fire_event(tmp_event)
#             elsif response[:previous_status] != Service::STATUS_OFFLINE && response[:new_status] == Service::STATUS_OFFLINE
#               tmp_event = Event::ServiceOffline.new(tmp_event.instance_id, svc_inst.property)
#               tmp_event.time = response_time
#               ED.fire_event(tmp_event)
#             elsif response[:previous_status] != Service::STATUS_FAIL && response[:new_status] == Service::STATUS_FAIL
#               tmp_event = Event::ServiceFailed.new(tmp_event.instance_id, svc_inst.property, response[:fail_message])
#               tmp_event.time = response_time
#               ED.fire_event(tmp_event)
#             end
      end
    when 'Wakame::Packets::ActorResponse'
      case response[:status]
      when Actor::STATUS_RUNNING
        ED.fire_event(Event::ActorProgress.new(response[:agent_id], response[:token], 0))
      else
        ED.fire_event(Event::ActorComplete.new(response[:agent_id], response[:token], response[:status]))
      end
    else
      Wakame.log.warn("#{self.class}: Unhandled agent response: #{response[:type]}")
    end
  }

end

Instance Attribute Details

#gc_period ⇒ Object (readonly)

Returns the value of attribute gc_period.



15
16
17
# File 'lib/wakame/master.rb', line 15

# Reader for @gc_period, the value #initialize passes as the interval of
# the agent GC timer.
#
# @return [Object] the current value of @gc_period
def gc_period; @gc_period; end

#master ⇒ Object (readonly)

Returns the value of attribute master.



15
16
17
# File 'lib/wakame/master.rb', line 15

# Reader for @master, the object handed to #initialize.
#
# @return [Object] the current value of @master
def master; @master; end

#registered_agents ⇒ Object (readonly)

Returns the value of attribute registered_agents.



15
16
17
# File 'lib/wakame/master.rb', line 15

# Reader for @registered_agents, the table (keyed by agent id) that
# #register_agent fills and #unregister_agent empties.
#
# @return [Object] the current value of @registered_agents
def registered_agents; @registered_agents; end

#unregistered_agents ⇒ Object (readonly)

Returns the value of attribute unregistered_agents.



15
16
17
# File 'lib/wakame/master.rb', line 15

# Reader for @unregistered_agents, the table (keyed by agent id) of agents
# that are known but not currently registered.
#
# @return [Object] the current value of @unregistered_agents
def unregistered_agents; @unregistered_agents; end

Instance Method Details

#agent(agent_id) ⇒ Object



142
143
144
# File 'lib/wakame/master.rb', line 142

# Looks an agent up by id, preferring the registered table and falling
# back to the unregistered one.
#
# @param agent_id [Object] key used in both agent tables
# @return [Object, nil] the matching entry, or whatever the unregistered
#   table yields (nil for a plain Hash) when neither table has it
def agent(agent_id)
  hit = registered_agents[agent_id]
  return hit if hit

  unregistered_agents[agent_id]
end

#dump_status ⇒ Object



207
208
209
210
211
212
213
214
215
216
217
218
# File 'lib/wakame/master.rb', line 207

# Collects the result of #dump_status from every tracked agent, grouped by
# the table the agent currently lives in.
#
# @return [Hash] {:registered => [...], :unregistered => [...]}, each list
#   in the iteration order of the corresponding table
def dump_status
  {
    :registered   => @registered_agents.values.map { |a| a.dump_status },
    :unregistered => @unregistered_agents.values.map { |a| a.dump_status }
  }
end

#each_online(&blk) ⇒ Object



200
201
202
203
204
205
# File 'lib/wakame/master.rb', line 200

# Calls +blk+ with every registered agent whose status equals
# Service::Agent::STATUS_ONLINE; agents in any other state are skipped.
#
# @param blk [Proc] receives one agent per call
# @return [Object] the registered agent table that was iterated
def each_online(&blk)
  registered_agents.each_value { |candidate|
    blk.call(candidate) if candidate.status == Service::Agent::STATUS_ONLINE
  }
end

#master_local ⇒ Object

Returns the master-local agent object.



193
194
195
196
197
198
# File 'lib/wakame/master.rb', line 193

# Returns the registered agent whose id is @master.master_local_agent_id.
#
# @return [Object] the entry found in the registered agent table
# @raise [RuntimeError] when no registered agent exists under that id
def master_local
  agent = registered_agents[@master.master_local_agent_id]
  raise "Master does not identify the master local agent yet." if agent.nil?
  agent
end

#register_agent(data) ⇒ Object



146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
# File 'lib/wakame/master.rb', line 146

# Handles a registration packet: makes sure the agent named by
# data[:agent_id] is in the registered table and records its root path.
#
# An agent that is already registered only gets its root_path refreshed.
# Otherwise it is either moved over from the unregistered table or created
# fresh, and an Event::AgentMonitored is fired for it.
#
# @param data [Hash] read here via :agent_id and :root_path
# @return [Object] the value assigned to the agent's root_path
def register_agent(data)
  id = data[:agent_id]
  target = registered_agents[id]

  if target.nil?
    parked = unregistered_agents[id]
    target =
      if parked.nil?
        # First time this id is seen at all.
        Service::Agent.new(id)
      else
        # Already known from the unregistered group: move the reference.
        unregistered_agents.delete(id)
      end
    registered_agents[id] = target

    Wakame.log.debug("The Agent has been registered: #{data.inspect}")
    ED.fire_event(Event::AgentMonitored.new(target))
  end

  target.root_path = data[:root_path]
end

#unregister_agent(agent_id) ⇒ Object



167
168
169
170
171
172
173
174
# File 'lib/wakame/master.rb', line 167

# Moves a registered agent to the unregistered table and fires
# Event::AgentUnMonitored for it. Does nothing when the id is not
# currently registered.
#
# @param agent_id [Object] key of the agent in the registered table
# @return [Object, nil] the result of ED.fire_event, or nil when nothing
#   was unregistered
def unregister_agent(agent_id)
  leaving = registered_agents[agent_id]
  return unless leaving

  unregistered_agents[agent_id] = registered_agents.delete(agent_id)
  ED.fire_event(Event::AgentUnMonitored.new(leaving))
end