Class: GrpcKit::Server

Inherits:
Object
Defined in:
lib/grpc_kit/server.rb

Constructor Details

#initialize(interceptors: [], shutdown_timeout: 30, min_pool_size: nil, max_pool_size: nil, settings: [], max_receive_message_size: nil, max_send_message_size: nil) ⇒ Server

Returns a new instance of Server.

Parameters:

  • interceptors (Array<GrpcKit::Grpc::ServerInterceptor>) (defaults to: [])

    list of interceptors

  • shutdown_timeout (Integer) (defaults to: 30)

    Number of seconds a graceful shutdown waits for sessions to finish before shutting them down forcibly

  • min_pool_size (Integer) (defaults to: nil)

    Minimum thread pool size

  • max_pool_size (Integer) (defaults to: nil)

    Maximum thread pool size

  • max_receive_message_size (Integer, nil) (defaults to: nil)

    Maximum size of an inbound message, in bytes. Defaults to 4MB.

  • max_send_message_size (Integer, nil) (defaults to: nil)

    Maximum size of an outbound message, in bytes. Defaults to 4MB.



# File 'lib/grpc_kit/server.rb', line 15

def initialize(interceptors: [], shutdown_timeout: 30, min_pool_size: nil, max_pool_size: nil, settings: [], max_receive_message_size: nil, max_send_message_size: nil)
  @interceptors = interceptors
  @shutdown_timeout = shutdown_timeout
  @min_pool_size = min_pool_size || GrpcKit::RpcDispatcher::DEFAULT_MIN
  @max_pool_size = max_pool_size || GrpcKit::RpcDispatcher::DEFAULT_MAX
  @max_receive_message_size = max_receive_message_size
  @max_send_message_size = max_send_message_size
  @sessions = []
  @rpc_descs = {}
  @mutex = Mutex.new
  @stopping = false
  @settings = settings

  GrpcKit.logger.debug("Launched grpc_kit(v#{GrpcKit::VERSION})")
end

Instance Method Details

#force_shutdown ⇒ void

This method returns an undefined value.

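This method is expected to be called in a trap context (that is, from a signal handler). It marks the server as stopping, returns immediately, and shuts the sessions down in a new thread.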



# File 'lib/grpc_kit/server.rb', line 66

def force_shutdown
  @stopping = true

  Thread.new {
    GrpcKit.logger.debug('force shutdown')
    shutdown_sessions
  }
end
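The reason for doing the work in a new thread can be illustrated with the standard library alone. The sketch below is not grpc_kit code: the `mutex` and `events` names and the choice of `SIGUSR1` are assumptions made for illustration. It shows that Ruby refuses to lock a `Mutex` inside a signal handler, while a thread started from the handler can lock it normally.

```ruby
mutex = Mutex.new
events = []

# Install a handler for SIGUSR1 (POSIX only; the signal choice is arbitrary).
Signal.trap('USR1') do
  begin
    # Locking a Mutex directly in trap context raises ThreadError.
    mutex.synchronize { events << :locked_in_trap }
  rescue ThreadError => e
    events << e.class
  end

  # A thread spawned from the handler runs outside trap context,
  # so it may take the lock.
  Thread.new { mutex.synchronize { events << :locked_in_thread } }
end

Process.kill('USR1', Process.pid)

# Wait (at most five seconds) for both the handler and the thread to report.
deadline = Time.now + 5
sleep 0.01 until events.size == 2 || Time.now > deadline

# Restore the default handler.
Signal.trap('USR1', 'DEFAULT')
```

After this runs, `events` should hold `ThreadError` followed by `:locked_in_thread`, which is why a shutdown method meant to be called from `Signal.trap` hands its work to `Thread.new`.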

#graceful_shutdown(timeout: true) ⇒ void

This method returns an undefined value.

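This method is expected to be called in a trap context (that is, from a signal handler). It marks the server as stopping and, in a new thread, drains every session and waits for the sessions to finish. If they have not finished after shutdown_timeout seconds, the remaining sessions are shut down forcibly. With timeout: false, it waits without a time limit.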



# File 'lib/grpc_kit/server.rb', line 78

def graceful_shutdown(timeout: true)
  @stopping = true

  Thread.new do
    GrpcKit.logger.debug('graceful shutdown')
    @mutex.synchronize { @sessions.each(&:drain) }

    begin
      sec = timeout ? @shutdown_timeout : 0
      Timeout.timeout(sec) do
        sleep 1 until @sessions.empty?
      end
    rescue Timeout::Error => _
      GrpcKit.logger.error("Graceful shutdown is timeout (#{@shutdown_timeout}sec). Perform shutdown forceibly")
      shutdown_sessions
    end
  end
end
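The `timeout ? @shutdown_timeout : 0` expression relies on a property of Ruby's `Timeout.timeout`: a value of `0` (or `nil`) runs the block with no time limit at all. A minimal standard-library sketch of that behavior follows; the `wait_with` helper and the durations are hypothetical and exist only for this illustration.

```ruby
require 'timeout'

# Hypothetical helper: runs a short task under the given limit and
# reports whether it finished or was interrupted.
def wait_with(sec)
  Timeout.timeout(sec) do
    sleep 0.2
    :finished
  end
rescue Timeout::Error
  :timed_out
end

no_limit = wait_with(0)    # 0 disables the timeout, so the block completes
limited = wait_with(0.05)  # a limit shorter than the task raises Timeout::Error
```

Here `no_limit` is `:finished` and `limited` is `:timed_out`. In terms of the method above, `graceful_shutdown(timeout: false)` therefore never reaches the `rescue` branch and keeps waiting until the session list is empty.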

#handle(handler) ⇒ void

This method returns an undefined value.

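Parameters:

  • handler (Class, Object)

    a handler class, or an instance of one, whose class includes GrpcKit::Grpc::GenericService
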


# File 'lib/grpc_kit/server.rb', line 33

def handle(handler)
  klass = handler.is_a?(Class) ? handler : handler.class
  unless klass.include?(GrpcKit::Grpc::GenericService)
    raise "#{klass} must include Grpc::GenericService"
  end

  klass.rpc_descs.each do |path, rpc_desc|
    if @rpc_descs[path]
      raise "Duplicated method registered #{path}, class: #{klass}"
    end

    @rpc_descs[path] = rpc_desc.build_server(
      handler.is_a?(Class) ? handler.new : handler,
      interceptors: @interceptors,
      max_receive_message_size: @max_receive_message_size,
      max_send_message_size: @max_send_message_size,
    )
  end
end
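The registration rules in the listing above (accept a class or an instance, require the service module, reject a path that is already taken) can be tried without grpc_kit. Everything in the sketch below is a hypothetical stand-in: `FakeService`, `Greeter`, and `Registry` are not part of the library, and the real method builds server-side RPC objects rather than storing the handler itself.

```ruby
# Hypothetical stand-in for a service module that records RPC paths per class.
module FakeService
  def self.included(base)
    base.extend(ClassMethods)
  end

  module ClassMethods
    def rpc_descs
      @rpc_descs ||= {}
    end

    def rpc(path)
      rpc_descs[path] = path
    end
  end
end

class Greeter
  include FakeService
  rpc '/demo.Greeter/SayHello'
end

# Hypothetical registry mirroring the checks performed by #handle.
class Registry
  def initialize
    @handlers = {}
  end

  def handle(handler)
    klass = handler.is_a?(Class) ? handler : handler.class
    raise "#{klass} must include FakeService" unless klass.include?(FakeService)

    instance = handler.is_a?(Class) ? handler.new : handler
    klass.rpc_descs.each_key do |path|
      raise "Duplicated method registered #{path}, class: #{klass}" if @handlers[path]

      @handlers[path] = instance
    end
  end

  def paths
    @handlers.keys
  end
end

registry = Registry.new
registry.handle(Greeter) # a class is instantiated before registration

duplicate_error =
  begin
    registry.handle(Greeter.new) # an instance is accepted, but the path is taken
    nil
  rescue RuntimeError => e
    e.message
  end
```

In this sketch the second call raises, so `duplicate_error` holds the "Duplicated method registered" message and `registry.paths` still contains the single path registered by the first call.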

#run(conn) ⇒ void

This method returns an undefined value.

Parameters:

  • conn (TCPSocket)


# File 'lib/grpc_kit/server.rb', line 55

def run(conn)
  raise 'Stopping server' if @stopping

  establish_session(conn) do |s|
    s.submit_settings(@settings)
    s.start
  end
end

#session_count ⇒ Object



# File 'lib/grpc_kit/server.rb', line 97

def session_count
  @mutex.synchronize { @sessions.size }
end