Class: GRPC::RpcServer
- Inherits:
-
Object
- Object
- GRPC::RpcServer
- Extended by:
- Forwardable
- Includes:
- Core::CallOps, Core::TimeConsts
- Defined in:
- lib/grpc/generic/rpc_server.rb
Overview
RpcServer hosts a number of services and makes them available on the network.
Constant Summary
- DEFAULT_POOL_SIZE = 3
Default thread pool size is 3
- DEFAULT_MAX_WAITING_REQUESTS = 20
Default max_waiting_requests size is 20
- DEFAULT_POLL_PERIOD = 1
Default poll period is 1s
- SIGNAL_CHECK_PERIOD = 0.25
Signal check period is 0.25s
Constants included from Core::CallOps
Core::CallOps::RECV_CLOSE_ON_SERVER, Core::CallOps::RECV_INITIAL_METADATA, Core::CallOps::RECV_MESSAGE, Core::CallOps::RECV_STATUS_ON_CLIENT, Core::CallOps::SEND_CLOSE_FROM_CLIENT, Core::CallOps::SEND_INITIAL_METADATA, Core::CallOps::SEND_MESSAGE, Core::CallOps::SEND_STATUS_FROM_SERVER
Class Method Summary
-
.setup_connect_md_proc(a_proc) ⇒ Object
setup_connect_md_proc is used by #initialize to validate the connect_md_proc.
-
.setup_cq(alt_cq) ⇒ Object
setup_cq is used by #initialize to construct a Core::CompletionQueue from its arguments.
-
.setup_srv(alt_srv, cq, **kw) ⇒ Object
setup_srv is used by #initialize to construct a Core::Server from its arguments.
Instance Method Summary
-
#available?(an_rpc) ⇒ Boolean
Sends UNAVAILABLE if there are too many unprocessed jobs.
-
#found?(an_rpc) ⇒ Boolean
Sends NOT_FOUND if the method can’t be found.
-
#handle(service) ⇒ Object
handle registration of classes.
-
#initialize(pool_size: DEFAULT_POOL_SIZE, max_waiting_requests: DEFAULT_MAX_WAITING_REQUESTS, poll_period: DEFAULT_POLL_PERIOD, completion_queue_override: nil, server_override: nil, connect_md_proc: nil, **kw) ⇒ RpcServer
constructor
Creates a new RpcServer.
-
#loop_handle_server_calls ⇒ Object
handles calls to the server.
- #new_active_server_call(an_rpc) ⇒ Object
-
#run ⇒ Object
runs the server.
-
#run_till_terminated ⇒ Object
Runs the server in its own thread, then waits for signal INT or TERM on the current thread to terminate it.
-
#running? ⇒ Boolean
determines if the server is currently running.
-
#stop ⇒ Object
stops a running server.
-
#stopped? ⇒ Boolean
determines if the server has been stopped.
-
#wait_till_running(timeout = 0.1) ⇒ Object
Is called from other threads to wait for #run to start up the server.
Methods included from Core::TimeConsts
Constructor Details
#initialize(pool_size: DEFAULT_POOL_SIZE, max_waiting_requests: DEFAULT_MAX_WAITING_REQUESTS, poll_period: DEFAULT_POLL_PERIOD, completion_queue_override: nil, server_override: nil, connect_md_proc: nil, **kw) ⇒ RpcServer
Creates a new RpcServer.
The RPC server is configured using keyword arguments.
Some specific keyword args configure the RpcServer instance itself; other arbitrary keyword args are allowed and, when present, are used to configure the listening connection set up by the RpcServer.
-
server_override: which if passed must be a [GRPC::Core::Server]. When
present, it is used as the server instead of a new one being created.
-
poll_period: when present, the server polls for new events with this
period
-
pool_size: the size of the thread pool the server uses to run its
threads
-
completion_queue_override: when supplied, this will be used as the
completion_queue that the server uses to receive network events; otherwise the server creates a new instance itself
-
creds: [GRPC::Core::ServerCredentials]
the credentials used to secure the server
-
max_waiting_requests: the maximum number of requests that are allowed
to wait without being handled. When this limit is exceeded, the server responds to new requests with UNAVAILABLE
-
connect_md_proc:
when non-nil, a proc for determining the metadata to send back to the client on receiving an invocation request. The proc signature is: {key: val, ..} func(method_name, {key: val, …})
# File 'lib/grpc/generic/rpc_server.rb', line 250
def initialize(pool_size: DEFAULT_POOL_SIZE,
               max_waiting_requests: DEFAULT_MAX_WAITING_REQUESTS,
               poll_period: DEFAULT_POLL_PERIOD,
               completion_queue_override: nil,
               server_override: nil,
               connect_md_proc: nil,
               **kw)
  @connect_md_proc = RpcServer.setup_connect_md_proc(connect_md_proc)
  @cq = RpcServer.setup_cq(completion_queue_override)
  @max_waiting_requests = max_waiting_requests
  @poll_period = poll_period
  @pool_size = pool_size
  @pool = Pool.new(@pool_size)
  @run_cond = ConditionVariable.new
  @run_mutex = Mutex.new
  @running = false
  @server = RpcServer.setup_srv(server_override, @cq, **kw)
  @stopped = false
  @stop_mutex = Mutex.new
end
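The keyword handling described above can be sketched in plain Ruby without the grpc gem. `SketchServer` and `some_channel_option` below are hypothetical names used only for illustration; the sketch shows how the named keywords are consumed by the constructor while any others are collected in `**kw`, which the real constructor passes on to `RpcServer.setup_srv`.

```ruby
# Hypothetical stand-in for illustration only; this is not GRPC::RpcServer.
class SketchServer
  DEFAULT_POOL_SIZE = 3
  DEFAULT_MAX_WAITING_REQUESTS = 20
  DEFAULT_POLL_PERIOD = 1

  attr_reader :pool_size, :max_waiting_requests, :poll_period, :extra

  def initialize(pool_size: DEFAULT_POOL_SIZE,
                 max_waiting_requests: DEFAULT_MAX_WAITING_REQUESTS,
                 poll_period: DEFAULT_POLL_PERIOD,
                 **kw)
    @pool_size = pool_size
    @max_waiting_requests = max_waiting_requests
    @poll_period = poll_period
    # Any keyword not named in the signature ends up in this hash.
    @extra = kw
  end
end

# some_channel_option is a made-up key standing in for a connection setting.
srv = SketchServer.new(pool_size: 8, some_channel_option: 'value')
puts srv.pool_size             # explicitly overridden
puts srv.max_waiting_requests  # falls back to the default
p srv.extra                    # the leftover keywords
```

Omitted keywords fall back to the class constants, so callers only need to name the settings they want to change.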
Class Method Details
.setup_connect_md_proc(a_proc) ⇒ Object
setup_connect_md_proc is used by #initialize to validate the connect_md_proc.
# File 'lib/grpc/generic/rpc_server.rb', line 212
def self.setup_connect_md_proc(a_proc)
  return nil if a_proc.nil?
  fail(TypeError, '!Proc') unless a_proc.is_a? Proc
  a_proc
end
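As a rough illustration of what a valid connect_md_proc might look like, the sketch below pairs a stand-alone copy of the validation shown above with a sample proc. It assumes the proc receives the method name and the request metadata as a hash and returns a hash of metadata to send back; `check_connect_md_proc`, the metadata keys, and the method name `/Divider/div` are hypothetical.

```ruby
# Stand-alone copy of the validation logic: nil passes through, a Proc is
# returned unchanged, anything else raises TypeError.
def check_connect_md_proc(a_proc)
  return nil if a_proc.nil?
  raise TypeError, '!Proc' unless a_proc.is_a?(Proc)
  a_proc
end

# Hypothetical proc: takes the method name and the request metadata and
# returns the metadata to send back. The key names are made up.
echo_md = proc do |method_name, metadata|
  {
    'handled-method' => method_name.to_s,
    'request-md-count' => metadata.size.to_s
  }
end

checked = check_connect_md_proc(echo_md)
p checked.call('/Divider/div', { 'user' => 'alice' })

begin
  check_connect_md_proc('not a proc')
rescue TypeError => e
  puts "rejected: #{e.message}"
end
```

Because validation happens in the constructor, a bad connect_md_proc is reported when the server is created rather than when the first call arrives.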
.setup_cq(alt_cq) ⇒ Object
setup_cq is used by #initialize to construct a Core::CompletionQueue from its arguments.
# File 'lib/grpc/generic/rpc_server.rb', line 194
def self.setup_cq(alt_cq)
  return Core::CompletionQueue.new if alt_cq.nil?
  unless alt_cq.is_a? Core::CompletionQueue
    fail(TypeError, '!CompletionQueue')
  end
  alt_cq
end
.setup_srv(alt_srv, cq, **kw) ⇒ Object
setup_srv is used by #initialize to construct a Core::Server from its arguments.
# File 'lib/grpc/generic/rpc_server.rb', line 204
def self.setup_srv(alt_srv, cq, **kw)
  return Core::Server.new(cq, kw) if alt_srv.nil?
  fail(TypeError, '!Server') unless alt_srv.is_a? Core::Server
  alt_srv
end
Instance Method Details
#available?(an_rpc) ⇒ Boolean
Sends UNAVAILABLE if there are too many unprocessed jobs
# File 'lib/grpc/generic/rpc_server.rb', line 388
def available?(an_rpc)
  jobs_count, max = @pool.jobs_waiting, @max_waiting_requests
  GRPC.logger.info("waiting: #{jobs_count}, max: #{max}")
  return an_rpc if @pool.jobs_waiting <= @max_waiting_requests
  GRPC.logger.warn("NOT AVAILABLE: too many jobs_waiting: #{an_rpc}")
  noop = proc { |x| x }
  c = ActiveCall.new(an_rpc.call, @cq, noop, noop, an_rpc.deadline)
  c.send_status(StatusCodes::UNAVAILABLE, '')
  nil
end
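The admission check above can be modelled without any gRPC types. In this minimal sketch, `SketchPool` and `admit` are hypothetical stand-ins: the pool only counts queued jobs, and a rejection is recorded in an array instead of being sent to a client as an UNAVAILABLE status.

```ruby
# Hypothetical stand-in for the server's thread pool; it only queues jobs
# and never runs them, so the waiting count grows with every admission.
class SketchPool
  def initialize
    @jobs = Queue.new
  end

  def schedule(job)
    @jobs << job
  end

  def jobs_waiting
    @jobs.size
  end
end

# Returns the request when there is room, or nil after recording a rejection.
def admit(request, pool, max_waiting, rejected)
  return request if pool.jobs_waiting <= max_waiting
  rejected << [request, :UNAVAILABLE]
  nil
end

pool = SketchPool.new
rejected = []
results = (1..5).map do |i|
  admitted = admit("req-#{i}", pool, 2, rejected)
  pool.schedule(admitted) unless admitted.nil?
  admitted
end
p results
p rejected
```

Note that the comparison is `<=`, as in the original method, so a request is still admitted when the number of waiting jobs equals the limit; rejections start only once the limit has been exceeded.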
#found?(an_rpc) ⇒ Boolean
Sends NOT_FOUND if the method can’t be found
# File 'lib/grpc/generic/rpc_server.rb', line 400
def found?(an_rpc)
  mth = an_rpc.method.to_sym
  return an_rpc if rpc_descs.key?(mth)
  GRPC.logger.warn("NOT_FOUND: #{an_rpc}")
  noop = proc { |x| x }
  c = ActiveCall.new(an_rpc.call, @cq, noop, noop, an_rpc.deadline)
  c.send_status(StatusCodes::NOT_FOUND, '')
  nil
end
#handle(service) ⇒ Object
handle registration of classes
service is either a class that includes GRPC::GenericService and whose #new function can be called without arguments, or any instance of such a class.
E.g., after
class Divider
  include GRPC::GenericService
  rpc :div, DivArgs, DivReply   # single request, single response
  def initialize(optional_arg='default option')  # no args
    ...
  end
end
srv = GRPC::RpcServer.new(…)
# Either of these works
srv.handle(Divider)
# or
srv.handle(Divider.new('replace optional arg'))
It raises RuntimeError:
-
if service is not a valid service class or object
-
if its handler methods are already registered
-
if the server is already running or has been stopped
# File 'lib/grpc/generic/rpc_server.rb', line 357
def handle(service)
  fail 'cannot add services if the server is running' if running?
  fail 'cannot add services if the server is stopped' if stopped?
  cls = service.is_a?(Class) ? service : service.class
  assert_valid_service_class(cls)
  add_rpc_descs_for(service)
end
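The registration rules listed above (class or instance accepted, duplicates rejected, no registration while running) can be sketched in plain Ruby. Everything here is hypothetical: `SketchRegistry` and `SketchDivider` are stand-ins, and a `METHODS` constant replaces the rpc descriptions that the real server reads from a GRPC::GenericService class. The sketch also assumes that a class passed to handle is instantiated with no arguments.

```ruby
# Hypothetical registry illustrating the checks; not the real implementation.
class SketchRegistry
  attr_writer :running

  def initialize
    @handlers = {}
    @running = false
  end

  def handle(service)
    raise 'cannot add services if the server is running' if @running
    cls = service.is_a?(Class) ? service : service.class
    raise "#{cls} is not a valid service" unless cls.const_defined?(:METHODS)
    # A class is instantiated with no arguments; an instance is used as-is.
    instance = service.is_a?(Class) ? service.new : service
    cls::METHODS.each do |name|
      raise "already registered: #{name}" if @handlers.key?(name)
      @handlers[name] = instance.method(name)
    end
  end

  def handler_names
    @handlers.keys
  end
end

# Hypothetical service whose constructor can be called without arguments.
class SketchDivider
  METHODS = [:div].freeze

  def initialize(label = 'default option')
    @label = label
  end

  def div(a, b)
    a / b
  end
end

reg = SketchRegistry.new
reg.handle(SketchDivider)               # registering the class
p reg.handler_names

begin
  reg.handle(SketchDivider.new('x'))    # :div is already registered
rescue RuntimeError => e
  puts e.message
end
```

A bare `raise` with a string produces a RuntimeError, which matches the error type the documentation names for all of these cases.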
#loop_handle_server_calls ⇒ Object
handles calls to the server
# File 'lib/grpc/generic/rpc_server.rb', line 411
def loop_handle_server_calls
  fail 'not running' unless @running
  request_call_tag = Object.new
  until stopped?
    deadline = from_relative_time(@poll_period)
    begin
      an_rpc = @server.request_call(@cq, request_call_tag, deadline)
    rescue Core::CallError, RuntimeError => e
      # can happen during server shutdown
      GRPC.logger.warn("server call failed: #{e}")
      next
    end
    c = new_active_server_call(an_rpc)
    unless c.nil?
      mth = an_rpc.method.to_sym
      @pool.schedule(c) do |call|
        rpc_descs[mth].run_server_method(call, rpc_handlers[mth])
      end
    end
  end
end
#new_active_server_call(an_rpc) ⇒ Object
# File 'lib/grpc/generic/rpc_server.rb', line 433
def new_active_server_call(an_rpc)
  return nil if an_rpc.nil? || an_rpc.call.nil?

  # allow the metadata to be accessed from the call
  handle_call_tag = Object.new
  an_rpc.call.metadata = an_rpc.metadata  # attaches md to call for handlers
  connect_md = nil
  unless @connect_md_proc.nil?
    connect_md = @connect_md_proc.call(an_rpc.method, an_rpc.metadata)
  end
  an_rpc.call.run_batch(@cq, handle_call_tag, INFINITE_FUTURE,
                        SEND_INITIAL_METADATA => connect_md)
  return nil unless available?(an_rpc)
  return nil unless found?(an_rpc)

  # Create the ActiveCall
  GRPC.logger.info("deadline is #{an_rpc.deadline}; (now=#{Time.now})")
  rpc_desc = rpc_descs[an_rpc.method.to_sym]
  ActiveCall.new(an_rpc.call, @cq,
                 rpc_desc.marshal_proc, rpc_desc.unmarshal_proc(:input),
                 an_rpc.deadline)
end
#run ⇒ Object
runs the server
-
if no rpc_descs are registered, this exits immediately; otherwise it keeps running and does not return until the server is stopped or the program exits.
-
#running? returns true after this is called, until #stop causes the server to stop.
# File 'lib/grpc/generic/rpc_server.rb', line 372
def run
  if rpc_descs.size.zero?
    GRPC.logger.warn('did not run as no services were present')
    return
  end
  @run_mutex.synchronize do
    @running = true
    @run_cond.signal
  end
  @pool.start
  @server.start
  loop_handle_server_calls
  @running = false
end
#run_till_terminated ⇒ Object
Runs the server in its own thread, then waits for signal INT or TERM on the current thread to terminate it.
# File 'lib/grpc/generic/rpc_server.rb', line 313
def run_till_terminated
  GRPC.trap_signals
  t = Thread.new { run }
  wait_till_running
  loop do
    sleep SIGNAL_CHECK_PERIOD
    break unless GRPC.handle_signals
  end
  stop
  t.join
end
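The pattern used here (run the work in a background thread, poll for a signal on the calling thread, then stop and join) can be sketched with Ruby's standard `Signal.trap`. This is only an approximation under stated assumptions: `run_until_signalled` is a hypothetical helper, and the internals of `GRPC.trap_signals` and `GRPC.handle_signals` are not shown in this page, so a simple flag set by the trap handlers stands in for them.

```ruby
# Runs the given block in its own thread and blocks the calling thread until
# INT or TERM arrives, checking a flag every check_period seconds.
def run_until_signalled(check_period: 0.25, &work)
  signalled = false
  # Trap handlers should do very little; these only set a flag.
  previous = %w[INT TERM].map do |sig|
    [sig, Signal.trap(sig) { signalled = true }]
  end
  stop_requested = false
  # The worker receives a lambda it can poll to learn when to stop.
  worker = Thread.new { work.call(-> { stop_requested }) }
  sleep(check_period) until signalled
  stop_requested = true
  worker.join
ensure
  # Put back whatever handlers were installed before.
  previous&.each { |sig, handler| Signal.trap(sig, handler || 'DEFAULT') }
end

ticks = 0
# For demonstration only: send TERM to this process shortly after starting.
Thread.new do
  sleep 0.3
  Process.kill('TERM', Process.pid)
end

run_until_signalled(check_period: 0.05) do |stop_requested|
  until stop_requested.call
    ticks += 1
    sleep 0.01
  end
end
puts "worker stopped after #{ticks} ticks"
```

Polling a flag keeps the trap handlers trivial, at the cost of reacting up to one check period late, which is the same trade-off implied by SIGNAL_CHECK_PERIOD.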
#running? ⇒ Boolean
determines if the server is currently running
# File 'lib/grpc/generic/rpc_server.rb', line 292
def running?
  @running
end
#stop ⇒ Object
stops a running server
the call has no impact if the server is already stopped; otherwise the server’s current call loop is its last.
# File 'lib/grpc/generic/rpc_server.rb', line 275
def stop
  return unless @running
  @stop_mutex.synchronize do
    @stopped = true
  end
  @pool.stop
  @server.close
end
#stopped? ⇒ Boolean
determines if the server has been stopped
# File 'lib/grpc/generic/rpc_server.rb', line 285
def stopped?
  @stop_mutex.synchronize do
    return @stopped
  end
end
#wait_till_running(timeout = 0.1) ⇒ Object
Is called from other threads to wait for #run to start up the server.
If run has not been called, this returns immediately.
# File 'lib/grpc/generic/rpc_server.rb', line 302
def wait_till_running(timeout = 0.1)
  end_time, sleep_period = Time.now + timeout, (1.0 * timeout) / 100
  while Time.now < end_time
    @run_mutex.synchronize { @run_cond.wait(@run_mutex) } unless running?
    sleep(sleep_period)
  end
  running?
end
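Waiting for another thread to report that it has started is a common use of a Mutex and ConditionVariable pair. The sketch below is a hypothetical helper, not part of grpc, and it deliberately differs from the method above: it passes a timeout to `ConditionVariable#wait` so the caller never blocks longer than requested, and it returns as soon as the flag is set.

```ruby
# Hypothetical start-up latch; StartLatch and mark_running are made-up names.
class StartLatch
  def initialize
    @mutex = Mutex.new
    @cond = ConditionVariable.new
    @running = false
  end

  # Called by the thread that has finished starting up.
  def mark_running
    @mutex.synchronize do
      @running = true
      @cond.broadcast
    end
  end

  # Waits up to timeout seconds and returns whether the flag was set.
  def wait_till_running(timeout = 0.1)
    deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + timeout
    @mutex.synchronize do
      # Loop to guard against spurious wakeups.
      until @running
        remaining = deadline - Process.clock_gettime(Process::CLOCK_MONOTONIC)
        break if remaining <= 0
        @cond.wait(@mutex, remaining)
      end
      @running
    end
  end
end

latch = StartLatch.new
puts latch.wait_till_running(0.05)   # nothing has started yet

Thread.new do
  sleep 0.05
  latch.mark_running
end
puts latch.wait_till_running(1)      # waits for the thread above
```

Using a monotonic clock for the deadline keeps the wait unaffected by wall-clock adjustments.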