Class: Pandemic::ServerSide::Server

Inherits:
Object
  • Object
show all
Includes:
Util
Defined in:
lib/pandemic/server_side/server.rb

Defined Under Namespace

Classes: StopServer

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Methods included from Util

#host_port, #logger, #with_mutex

Constructor Details

#initialize(bind_to) ⇒ Server

Returns a new instance of Server.



29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
# File 'lib/pandemic/server_side/server.rb', line 29

# Builds a server for the address +bind_to+ and creates a Peer for every
# other server listed in Config.servers.
#
# bind_to - address passed to host_port to obtain @host and @port; the entry
#           of Config.servers equal to it is treated as this server itself.
def initialize(bind_to)
  # The pid file is written before any other state is set up.
  write_pid_file

  @host, @port = host_port(bind_to)

  # Client bookkeeping; @clients_mutex is held whenever @clients is changed.
  @clients_mutex = Mutex.new
  @clients = []
  @total_clients = 0

  # Throughput counters.
  @num_jobs_entered = MutexCounter.new
  @num_jobs_processed = MutexCounter.new
  @requests_per_second = RequestsPerSecond.new(10)

  @peers = with_mutex({})
  @servers = Config.servers
  @servers.each do |address|
    # An entry equal to bind_to is this server, not a peer.
    @peers[address] = Peer.new(address, self) unless address == bind_to
  end
end

Instance Attribute Details

#host ⇒ Object (readonly)

Returns the value of attribute host.



28
29
30
# File 'lib/pandemic/server_side/server.rb', line 28

# Reader for @host, the host half of the pair that initialize obtains from
# host_port(bind_to).
def host
  @host
end

#port ⇒ Object (readonly)

Returns the value of attribute port.



28
29
30
# File 'lib/pandemic/server_side/server.rb', line 28

# Reader for @port, the port half of the pair that initialize obtains from
# host_port(bind_to).
def port
  @port
end

#running ⇒ Object (readonly)

Returns the value of attribute running.



28
29
30
# File 'lib/pandemic/server_side/server.rb', line 28

# Reader for @running. In the code shown, #start sets it to true and #stop
# sets it to false; initialize does not assign it, so it is nil before #start.
def running
  @running
end

Class Method Details

.boot(bind_to = nil) ⇒ Object



7
8
9
10
11
12
13
# File 'lib/pandemic/server_side/server.rb', line 7

# Loads the configuration, builds a server and installs signal traps for it.
#
# bind_to - optional address; Config.bind_to is used when it is nil or false.
#
# Returns the new server. This method does not call #start on it.
def boot(bind_to = nil)
  Config.load
  # Disabled in the original (arbitrary high number of max file descriptors):
  # Process.setrlimit(Process::RLIMIT_NOFILE, 4096)
  address = bind_to || Config.bind_to
  new(address).tap { |server| set_signal_traps(server) }
end

Instance Method Details

#client_closed(client) ⇒ Object



190
191
192
193
194
# File 'lib/pandemic/server_side/server.rb', line 190

# Removes +client+ from @clients while holding @clients_mutex.
#
# Returns the result of the delete call on @clients (for an Array: the removed
# element, or nil when it was not present).
def client_closed(client)
  @clients_mutex.synchronize { @clients.delete(client) }
end

#connection_statuses ⇒ Object



179
180
181
182
183
184
185
186
187
188
# File 'lib/pandemic/server_side/server.rb', line 179

# Builds a hash describing every entry of @servers.
#
# Returns a Hash keyed by server address whose values are:
#   :self         - the address equals this server's signature
#   :connected    - the matching peer reports connected?
#   :disconnected - the peer reports not connected, or no peer object is
#                   registered under that address
def connection_statuses
  @servers.each_with_object({}) do |server, statuses|
    statuses[server] =
      if server == signature
        :self
      else
        # A server can be listed without a peer entry (handle_connection
        # pushes to @servers before storing the peer); report it as
        # disconnected instead of calling connected? on nil.
        peer = @peers[server]
        (peer && peer.connected?) ? :connected : :disconnected
      end
  end
end

#handle_client_request(request) ⇒ Object



132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
# File 'lib/pandemic/server_side/server.rb', line 132

# Fans a client request out to the handlers chosen by the handler's partition
# step, waits for the responses and returns the handler's reduce result.
#
# request - object that receives max_responses=, add_response and
#           wait_for_responses, and is passed to partition/reduce.
#
# Returns whatever @handler_instance.reduce(request) returns.
def handle_client_request(request)
  # partition returns a mapping of server address => body for that server.
  map = @handler_instance.partition(request, connection_statuses)
  request.max_responses = map.size

  # Forward to every address that has a peer object; other keys are skipped.
  map.each do |peer, body|
    target = @peers[peer]
    target.client_request(request, body) if target
  end

  if map[signature]
    # This server's own share is processed on a separate thread.
    Thread.new do
      begin
        request.add_response(process(map[signature]))
      rescue Exception => e
        # NOTE(review): when this fires no response is added for the local
        # share; how wait_for_responses copes with that is not visible here.
        warn("Unhandled exception in local processing:\n#{e.inspect}\n#{e.backtrace.join("\n")}")
      end
    end
  end

  @requests_per_second.hit

  request.wait_for_responses
  @handler_instance.reduce(request)
end

#handle_connection(connection) ⇒ Object



93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
# File 'lib/pandemic/server_side/server.rb', line 93

# Reads the first line sent on a freshly accepted connection and dispatches
# on it:
#   "SERVER host:port" - hands the connection to the matching peer, creating
#                        and registering the peer if none matches
#   "CLIENT"           - wraps the connection in a Client and records it
#   "stats"            - passes the connection to print_stats
#   anything else      - closes the connection
#
# Any exception raised along the way is logged with warn, not propagated.
def handle_connection(connection)
  connection.setsockopt(Socket::IPPROTO_TCP, Socket::TCP_NODELAY, 1) if TCP_NO_DELAY_AVAILABLE

  # gets returns nil when the remote end closes without sending a line; to_s
  # turns that into "" so it falls through to the "unrecognized" branch and
  # the socket is closed instead of raising on nil.
  identification = connection.gets.to_s.strip

  # Hyphens are accepted in the host part because they are legal in hostnames.
  if identification =~ /^SERVER ([a-zA-Z0-9.-]+:[0-9]+)$/
    host, port = host_port($1)
    matching_peer = @peers.values.detect { |peer| [peer.host, peer.port] == [host, port] }
    unless matching_peer
      # Unknown peer: register it while holding the @peers lock. The ||=
      # keeps a peer that another thread stored in the meantime.
      matching_peer = @peers.synchronize do
        hostport = "#{host}:#{port}"
        @servers.push(hostport) unless @servers.include?(hostport)
        @peers[hostport] ||= Peer.new(hostport, self)
      end
    end
    matching_peer.add_incoming_connection(connection)
  elsif identification =~ /^CLIENT$/
    @clients_mutex.synchronize do
      @clients << Client.new(connection, self).listen
      @total_clients += 1
    end
  elsif identification =~ /^stats$/
    print_stats(connection)
  else
    debug("Unrecognized connection. Closing.")
    connection.close # i dunno you
  end
rescue Exception => e
  warn("Unhandled exception in handle connection method:\n#{e.inspect}\n#{e.backtrace.join("\n")}")
end

#handler=(handler) ⇒ Object



48
49
50
51
# File 'lib/pandemic/server_side/server.rb', line 48

# Installs the handler class and immediately instantiates it.
#
# handler - object responding to new (called with no arguments); stored in
#           @handler, with the resulting instance stored in @handler_instance.
def handler=(handler)
  @handler = handler
  @handler_instance = @handler.new
end

#process(body) ⇒ Object



164
165
166
167
168
169
170
171
172
173
# File 'lib/pandemic/server_side/server.rb', line 164

# Runs one job body and returns its response, bumping the entered counter
# before the work and the processed counter after it.
#
# When Config.fork_for_processor is truthy the body is handed to a connection
# obtained from #processor; otherwise the handler instance processes it
# directly.
def process(body)
  @num_jobs_entered.inc

  outcome =
    if Config.fork_for_processor
      processor.with_connection { |worker| worker.process(body) }
    else
      @handler_instance.process(body)
    end

  # Only reached when processing did not raise.
  @num_jobs_processed.inc
  outcome
end

#processor ⇒ Object



196
197
198
199
200
201
202
# File 'lib/pandemic/server_side/server.rb', line 196

# Lazily builds and memoizes the ConnectionPool stored in @processor.
#
# The pool is given a factory block that creates a Processor for @handler.
# Later calls return the same pool.
def processor
  return @processor if @processor

  pool = ConnectionPool.new
  pool.create_connection { Processor.new(@handler) }
  @processor = pool
end

#signature ⇒ Object



175
176
177
# File 'lib/pandemic/server_side/server.rb', line 175

# Returns "<host>:<port>" built from @host and @port. The string is computed
# on first use and cached in @signature.
def signature
  return @signature if @signature

  @signature = [@host, @port].join(":")
end

#start ⇒ Object



53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
# File 'lib/pandemic/server_side/server.rb', line 53

# Opens the listening socket, connects to all peers and spawns the accept
# loop on a background thread.
#
# Raises RuntimeError when no handler has been assigned.
# Returns the listener thread (also stored in @listener_thread).
#
# The accept loop ends when StopServer is raised into the thread (see #stop);
# the rescue then removes the pid file, closes the listener, disconnects
# peers, closes clients and, when forking is configured, disconnects the
# processor pool.
def start
  raise "You must specify a handler" unless @handler

  @listener = TCPServer.new(@host, @port)
  @running = true
  @running_since = Time.now

  @peers.values.each { |peer| peer.connect }

  @listener_thread = Thread.new do
    begin
      while @running
        # Reset every iteration: conn is scoped to the whole thread block, so
        # without this a failed accept would leave it pointing at the
        # previously accepted socket (already owned by a handler thread) and
        # the rescue below would close that live connection.
        conn = nil
        begin
          conn = @listener.accept
          # Each accepted connection is handled on its own thread.
          Thread.new(conn) { |c| handle_connection(c) }
        rescue Errno::ECONNABORTED, Errno::EINTR # TODO: what else can wrong here? this should be more robust.
          debug("Connection accepted aborted")
          conn.close if conn && !conn.closed?
        end
      end
    rescue StopServer
      info("Stopping server")
      remove_pid_file
      @listener.close if @listener
      @peers.values.each { |p| p.disconnect }
      # Iterate over a copy: client_closed removes entries from @clients, and
      # closing a client may trigger it (NOTE(review): Client#close is not
      # visible here — confirm).
      @clients.dup.each { |c| c.close }
      self.processor.disconnect if Config.fork_for_processor
    rescue Exception => e
      warn("Unhandled exception in server listening thread:\n#{e.inspect}\n#{e.backtrace.join("\n")}")
    end
  end
end

#stop ⇒ Object



88
89
90
91
# File 'lib/pandemic/server_side/server.rb', line 88

# Asks the server to shut down: clears @running and raises StopServer inside
# the listener thread, whose rescue in #start performs the cleanup.
#
# Safe to call before #start or after the listener thread has finished; in
# those cases only @running is cleared.
def stop
  @running = false
  # @listener_thread is nil until #start has run.
  @listener_thread.raise(StopServer) if @listener_thread && @listener_thread.alive?
end