Class: Pandemic::ServerSide::Server
- Inherits:
-
Object
- Object
- Pandemic::ServerSide::Server
show all
- Includes:
- Util
- Defined in:
- lib/pandemic/server_side/server.rb
Defined Under Namespace
Classes: StopServer
Instance Attribute Summary collapse
Class Method Summary
collapse
Instance Method Summary
collapse
Methods included from Util
#host_port, #logger, #with_mutex
Constructor Details
#initialize(bind_to) ⇒ Server
Returns a new instance of Server.
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
|
# File 'lib/pandemic/server_side/server.rb', line 29
def initialize(bind_to)
write_pid_file
@host, @port = host_port(bind_to)
@clients = []
@total_clients = 0
@clients_mutex = Mutex.new
@num_jobs_processed = MutexCounter.new
@num_jobs_entered = MutexCounter.new
@requests_per_second = RequestsPerSecond.new(10)
@peers = with_mutex({})
@servers = Config.servers
@servers.each do |peer|
next if peer == bind_to @peers[peer] = Peer.new(peer, self)
end
end
|
Instance Attribute Details
#host ⇒ Object
Returns the value of attribute host.
28
29
30
|
# File 'lib/pandemic/server_side/server.rb', line 28
def host
@host
end
|
#port ⇒ Object
Returns the value of attribute port.
28
29
30
|
# File 'lib/pandemic/server_side/server.rb', line 28
def port
@port
end
|
#running ⇒ Object
Returns the value of attribute running.
28
29
30
|
# File 'lib/pandemic/server_side/server.rb', line 28
def running
@running
end
|
Class Method Details
.boot(bind_to = nil) ⇒ Object
7
8
9
10
11
12
13
|
# File 'lib/pandemic/server_side/server.rb', line 7
def boot(bind_to = nil)
Config.load
server = self.new(bind_to || Config.bind_to)
set_signal_traps(server)
server
end
|
Instance Method Details
#client_closed(client) ⇒ Object
190
191
192
193
194
|
# File 'lib/pandemic/server_side/server.rb', line 190
def client_closed(client)
@clients_mutex.synchronize do
@clients.delete(client)
end
end
|
#connection_statuses ⇒ Object
179
180
181
182
183
184
185
186
187
188
|
# File 'lib/pandemic/server_side/server.rb', line 179
def connection_statuses
@servers.inject({}) do |statuses, server|
if server == signature
statuses[server] = :self
else
statuses[server] = @peers[server].connected? ? :connected : :disconnected
end
statuses
end
end
|
#handle_client_request(request) ⇒ Object
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
|
# File 'lib/pandemic/server_side/server.rb', line 132
def handle_client_request(request)
map = @handler_instance.partition(request, connection_statuses)
request.max_responses = map.size
map.each do |peer, body|
if @peers[peer]
@peers[peer].client_request(request, body)
end
end
if map[signature]
Thread.new do
begin
request.add_response(self.process(map[signature]))
rescue Exception => e
warn("Unhandled exception in local processing:\n#{e.inspect}#{e.backtrace.join("\n")}}")
end
end
end
@requests_per_second.hit
request.wait_for_responses
@handler_instance.reduce(request)
end
|
#handle_connection(connection) ⇒ Object
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
|
# File 'lib/pandemic/server_side/server.rb', line 93
def handle_connection(connection)
begin
connection.setsockopt(Socket::IPPROTO_TCP, Socket::TCP_NODELAY, 1) if TCP_NO_DELAY_AVAILABLE
identification = connection.gets.strip
if identification =~ /^SERVER ([a-zA-Z0-9.]+:[0-9]+)$/
host, port = host_port($1)
matching_peer = @peers.values.detect { |peer| [peer.host, peer.port] == [host, port] }
if matching_peer
else
matching_peer = @peers.synchronize do
hostport = "#{host}:#{port}"
@servers.push(hostport) unless @servers.include?(hostport)
@peers[hostport] ||= Peer.new(hostport, self)
end
end
matching_peer.add_incoming_connection(connection)
elsif identification =~ /^CLIENT$/
@clients_mutex.synchronize do
@clients << Client.new(connection, self).listen
@total_clients += 1
end
elsif identification =~ /^stats$/
print_stats(connection)
else
debug("Unrecognized connection. Closing.")
connection.close end
rescue Exception => e
warn("Unhandled exception in handle connection method:\n#{e.inspect}\n#{e.backtrace.join("\n")}")
end
end
|
#handler=(handler) ⇒ Object
48
49
50
51
|
# File 'lib/pandemic/server_side/server.rb', line 48
def handler=(handler)
@handler = handler
@handler_instance = handler.new
end
|
#process(body) ⇒ Object
164
165
166
167
168
169
170
171
172
173
|
# File 'lib/pandemic/server_side/server.rb', line 164
def process(body)
@num_jobs_entered.inc
response = if Config.fork_for_processor
self.processor.with_connection {|con| con.process(body) }
else
@handler_instance.process(body)
end
@num_jobs_processed.inc
response
end
|
#processor ⇒ Object
196
197
198
199
200
201
202
|
# File 'lib/pandemic/server_side/server.rb', line 196
def processor
@processor ||= begin
processor = ConnectionPool.new
processor.create_connection { Processor.new(@handler) }
processor
end
end
|
#signature ⇒ Object
175
176
177
|
# File 'lib/pandemic/server_side/server.rb', line 175
def signature
@signature ||= "#{@host}:#{@port}"
end
|
#start ⇒ Object
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
|
# File 'lib/pandemic/server_side/server.rb', line 53
def start
raise "You must specify a handler" unless @handler
@listener = TCPServer.new(@host, @port)
@running = true
@running_since = Time.now
@peers.values.each { |peer| peer.connect }
@listener_thread = Thread.new do
begin
while @running
begin
conn = @listener.accept
Thread.new(conn) { |c| handle_connection(c) }
rescue Errno::ECONNABORTED, Errno::EINTR debug("Connection accepted aborted")
conn.close if conn && !conn.closed?
end
end
rescue StopServer
info("Stopping server")
remove_pid_file
@listener.close if @listener
@peers.values.each { |p| p.disconnect }
@clients.each {|c| c.close }
self.processor.disconnect if Config.fork_for_processor
rescue Exception => e
warn("Unhandled exception in server listening thread:\n#{e.inspect}\n#{e.backtrace.join("\n")}")
end
end
end
|
#stop ⇒ Object
88
89
90
91
|
# File 'lib/pandemic/server_side/server.rb', line 88
def stop
@running = false
@listener_thread.raise(StopServer)
end
|