Class: Droonga::Serf
- Inherits:
-
Object
show all
- Includes:
- Loggable
- Defined in:
- lib/droonga/serf.rb,
lib/droonga/serf/tag.rb,
lib/droonga/serf/agent.rb,
lib/droonga/serf/command.rb,
lib/droonga/serf/downloader.rb,
lib/droonga/serf/remote_command.rb
Defined Under Namespace
Modules: RemoteCommand, Tag
Classes: Agent, Command, Downloader
Constant Summary
collapse
- CHECK_RESTARTED_INTERVAL =
3
- CHECK_RESTARTED_TIMEOUT =
60 * 5
Instance Method Summary
collapse
Constructor Details
#initialize(name, options = {}) ⇒ Serf
Returns a new instance of Serf.
35
36
37
38
39
40
41
|
# File 'lib/droonga/serf.rb', line 35
# Builds a Serf wrapper bound to the node identified by +name+.
#
# @param name    node name; parsed with NodeName.parse.
# @param options [Hash] only :verbose is read here; it defaults to false.
def initialize(name, options={})
  @verbose = options[:verbose] || false
  @name = NodeName.parse(name)
  @service_installation = ServiceInstallation.new
  # Mirror of the tags written/removed through #set_tag and #delete_tag.
  @tags_cache = {}
  # Not resolved at construction time; starts out unset.
  @serf_command = nil
end
|
Instance Method Details
#accept_messages_newer_than(timestamp) ⇒ Object
212
213
214
215
|
# File 'lib/droonga/serf.rb', line 212
# Stores +timestamp+ under the tag named by Tag.accept_messages_newer_than.
#
# @param timestamp value to store in the tag.
# @return whatever #set_tag returns.
def accept_messages_newer_than(timestamp)
  tag_name = Tag.accept_messages_newer_than
  set_tag(tag_name, timestamp)
end
|
#accept_messages_newer_than_timestamp ⇒ Object
#cluster_id ⇒ Object
217
218
219
220
221
|
# File 'lib/droonga/serf.rb', line 217
# Loads the catalog found at Path.catalog and returns its cluster_id.
# The catalog is loaded again on every call.
def cluster_id
  Catalog::Loader.new(Path.catalog.to_s).load.cluster_id
end
|
#current_cluster_state ⇒ Object
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
|
# File 'lib/droonga/serf.rb', line 120
# Builds a snapshot of the cluster from the current serf members.
#
# Members whose cluster-id tag differs from #cluster_id are ignored.
# For every remaining member one entry is produced, keyed by member name,
# holding "type", "role", "internal_name", "accept_messages_newer_than"
# (all read from the member's tags) and "live" (status == "alive").
# Tags recognised by Tag.have_unprocessed_messages_tag? additionally
# contribute a "have_unprocessed_messages" flag to the node they refer to;
# the flag is true as soon as any member reports "true" for that node.
#
# @return [Hash] node name => state hash, with keys in sorted order.
def current_cluster_state
  current_cluster_id = cluster_id
  nodes = {}
  unprocessed_messages_existence = {}
  current_members.each do |member|
    tags = member["tags"]
    # Skip members that belong to a different cluster.
    next unless tags[Tag.cluster_id] == current_cluster_id
    tags.each do |key, value|
      next unless Tag.have_unprocessed_messages_tag?(key)
      # NOTE(review): `Tag.(key)` is reproduced exactly as it appears here;
      # the helper's name may have been lost from this listing -- confirm
      # against lib/droonga/serf/tag.rb.
      node_name = Tag.(key)
      # Once any member has reported "true" for a node, keep it true.
      next if unprocessed_messages_existence[node_name]
      unprocessed_messages_existence[node_name] = value == "true"
    end
    nodes[member["name"]] = {
      "type" => tags[Tag.node_type],
      "role" => tags[Tag.node_role],
      "internal_name" => tags[Tag.internal_node_name],
      "accept_messages_newer_than" => tags[Tag.accept_messages_newer_than],
      "live" => member["status"] == "alive",
    }
  end
  unprocessed_messages_existence.each do |node_name, have_messages|
    node = nodes[node_name]
    # A tag may refer to a node that is not among the collected members
    # (it left the cluster, or was filtered out as foreign). Skip it
    # instead of failing with NoMethodError on nil.
    next unless node
    node["have_unprocessed_messages"] = have_messages
  end
  nodes.keys.sort.each_with_object({}) do |name, sorted_nodes|
    sorted_nodes[name] = nodes[name]
  end
end
|
#current_members ⇒ Object
114
115
116
117
118
|
# File 'lib/droonga/serf.rb', line 114
# Runs `members -format json` through #run_command, parses the output as
# JSON and returns the value stored under the "members" key.
def current_members
  parsed_output = JSON.parse(run_command("members", "-format", "json"))
  parsed_output["members"]
end
|
#delete_tag(name) ⇒ Object
169
170
171
172
|
# File 'lib/droonga/serf.rb', line 169
# Removes the tag +name+ via `tags -delete` and drops it from the local
# tags cache.
#
# @param name tag name to remove.
# @return the value that was cached for +name+, or nil if none was cached.
def delete_tag(name)
  arguments = ["tags", "-delete", name]
  run_command(*arguments)
  @tags_cache.delete(name)
end
|
#ensure_restarted(*nodes, &block) ⇒ Object
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
|
# File 'lib/droonga/serf.rb', line 226
# Yields to the given block, then polls until every watched node reports
# an internal-node-name tag different from the one it had before the block
# ran, or until CHECK_RESTARTED_TIMEOUT seconds have passed.
#
# The clock starts before the block is yielded to, so time spent inside
# the block counts towards the timeout. Polls are separated by
# CHECK_RESTARTED_INTERVAL seconds. Progress is printed with `puts` when
# @verbose is set.
#
# @param nodes node names to watch; when empty, this instance's own name
#   is used.
# @return [Boolean] true if every watched node changed its tag in time.
def ensure_restarted(*nodes, &block)
  nodes.push(@name.to_s) if nodes.empty?
  # Remember each node's tag value as it is before the block runs.
  pending = nodes.map do |node|
    watcher = self.class.new(node)
    {
      :serf => watcher,
      :previous_name => watcher.get_tag(Tag.internal_node_name),
    }
  end

  started_at = Time.now
  yield

  until Time.now - started_at >= CHECK_RESTARTED_TIMEOUT
    puts "Checking nodes are restarted or not:" if @verbose
    # Drop every node whose tag value has changed since the snapshot.
    pending.reject! do |entry|
      current_name = entry[:serf].get_tag(Tag.internal_node_name)
      restarted = current_name != entry[:previous_name]
      if @verbose
        state = restarted ? "restarted" : "not yet"
        puts " #{current_name} vs #{entry[:previous_name]} => " + state
      end
      restarted
    end
    break if pending.empty?
    sleep(CHECK_RESTARTED_INTERVAL)
  end
  pending.empty?
end
|
#get_tag(name) ⇒ Object
153
154
155
156
157
158
159
160
161
162
|
# File 'lib/droonga/serf.rb', line 153
# Looks up this node (matched by @name.to_s) among the current members and
# returns the value of its tag +name+.
#
# @param name tag name to read.
# @return the tag value, or nil when this node is not listed or the tag is
#   not set.
def get_tag(name)
  own_entry = current_members.find { |member| member["name"] == @name.to_s }
  return nil unless own_entry

  own_entry["tags"][name]
end
|
#join(*hosts) ⇒ Object
73
74
75
76
77
78
|
# File 'lib/droonga/serf.rb', line 73
# Runs the `join` subcommand against the given hosts, each addressed as
# "<host>:<agent_port>".
#
# @param hosts host names or addresses, without port.
# @return whatever #run_command returns.
def join(*hosts)
  endpoints = hosts.map { |host| "#{host}:#{agent_port}" }
  run_command("join", *endpoints)
end
|
#last_message_timestamp ⇒ Object
198
199
200
201
202
203
204
205
206
|
# File 'lib/droonga/serf.rb', line 198
# Sends the "report_last_message_timestamp" query addressed to this node
# and returns the "timestamp" entry of the answer.
#
# @return the reported timestamp, or nil when #send_query yields nothing.
def last_message_timestamp
  payload = {"node" => @name.to_s}
  response = send_query("report_last_message_timestamp", payload)
  response ? response["timestamp"] : nil
end
|
#leave ⇒ Object
69
70
71
|
# File 'lib/droonga/serf.rb', line 69
# Runs the `leave` subcommand through #run_command.
#
# @return whatever #run_command returns.
def leave
  subcommand = "leave"
  run_command(subcommand)
end
|
#reset_have_unprocessed_messages_for(node_name) ⇒ Object
#role=(new_role) ⇒ Object
191
192
193
194
195
196
|
# File 'lib/droonga/serf.rb', line 191
# Normalizes +new_role+ with NodeRole.normalize and stores the result in
# the tag named by Tag.node_role.
#
# @param new_role role to assign.
# @return the normalized role (note that `obj.role = x` as an expression
#   still evaluates to x, as with any Ruby attribute writer).
def role=(new_role)
  NodeRole.normalize(new_role).tap do |normalized_role|
    set_tag(Tag.node_role, normalized_role)
  end
end
|
#run_agent(loop) ⇒ Object
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
|
# File 'lib/droonga/serf.rb', line 43
# Starts an Agent for this node and returns it.
#
# Calls #ensure_serf first, adds one "-retry-join <host>" pair per host
# returned by #detect_other_hosts, and makes sure the directory of the
# tags file (Path.serf_tags_file) exists before the agent is started.
#
# @param loop passed through to Agent.new as its first argument.
# @return the started Agent.
def run_agent(loop)
  logger.trace("run_agent: start")
  ensure_serf
  retry_join_options = []
  detect_other_hosts.each do |other_host|
    retry_join_options << "-retry-join" << other_host
  end
  tags_path = Path.serf_tags_file
  FileUtils.mkdir_p(tags_path.dirname)
  agent = Agent.new(
    loop, @serf_command,
    @name.host, agent_port, rpc_port,
    "-node", @name.to_s,
    "-event-handler", "droonga-engine-serf-event-handler",
    "-tags-file", tags_path.to_s,
    *retry_join_options
  )
  agent.start
  logger.trace("run_agent: done")
  agent
end
|
#send_query(query, payload) ⇒ Object
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
|
# File 'lib/droonga/serf.rb', line 80
# Runs `query -format json <extra options> <query> <payload as JSON>` and
# returns the answer given by the node named in payload["node"].
#
# The command output is parsed as JSON and the node's entry is taken from
# its "Responses" map. A String entry is parsed as JSON when possible and
# returned unchanged otherwise; non-String entries are returned as they are.
#
# NOTE(review): when payload has no "node", the result is always nil even
# though the command has been run -- this mirrors the existing behaviour;
# confirm whether the whole parsed output was meant to be returned.
#
# @param query   [String] query name.
# @param payload [Hash] serialized with JSON.generate; "node" selects the
#   response to extract.
# @return the selected response, or nil.
def send_query(query, payload)
  arguments = ["-format", "json"]
  arguments += additional_options_from_payload(payload)
  arguments += [query, JSON.generate(payload)]
  parsed_output = JSON.parse(run_command("query", *arguments))

  target = payload["node"]
  return nil unless target

  answer = parsed_output["Responses"][target]
  return answer unless answer.is_a?(String)

  begin
    JSON.parse(answer)
  rescue JSON::ParserError
    # Not JSON: hand back the raw text.
    answer
  end
end
|
#set_have_unprocessed_messages_for(node_name) ⇒ Object
178
179
180
181
|
# File 'lib/droonga/serf.rb', line 178
# Sets the "have unprocessed messages" tag for +node_name+ to true, unless
# a tag of that name is already present in the local tags cache.
#
# @param node_name node the tag refers to.
# @return the result of #set_tag, or nil when the tag was already cached.
def set_have_unprocessed_messages_for(node_name)
  tag = Tag.have_unprocessed_messages_tag_for(node_name)
  return if @tags_cache.key?(tag)

  set_tag(tag, true)
end
|
#set_tag(name, value) ⇒ Object
164
165
166
167
|
# File 'lib/droonga/serf.rb', line 164
# Sets the tag +name+ to +value+ via `tags -set name=value` and records
# the pair in the local tags cache.
#
# @param name  tag name.
# @param value tag value; interpolated into the command as a string, but
#   cached as given.
# @return +value+.
def set_tag(name, value)
  assignment = "#{name}=#{value}"
  run_command("tags", "-set", assignment)
  @tags_cache[name] = value
end
|
#update_cluster_id ⇒ Object
174
175
176
|
# File 'lib/droonga/serf.rb', line 174
# Writes the value of #cluster_id into the tag named by Tag.cluster_id.
#
# @return whatever #set_tag returns.
def update_cluster_id
  tag_name = Tag.cluster_id
  set_tag(tag_name, cluster_id)
end
|
#update_cluster_state ⇒ Object
104
105
106
107
108
109
110
111
112
|
# File 'lib/droonga/serf.rb', line 104
# Serializes #current_cluster_state as pretty-printed JSON and writes it
# to Path.cluster_state through SafeFileWriter. Inside the write block the
# file is also handed to the service installation so that its permission
# can be corrected.
def update_cluster_state
  destination = Path.cluster_state
  serialized_state = JSON.pretty_generate(current_cluster_state)
  SafeFileWriter.write(destination) do |output, file|
    output.puts(serialized_state)
    @service_installation.ensure_correct_file_permission(file)
  end
end
|