Class: Protobuf::Rpc::Socket::Server

Inherits:
Object
  • Object
show all
Includes:
Logging
Defined in:
lib/protobuf/rpc/servers/socket/server.rb

Constant Summary collapse

AUTO_COLLECT_TIMEOUT = 5 # seconds

Instance Method Summary collapse

Methods included from Logging

initialize_logger, #log_exception, logger, #logger, logger=, #sign_message

Constructor Details

#initialize(options) ⇒ Server

Returns a new instance of Server.



12
13
14
# File 'lib/protobuf/rpc/servers/socket/server.rb', line 12

# Stores the server options for later use; no socket is opened here.
#
# @param options [Hash] configuration consulted by #run, which reads the
#   :host, :port, :backlog and :threshold keys from it.
def initialize(options)
  @options = options
end

Instance Method Details

#cleanup? ⇒ Boolean

Returns:

  • (Boolean)


16
17
18
19
# File 'lib/protobuf/rpc/servers/socket/server.rb', line 16

# Decides whether the list of tracked worker threads is due for a sweep.
#
# The answer is true when the number of tracked threads is greater than
# @threshold - 1 and is an exact multiple of @threshold.
#
# @return [Boolean]
def cleanup?
  tracked = @threads.size
  floor = @threshold - 1

  tracked > floor && (tracked % @threshold).zero?
end

#cleanup_threads ⇒ Object



21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
# File 'lib/protobuf/rpc/servers/socket/server.rb', line 21

# Reaps finished worker threads.
#
# Every entry in @threads whose thread is no longer alive is joined, its
# socket is removed from @working, and the entry is dropped from @threads.
# Entries whose thread is still alive are kept untouched.
#
# Thread#join re-raises the exception of a thread that terminated
# abnormally. That error is rescued and logged here so that one failed
# worker cannot abort the sweep (leaving @threads/@working stale) or
# propagate into the caller.
#
# @return [Object] the result of the final logger.debug call
def cleanup_threads
  logger.debug { sign_message("Thread cleanup - #{@threads.size} - start") }

  @threads = @threads.select do |entry|
    worker = entry[:thread]
    next true if worker.alive?

    begin
      worker.join
    rescue StandardError => e
      # The worker died with an error; record it and keep sweeping.
      logger.error { sign_message("Worker thread terminated with #{e.class}: #{e.message}") }
    ensure
      # Release the socket entry whether or not the worker failed.
      @working.delete(entry[:socket])
    end

    false
  end

  logger.debug { sign_message("Thread cleanup - #{@threads.size} - complete") }
end

#log_signature ⇒ Object



37
38
39
# File 'lib/protobuf/rpc/servers/socket/server.rb', line 37

# Label identifying this server, built as "server-" followed by the
# receiver's class name. The string is computed once and cached in
# @_log_signature.
#
# @return [String]
def log_signature
  return @_log_signature if @_log_signature

  @_log_signature = "server-#{self.class.name}"
end

#new_worker(socket) ⇒ Object



41
42
43
44
45
46
47
# File 'lib/protobuf/rpc/servers/socket/server.rb', line 41

# Spawns a thread that hands the given socket to a new
# ::Protobuf::Rpc::Socket::Worker.
#
# The worker receives a block that closes the socket it is passed; when that
# block is invoked is decided by Worker, which is not visible from here.
#
# @param socket [Object] the client connection to be served
# @return [Thread] the thread running the worker
def new_worker(socket)
  Thread.new(socket) do |client_socket|
    ::Protobuf::Rpc::Socket::Worker.new(client_socket) { |finished| finished.close }
  end
end

#run ⇒ Object



49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
# File 'lib/protobuf/rpc/servers/socket/server.rb', line 49

# Opens the listening socket and serves connections until #running? turns
# false.
#
# Reads :host, :port, :backlog and :threshold from @options, then loops:
#
# * waits up to AUTO_COLLECT_TIMEOUT seconds for a readable descriptor;
# * a readable server socket means a new client, which is accepted and added
#   to the watched set;
# * a readable client socket that is not already being worked on is moved
#   from the watched set to @working and handed to a worker thread;
# * if the wait yields nothing, finished worker threads are reaped.
#
# Errors raised while binding or inside the loop (e.g. Errno::EADDRINUSE
# from TCPServer.new) propagate to the caller unchanged.
#
# @raise [RuntimeError] if the freshly created server socket reports closed
# @return [nil]
def run
  logger.debug { sign_message("Run") }
  host = @options[:host]
  port = @options[:port]
  backlog = @options[:backlog]
  @threshold = @options[:threshold]

  @threads = []
  @server = ::TCPServer.new(host, port)
  raise "The server was unable to start properly." if @server.closed?

  @server.listen(backlog)
  @working = []
  @listen_fds = [@server]
  @running = true

  while running?
    logger.debug { sign_message("Waiting for connections") }

    # IO.select raises once the server socket has been closed (see #stop);
    # that is treated exactly like a timeout so the loop can re-check
    # running? and exit.
    ready_cnxns =
      begin
        IO.select(@listen_fds, [], [], AUTO_COLLECT_TIMEOUT)
      rescue StandardError
        nil
      end

    unless ready_cnxns
      # Run a cleanup if select times out while waiting.
      # NOTE(review): a lone finished worker is not reaped here because of
      # the "> 1" bound — confirm whether that is intentional.
      cleanup_threads if @threads.size > 1
      next
    end

    ready_cnxns.first.each do |cnxn|
      # The server may have been stopped while this batch was being handled.
      next unless running?

      if cnxn == @server
        logger.debug { sign_message("Accepted new connection") }
        # TCPServer#accept returns just the client socket.
        @listen_fds << @server.accept
      elsif !@working.include?(cnxn)
        # Stop watching the socket ourselves; the worker owns it from now on.
        @working << @listen_fds.delete(cnxn)
        logger.debug { sign_message("Working")  }
        @threads << { :thread => new_worker(cnxn), :socket => cnxn }

        cleanup_threads if cleanup?
      end
    end
  end
end

#running? ⇒ Boolean

Returns:

  • (Boolean)


102
103
104
# File 'lib/protobuf/rpc/servers/socket/server.rb', line 102

# Reports the @running flag as a strict boolean.
#
# @return [Boolean] true when @running is truthy, false when it is nil or
#   false
def running?
  @running ? true : false
end

#stop ⇒ Object



106
107
108
109
# File 'lib/protobuf/rpc/servers/socket/server.rb', line 106

# Signals the accept loop to finish and closes the listening socket.
#
# Safe to call before the server socket exists (@server is nil) and safe to
# call more than once: the socket is closed only if it is present and still
# open. Uses a plain guard rather than Object#try, so no core extension is
# needed.
#
# @return [nil]
def stop
  @running = false
  @server.close if @server && !@server.closed?
end