Class: Wakame::MasterManagers::AgentMonitor
Instance Attribute Summary
#master
Instance Method Summary
collapse
#bind_thread, included, #target_thread, #target_thread?, #thread_check
#reload, #start, #stop
Instance Method Details
#agent_pool ⇒ Object
145
146
147
|
# File 'lib/wakame/master_managers/agent_monitor.rb', line 145
# Convenience accessor for the agent pool this monitor works against.
#
# @return [Object] whatever +Models::AgentPool.instance+ returns
def agent_pool
  Models::AgentPool.instance
end
|
#init ⇒ Object
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
|
# File 'lib/wakame/master_managers/agent_monitor.rb', line 8
# Wires up agent monitoring for the master.
#
# * Sets the timeout thresholds (in seconds) and starts a periodic timer that
#   sweeps the agent pool every +@gc_period+ seconds.
# * Subscribes to the master's 'registry', 'ping' and 'agent_event' feeds.
# * Subscribes to Event::AgentUnMonitored so the matching agent record is
#   terminated.
#
# @return [Object] the result of the final EventDispatcher.subscribe call
def init
  @agent_timeout = 301.0
  @agent_kill_timeout = @agent_timeout * 2
  @gc_period = 20.0

  @agent_timeout_timer = EM::PeriodicTimer.new(@gc_period) {
    StatusDB.pass { sweep_timed_out_agents }
  }

  master.add_subscriber('registry') { |data| handle_registry_packet(data) }
  master.add_subscriber('ping') { |data| handle_ping_packet(data) }
  master.add_subscriber('agent_event') { |data| handle_agent_event_packet(data) }

  EventDispatcher.subscribe(Event::AgentUnMonitored) { |event|
    StatusDB.pass {
      agent = Service::Agent.find(event.agent.id)
      # The record may already be gone; there is nothing left to terminate then.
      agent.terminate if agent
    }
  }
end

# Marks agents whose last ping is older than @agent_timeout as timed out and
# unregisters those older than @agent_kill_timeout.
def sweep_timed_out_agents
  agent_pool.dataset.all.each { |row|
    agent = Service::Agent.find(row[:agent_id])
    if agent.nil?
      # Service::Agent.find can return nil (the ping handler checks for it).
      # Skip the orphaned row so it cannot abort the sweep for other agents.
      Wakame.log.warn("#{self.class}: Skipping timeout check for unknown agent ID: #{row[:agent_id]}")
      next
    end

    diff_time = Time.now - agent.last_ping_at_time
    agent.update_monitor_status(Service::Agent::STATUS_TIMEOUT) if diff_time > @agent_timeout
    agent_pool.unregister(agent) if diff_time > @agent_kill_timeout
  }
end

# Decodes a packet delivered to one of the master subscribers.
#
# SECURITY(review): the payload is decoded with +eval+, so whoever can publish
# to these feeds can execute arbitrary Ruby in the master process. Left as-is
# because the wire format is defined outside this method; it should be replaced
# by a data-only format.
#
# @param data [String] Ruby source that evaluates to the packet Hash
# @return [Hash, nil] the packet, or nil when it was sent before this master started
def decode_agent_packet(data)
  packet = eval(data)
  return nil if Time.parse(packet[:responded_at]) < master.started_at
  packet
end

# Handles a 'registry' packet: registers or unregisters the sending agent.
def handle_registry_packet(data)
  packet = decode_agent_packet(data)
  return if packet.nil?

  StatusDB.pass {
    agent = agent_pool.agent_find_or_create(packet[:agent_id])
    case packet[:class_type]
    when 'Wakame::Packets::Register'
      agent.update_status(Service::Agent::STATUS_REGISTERRING)
      agent_pool.register_as_observed(agent)
      agent.root_path = packet[:root_path]
      agent.save
      master.action_manager.trigger_action(Actions::RegisterAgent.new(agent))
    when 'Wakame::Packets::UnRegister'
      agent_pool.unregister(agent)
    end
  }
end

# Handles a 'ping' packet: records the report on the agent (creating the
# record when it is not known yet) and fires Event::AgentPong.
def handle_ping_packet(data)
  ping = decode_agent_packet(data)
  return if ping.nil?

  StatusDB.pass {
    agent = Service::Agent.find(ping[:agent_id])
    newly_seen = agent.nil?
    if newly_seen
      agent = Service::Agent.new
      agent.id = ping[:agent_id]
    end

    agent.last_ping_at = ping[:responded_at]
    agent.renew_reported_services(ping[:services])
    agent.save
    agent.update_monitor_status(Service::Agent::STATUS_ONLINE)

    # A new agent is put under observation only after its first report is saved.
    agent_pool.register_as_observed(agent) if newly_seen

    EventDispatcher.fire_event(Event::AgentPong.new(agent))
  }
end

# Handles an 'agent_event' packet by dispatching on its :class_type.
def handle_agent_event_packet(data)
  response = decode_agent_packet(data)
  return if response.nil?

  case response[:class_type]
  when 'Wakame::Packets::StatusCheckResult'
    StatusDB.pass {
      svc_inst = Service::ServiceInstance.find(response[:svc_id])
      if svc_inst
        svc_inst.monitor_status = response[:status]
        svc_inst.save
      else
        Wakame.log.error("#{self.class}: Unknown service ID: #{response[:svc_id]}")
        agent = Service::Agent.find(response[:agent_id])
        correct_svc_monitor_mismatch(agent)
      end
    }
  when 'Wakame::Packets::ServiceStatusChanged'
    StatusDB.pass {
      svc_inst = Service::ServiceInstance.find(response[:svc_id])
      if svc_inst
        response_time = Time.parse(response[:responded_at])
        svc_inst.update_monitor_status(response[:new_status], response_time, response[:fail_message])
      end
    }
  when 'Wakame::Packets::ActorResponse'
    fire_actor_response_event(response)
  else
    Wakame.log.warn("#{self.class}: Unhandled agent response: #{response[:class_type]}")
  end
end

# Translates an ActorResponse packet into an ActorProgress/ActorComplete event.
def fire_actor_response_event(response)
  case response[:status]
  when Actor::STATUS_RUNNING
    EventDispatcher.fire_event(Event::ActorProgress.new(response[:agent_id], response[:token], 0))
  when Actor::STATUS_FAILED
    # A failed actor carries no return value.
    EventDispatcher.fire_event(Event::ActorComplete.new(response[:agent_id], response[:token], response[:status], nil))
  else
    EventDispatcher.fire_event(Event::ActorComplete.new(response[:agent_id], response[:token], response[:status], response[:opts][:return_value]))
  end
end

private :sweep_timed_out_agents, :decode_agent_packet, :handle_registry_packet,
        :handle_ping_packet, :handle_agent_event_packet, :fire_actor_response_event
|
#terminate ⇒ Object
141
142
143
|
# File 'lib/wakame/master_managers/agent_monitor.rb', line 141
# Stops the periodic agent timeout sweep by cancelling the timer held in
# +@agent_timeout_timer+.
#
# Does nothing when no timer has been created, so it is safe to call even if
# #init never ran or failed before the timer was set up.
#
# @return [Object, nil] the result of the timer's +cancel+, or nil without a timer
def terminate
  @agent_timeout_timer.cancel if @agent_timeout_timer
end
|