Class: Protobuf::Rpc::SocketServer

Inherits:
Object
  • Object
show all
Includes:
Logger::LogMethods, Server
Defined in:
lib/protobuf/rpc/servers/socket_server.rb

Defined Under Namespace

Classes: Worker

Class Method Summary collapse

Methods included from Server

#handle_client, #handle_error, #invoke_rpc_method, #log_signature, #parse_request_from_buffer, #parse_response_from_service, #parse_service_info, #send_response, #serialize_response

Class Method Details

.cleanup?Boolean

Returns:

  • (Boolean)


10
11
12
13
# File 'lib/protobuf/rpc/servers/socket_server.rb', line 10

def self.cleanup? 
  # every 10 connections run a cleanup routine after closing the response
  @threads.size > (@thread_threshold - 1) && (@threads.size % @thread_threshold) == 0
end

.cleanup_threadsObject



15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
# File 'lib/protobuf/rpc/servers/socket_server.rb', line 15

def self.cleanup_threads
  log_debug "[#{log_signature}] Thread cleanup - #{@threads.size} - start"

  @threads = @threads.select do |t| 
    if t[:thread].alive? 
      true  
    else
      t[:thread].join
      @working.delete(t[:socket])
      false
    end
  end

  log_debug "[#{log_signature}] Thread cleanup - #{@threads.size} - complete"
end

.log_signatureObject



31
32
33
# File 'lib/protobuf/rpc/servers/socket_server.rb', line 31

def self.log_signature
  @log_signature ||= "server-#{self}"
end

.new_worker(socket) ⇒ Object



35
36
37
38
39
40
41
# File 'lib/protobuf/rpc/servers/socket_server.rb', line 35

def self.new_worker(socket)
  Thread.new(socket) do |sock|
    Protobuf::Rpc::SocketServer::Worker.new(sock) do |s|
      s.close
    end
  end
end

.run(opts = {}) ⇒ Object



43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
# File 'lib/protobuf/rpc/servers/socket_server.rb', line 43

def self.run(opts = {})
  log_debug "[#{log_signature}] Run"
  host = opts.fetch(:host, "127.0.0.1")
  port = opts.fetch(:port, 9399)
  backlog = opts.fetch(:backlog, 100)
  thread_threshold = opts.fetch(:thread_threshold, 100)
  auto_collect_timeout = opts.fetch(:auto_collect_timeout, 20)

  @running = true
  @threads = []
  @thread_threshold = thread_threshold
  @server = TCPServer.new(host, port)
  @server.listen(backlog)
  @working = []
  @listen_fds = [@server]

  while running?
    log_debug "[#{log_signature}] Waiting for connections"

    if ready_cnxns = IO.select(@listen_fds, [], [], auto_collect_timeout)
      cnxns = ready_cnxns.first
      cnxns.each do |client|
        case 
        when !running? then
          # no-op
        when client == @server then 
          log_debug "[#{log_signature}] Accepted new connection"
          client, sockaddr = @server.accept
          @listen_fds << client
        else 
          if !@working.include?(client)
            @working << @listen_fds.delete(client)
            log_debug "[#{log_signature}] Working" 
            @threads << { :thread => new_worker(client), :socket => client }

            cleanup_threads if cleanup?
          end
        end
      end
    else
      # Run a cleanup if select times out while waiting
      cleanup_threads if @threads.size > 1
    end
  end

rescue 
  # Closing the server causes the loop to raise an exception here
  raise if running?
end

.running?Boolean

Returns:

  • (Boolean)


93
94
95
# File 'lib/protobuf/rpc/servers/socket_server.rb', line 93

def self.running?
  @running
end

.stopObject



97
98
99
100
# File 'lib/protobuf/rpc/servers/socket_server.rb', line 97

def self.stop 
  @running = false
  @server.close if @server
end