Class: GRPC::RpcServer
- Inherits:
-
Object
- Object
- GRPC::RpcServer
- Extended by:
- Forwardable
- Includes:
- Core::CallOps, Core::TimeConsts
- Defined in:
- src/ruby/lib/grpc/generic/rpc_server.rb
Overview
RpcServer hosts a number of services and makes them available on the network.
Constant Summary collapse
- DEFAULT_POOL_SIZE =
Default thread pool size is 30
30
- DEFAULT_MAX_WAITING_REQUESTS =
Deprecated due to internal changes to the thread pool
20
- DEFAULT_POLL_PERIOD =
Default poll period is 1s
1
- SIGNAL_CHECK_PERIOD =
Signal check period is 0.25s
0.25
Class Method Summary collapse
-
.setup_connect_md_proc(a_proc) ⇒ Object
setup_connect_md_proc is used by #initialize to validate the connect_md_proc.
Instance Method Summary collapse
-
#available?(an_rpc) ⇒ Boolean
Sends RESOURCE_EXHAUSTED if there are too many unprocessed jobs.
-
#handle(service) ⇒ Object
handle registration of classes.
-
#implemented?(an_rpc) ⇒ Boolean
Sends UNIMPLEMENTED if the method is not implemented by this server.
-
#initialize(pool_size: DEFAULT_POOL_SIZE, max_waiting_requests: DEFAULT_MAX_WAITING_REQUESTS, poll_period: DEFAULT_POLL_PERIOD, pool_keep_alive: Pool::DEFAULT_KEEP_ALIVE, connect_md_proc: nil, server_args: {}, interceptors: []) ⇒ RpcServer
constructor
Creates a new RpcServer.
-
#loop_handle_server_calls ⇒ Object
handles calls to the server.
- #new_active_server_call(an_rpc) ⇒ Object
-
#run ⇒ Object
(also: #run_till_terminated)
runs the server.
-
#run_till_terminated_or_interrupted(signals, wait_interval = 60) ⇒ Object
runs the server with signal handlers.
- #running? ⇒ Boolean
- #running_state ⇒ Object
-
#stop ⇒ Object
stops a running server.
- #stopped? ⇒ Boolean
-
#transition_running_state(target_state) ⇒ Object
Can only be called while holding @run_mutex.
-
#wait_till_running(timeout = nil) ⇒ true, false
Is called from other threads to wait for #run to start up the server.
Methods included from Core::TimeConsts
Constructor Details
#initialize(pool_size: DEFAULT_POOL_SIZE, max_waiting_requests: DEFAULT_MAX_WAITING_REQUESTS, poll_period: DEFAULT_POLL_PERIOD, pool_keep_alive: Pool::DEFAULT_KEEP_ALIVE, connect_md_proc: nil, server_args: {}, interceptors: []) ⇒ RpcServer
Creates a new RpcServer.
The RPC server is configured using keyword arguments.
There are some specific keyword args used to configure the RpcServer instance.
-
pool_size: the size of the thread pool the server uses to run its
threads. No more concurrent requests can be made than the size of the thread pool
-
max_waiting_requests: Deprecated due to internal changes to the thread
pool. This is still an argument for compatibility but is ignored.
-
poll_period: The amount of time in seconds to wait for
currently-serviced RPC’s to finish before cancelling them when shutting down the server.
-
pool_keep_alive: The amount of time in seconds to wait
for currently busy thread-pool threads to finish before forcing an abrupt exit to each thread.
-
connect_md_proc:
when non-nil is a proc for determining metadata to send back the client on receiving an invocation req. The proc signature is:
{key: val, ..} func(method_name, {key: val, ...})
-
server_args:
A server arguments hash to be passed down to the underlying core server
-
interceptors:
An array of GRPC::ServerInterceptor objects that will be used for intercepting server handlers to provide extra functionality. Interceptors are an EXPERIMENTAL API.
217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 |
# File 'src/ruby/lib/grpc/generic/rpc_server.rb', line 217 def initialize(pool_size: DEFAULT_POOL_SIZE, max_waiting_requests: DEFAULT_MAX_WAITING_REQUESTS, poll_period: DEFAULT_POLL_PERIOD, pool_keep_alive: Pool::DEFAULT_KEEP_ALIVE, connect_md_proc: nil, server_args: {}, interceptors: []) @connect_md_proc = RpcServer.setup_connect_md_proc(connect_md_proc) @max_waiting_requests = max_waiting_requests @poll_period = poll_period @pool_size = pool_size @pool = Pool.new(@pool_size, keep_alive: pool_keep_alive) @run_cond = ConditionVariable.new @run_mutex = Mutex.new # running_state can take 4 values: :not_started, :running, :stopping, and # :stopped. State transitions can only proceed in that order. @running_state = :not_started @server = Core::Server.new(server_args) @interceptors = InterceptorRegistry.new(interceptors) end |
Class Method Details
.setup_connect_md_proc(a_proc) ⇒ Object
setup_connect_md_proc is used by #initialize to validate the connect_md_proc.
176 177 178 179 180 |
# File 'src/ruby/lib/grpc/generic/rpc_server.rb', line 176 def self.setup_connect_md_proc(a_proc) return nil if a_proc.nil? fail(TypeError, '!Proc') unless a_proc.is_a? Proc a_proc end |
Instance Method Details
#available?(an_rpc) ⇒ Boolean
Sends RESOURCE_EXHAUSTED if there are too many unprocessed jobs
418 419 420 421 422 423 424 425 426 427 428 429 430 |
# File 'src/ruby/lib/grpc/generic/rpc_server.rb', line 418 def available?(an_rpc) return an_rpc if @pool.ready_for_work? GRPC.logger.warn('no free worker threads currently') noop = proc { |x| x } # Create a new active call that knows that metadata hasn't been # sent yet c = ActiveCall.new(an_rpc.call, noop, noop, an_rpc.deadline, metadata_received: true, started: false) c.send_status(GRPC::Core::StatusCodes::RESOURCE_EXHAUSTED, 'No free threads in thread pool') nil end |
#handle(service) ⇒ Object
handle registration of classes
service is either a class that includes GRPC::GenericService and whose #new function can be called without argument or any instance of such a class.
E.g, after
class Divider
include GRPC::GenericService
rpc :div DivArgs, DivReply # single request, single response
def initialize(optional_arg='default option') # no args
...
end
srv = GRPC::RpcServer.new(…)
# Either of these works
srv.handle(Divider)
# or
srv.handle(Divider.new(‘replace optional arg’))
It raises RuntimeError:
-
if service is not valid service class or object
-
its handler methods are already registered
-
if the server is already running
333 334 335 336 337 338 339 340 341 342 |
# File 'src/ruby/lib/grpc/generic/rpc_server.rb', line 333 def handle(service) @run_mutex.synchronize do unless @running_state == :not_started fail 'cannot add services if the server has been started' end cls = service.is_a?(Class) ? service : service.class assert_valid_service_class(cls) add_rpc_descs_for(service) end end |
#implemented?(an_rpc) ⇒ Boolean
Sends UNIMPLEMENTED if the method is not implemented by this server
433 434 435 436 437 438 439 440 441 442 443 444 445 |
# File 'src/ruby/lib/grpc/generic/rpc_server.rb', line 433 def implemented?(an_rpc) mth = an_rpc.method.to_sym return an_rpc if rpc_descs.key?(mth) GRPC.logger.warn("UNIMPLEMENTED: #{an_rpc}") noop = proc { |x| x } # Create a new active call that knows that # metadata hasn't been sent yet c = ActiveCall.new(an_rpc.call, noop, noop, an_rpc.deadline, metadata_received: true, started: false) c.send_status(GRPC::Core::StatusCodes::UNIMPLEMENTED, '') nil end |
#loop_handle_server_calls ⇒ Object
handles calls to the server
448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 485 |
# File 'src/ruby/lib/grpc/generic/rpc_server.rb', line 448 def loop_handle_server_calls fail 'not started' if running_state == :not_started while running_state == :running begin an_rpc = @server.request_call break if (!an_rpc.nil?) && an_rpc.call.nil? active_call = new_active_server_call(an_rpc) unless active_call.nil? @pool.schedule(active_call) do |ac| c, mth = ac begin rpc_descs[mth].run_server_method( c, rpc_handlers[mth], @interceptors.build_context ) rescue StandardError c.send_status(GRPC::Core::StatusCodes::INTERNAL, 'Server handler failed') end end end rescue Core::CallError, RuntimeError => e # these might happen for various reasons. The correct behavior of # the server is to log them and continue, if it's not shutting down. if running_state == :running GRPC.logger.warn("server call failed: #{e}") end next end end # @running_state should be :stopping here @run_mutex.synchronize do transition_running_state(:stopped) GRPC.logger.info("stopped: #{self}") @server.close end end |
#new_active_server_call(an_rpc) ⇒ Object
487 488 489 490 491 492 493 494 495 496 497 498 499 500 501 502 503 504 505 506 507 508 509 510 511 512 513 |
# File 'src/ruby/lib/grpc/generic/rpc_server.rb', line 487 def new_active_server_call(an_rpc) return nil if an_rpc.nil? || an_rpc.call.nil? # allow the metadata to be accessed from the call an_rpc.call. = an_rpc. # attaches md to call for handlers connect_md = nil unless @connect_md_proc.nil? connect_md = @connect_md_proc.call(an_rpc.method, an_rpc.) end return nil unless available?(an_rpc) return nil unless implemented?(an_rpc) # Create the ActiveCall. Indicate that metadata hasnt been sent yet. GRPC.logger.info("deadline is #{an_rpc.deadline}; (now=#{Time.now})") rpc_desc = rpc_descs[an_rpc.method.to_sym] c = ActiveCall.new(an_rpc.call, rpc_desc.marshal_proc, rpc_desc.unmarshal_proc(:input), an_rpc.deadline, metadata_received: true, started: false, metadata_to_send: connect_md) c.attach_peer_cert(an_rpc.call.peer_cert) mth = an_rpc.method.to_sym [c, mth] end |
#run ⇒ Object Also known as: run_till_terminated
runs the server
-
if no rpc_descs are registered, this exits immediately, otherwise it continues running permanently and does not return until program exit.
-
#running? returns true after this is called, until #stop cause the the server to stop.
351 352 353 354 355 356 357 358 359 360 |
# File 'src/ruby/lib/grpc/generic/rpc_server.rb', line 351 def run @run_mutex.synchronize do fail 'cannot run without registering services' if rpc_descs.size.zero? @pool.start @server.start transition_running_state(:running) @run_cond.broadcast end loop_handle_server_calls end |
#run_till_terminated_or_interrupted(signals, wait_interval = 60) ⇒ Object
runs the server with signal handlers
371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 |
# File 'src/ruby/lib/grpc/generic/rpc_server.rb', line 371 def run_till_terminated_or_interrupted(signals, wait_interval = 60) @stop_server = false @stop_server_mu = Mutex.new @stop_server_cv = ConditionVariable.new @stop_server_thread = Thread.new do loop do break if @stop_server @stop_server_mu.synchronize do @stop_server_cv.wait(@stop_server_mu, wait_interval) end end # stop is surrounded by mutex, should handle multiple calls to stop # correctly stop end valid_signals = Signal.list # register signal handlers signals.each do |sig| # input validation target_sig = if sig.class == String # cut out the SIG prefix to see if valid signal sig.upcase.start_with?('SIG') ? sig.upcase[3..-1] : sig.upcase else sig end # register signal traps for all valid signals if valid_signals.value?(target_sig) || valid_signals.key?(target_sig) Signal.trap(target_sig) do @stop_server = true @stop_server_cv.broadcast end else fail "#{target_sig} not a valid signal" end end run @stop_server_thread.join end |
#running? ⇒ Boolean
280 281 282 |
# File 'src/ruby/lib/grpc/generic/rpc_server.rb', line 280 def running? running_state == :running end |
#running_state ⇒ Object
260 261 262 263 264 |
# File 'src/ruby/lib/grpc/generic/rpc_server.rb', line 260 def running_state @run_mutex.synchronize do return @running_state end end |
#stop ⇒ Object
stops a running server
the call has no impact if the server is already stopped, otherwise server’s current call loop is it’s last.
242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 |
# File 'src/ruby/lib/grpc/generic/rpc_server.rb', line 242 def stop # if called via run_till_terminated_or_interrupted, # signal stop_server_thread and don't do anything if @stop_server.nil? == false && @stop_server == false @stop_server = true @stop_server_cv.broadcast return end @run_mutex.synchronize do fail 'Cannot stop before starting' if @running_state == :not_started return if @running_state != :running transition_running_state(:stopping) deadline = from_relative_time(@poll_period) @server.shutdown_and_notify(deadline) end @pool.stop end |
#stopped? ⇒ Boolean
284 285 286 |
# File 'src/ruby/lib/grpc/generic/rpc_server.rb', line 284 def stopped? running_state == :stopped end |
#transition_running_state(target_state) ⇒ Object
Can only be called while holding @run_mutex
267 268 269 270 271 272 273 274 275 276 277 278 |
# File 'src/ruby/lib/grpc/generic/rpc_server.rb', line 267 def transition_running_state(target_state) state_transitions = { not_started: :running, running: :stopping, stopping: :stopped } if state_transitions[@running_state] == target_state @running_state = target_state else fail "Bad server state transition: #{@running_state}->#{target_state}" end end |
#wait_till_running(timeout = nil) ⇒ true, false
Is called from other threads to wait for #run to start up the server.
If run has not been called, this returns immediately.
294 295 296 297 298 299 |
# File 'src/ruby/lib/grpc/generic/rpc_server.rb', line 294 def wait_till_running(timeout = nil) @run_mutex.synchronize do @run_cond.wait(@run_mutex, timeout) if @running_state == :not_started return @running_state == :running end end |