Class: GRPC::RpcServer
- Inherits:
-
Object
- Object
- GRPC::RpcServer
- Extended by:
- Forwardable
- Includes:
- Core::CallOps, Core::TimeConsts
- Defined in:
- lib/grpc/generic/rpc_server.rb
Overview
RpcServer hosts a number of services and makes them available on the network.
Constant Summary
- DEFAULT_POOL_SIZE =
Default thread pool size is 3
3
- DEFAULT_MAX_WAITING_REQUESTS =
Default max_waiting_requests size is 20
20
- DEFAULT_POLL_PERIOD =
Default poll period is 1s
1
- SIGNAL_CHECK_PERIOD =
Signal check period is 0.25s
0.25
Constants included from Core::CallOps
Core::CallOps::RECV_CLOSE_ON_SERVER, Core::CallOps::RECV_INITIAL_METADATA, Core::CallOps::RECV_MESSAGE, Core::CallOps::RECV_STATUS_ON_CLIENT, Core::CallOps::SEND_CLOSE_FROM_CLIENT, Core::CallOps::SEND_INITIAL_METADATA, Core::CallOps::SEND_MESSAGE, Core::CallOps::SEND_STATUS_FROM_SERVER
Class Method Summary
-
.setup_connect_md_proc(a_proc) ⇒ Object
setup_connect_md_proc is used by #initialize to validate the connect_md_proc.
-
.setup_cq(alt_cq) ⇒ Object
setup_cq is used by #initialize to construct a Core::CompletionQueue from its arguments.
-
.setup_srv(alt_srv, cq, **kw) ⇒ Object
setup_srv is used by #initialize to construct a Core::Server from its arguments.
Instance Method Summary
-
#available?(an_rpc) ⇒ Boolean
Sends UNAVAILABLE if there are too many unprocessed jobs.
-
#found?(an_rpc) ⇒ Boolean
Sends NOT_FOUND if the method can’t be found.
-
#handle(service) ⇒ Object
handle registration of classes.
-
#initialize(pool_size: DEFAULT_POOL_SIZE, max_waiting_requests: DEFAULT_MAX_WAITING_REQUESTS, poll_period: DEFAULT_POLL_PERIOD, completion_queue_override: nil, server_override: nil, connect_md_proc: nil, **kw) ⇒ RpcServer
constructor
Creates a new RpcServer.
-
#loop_handle_server_calls ⇒ Object
handles calls to the server.
- #new_active_server_call(an_rpc) ⇒ Object
-
#run ⇒ Object
runs the server.
-
#run_till_terminated ⇒ Object
Runs the server in its own thread, then waits for signal INT or TERM on the current thread to terminate it.
-
#running? ⇒ Boolean
determines if the server is currently running.
-
#stop ⇒ Object
stops a running server.
-
#stopped? ⇒ Boolean
Determines if the server is currently stopped.
-
#wait_till_running(timeout = 0.1) ⇒ Object
Is called from other threads to wait for #run to start up the server.
Methods included from Core::TimeConsts
Constructor Details
#initialize(pool_size: DEFAULT_POOL_SIZE, max_waiting_requests: DEFAULT_MAX_WAITING_REQUESTS, poll_period: DEFAULT_POLL_PERIOD, completion_queue_override: nil, server_override: nil, connect_md_proc: nil, **kw) ⇒ RpcServer
Creates a new RpcServer.
The RPC server is configured using keyword arguments.
Some specific keyword args configure the RpcServer instance itself; other arbitrary keyword args are also allowed and, when present, are used to configure the listening connection set up by the RpcServer.
-
server_override: if passed, must be a [GRPC::Core::Server]. When
present, it is used instead of creating a new server.
-
poll_period: when present, the server polls for new events with this
period
-
pool_size: the size of the thread pool the server uses to run its
threads
-
completion_queue_override: when supplied, this will be used as the
completion_queue that the server uses to receive network events; otherwise the server creates a new instance itself
-
creds: [GRPC::Core::ServerCredentials]
the credentials used to secure the server
-
max_waiting_requests: the maximum number of requests allowed to
wait without being handled. When this limit is exceeded, the server responds to new requests with UNAVAILABLE
-
connect_md_proc:
when non-nil, a proc for determining the metadata to send back to the client on receiving an invocation request. The proc signature is: {key: val, ..} func(method_name, {key: val, …})
# File 'lib/grpc/generic/rpc_server.rb', line 245
def initialize(pool_size: DEFAULT_POOL_SIZE,
               max_waiting_requests: DEFAULT_MAX_WAITING_REQUESTS,
               poll_period: DEFAULT_POLL_PERIOD,
               completion_queue_override: nil,
               server_override: nil,
               connect_md_proc: nil,
               **kw)
  @cq = RpcServer.setup_cq(completion_queue_override)
  @server = RpcServer.setup_srv(server_override, @cq, **kw)
  @connect_md_proc = RpcServer.setup_connect_md_proc(connect_md_proc)
  @pool_size = pool_size
  @max_waiting_requests = max_waiting_requests
  @poll_period = poll_period
  @run_mutex = Mutex.new
  @run_cond = ConditionVariable.new
  @pool = Pool.new(@pool_size)
end
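The keyword handling in the constructor can be illustrated with a small standalone sketch. SketchConfig, its extra reader, and the custom_option key are hypothetical names invented for this example; they are not part of gRPC, and the sketch covers only how named keywords are separated from the remaining ones.

```ruby
# SketchConfig is a hypothetical stand-in, not a gRPC class. It mimics only
# the keyword handling of the RpcServer constructor.
class SketchConfig
  attr_reader :pool_size, :max_waiting_requests, :poll_period, :extra

  def initialize(pool_size: 3, max_waiting_requests: 20, poll_period: 1, **kw)
    @pool_size = pool_size
    @max_waiting_requests = max_waiting_requests
    @poll_period = poll_period
    # Anything not named above is collected here. The real constructor
    # passes these on to setup_srv, which uses them when no server_override
    # is given.
    @extra = kw
  end
end

# custom_option is a made-up key used purely for illustration.
# cfg = SketchConfig.new(pool_size: 10, custom_option: 'value')
# cfg.pool_size   # => 10
# cfg.poll_period # => 1 (the default)
# cfg.extra.keys  # => [:custom_option]
```

Named keywords never appear in the leftover hash, so the server-level settings and the connection-level settings stay separate.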
Class Method Details
.setup_connect_md_proc(a_proc) ⇒ Object
setup_connect_md_proc is used by #initialize to validate the connect_md_proc.
# File 'lib/grpc/generic/rpc_server.rb', line 207
def self.setup_connect_md_proc(a_proc)
  return nil if a_proc.nil?
  fail(TypeError, '!Proc') unless a_proc.is_a? Proc
  a_proc
end
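As a rough illustration of this check and of the shape a connect_md_proc might take, the sketch below re-implements the validation in a standalone helper. validate_connect_md_proc, GREETING_MD, and the metadata keys are hypothetical names chosen for this example, not gRPC APIs.

```ruby
# Standalone sketch mirroring the nil / Proc check in setup_connect_md_proc.
# validate_connect_md_proc is a hypothetical helper, not a gRPC method.
def validate_connect_md_proc(a_proc)
  return nil if a_proc.nil?
  raise TypeError, '!Proc' unless a_proc.is_a?(Proc)
  a_proc
end

# A hypothetical connect_md_proc: it receives the method name and the
# request metadata, and returns the metadata to send back to the client.
GREETING_MD = proc do |method_name, metadata|
  { 'served-method' => method_name.to_s,
    'echoed-keys' => metadata.keys.join(',') }
end

# validate_connect_md_proc(GREETING_MD)  # returns the proc unchanged
# validate_connect_md_proc(nil)          # returns nil
# validate_connect_md_proc('not a proc') # raises TypeError
```

Lambdas also pass the check, since a lambda is an instance of Proc.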
.setup_cq(alt_cq) ⇒ Object
setup_cq is used by #initialize to construct a Core::CompletionQueue from its arguments.
# File 'lib/grpc/generic/rpc_server.rb', line 189
def self.setup_cq(alt_cq)
  return Core::CompletionQueue.new if alt_cq.nil?
  unless alt_cq.is_a? Core::CompletionQueue
    fail(TypeError, '!CompletionQueue')
  end
  alt_cq
end
.setup_srv(alt_srv, cq, **kw) ⇒ Object
setup_srv is used by #initialize to construct a Core::Server from its arguments.
# File 'lib/grpc/generic/rpc_server.rb', line 199
def self.setup_srv(alt_srv, cq, **kw)
  return Core::Server.new(cq, kw) if alt_srv.nil?
  fail(TypeError, '!Server') unless alt_srv.is_a? Core::Server
  alt_srv
end
Instance Method Details
#available?(an_rpc) ⇒ Boolean
Sends UNAVAILABLE if there are too many unprocessed jobs
# File 'lib/grpc/generic/rpc_server.rb', line 382
def available?(an_rpc)
  jobs_count, max = @pool.jobs_waiting, @max_waiting_requests
  logger.info("waiting: #{jobs_count}, max: #{max}")
  return an_rpc if @pool.jobs_waiting <= @max_waiting_requests
  logger.warn("NOT AVAILABLE: too many jobs_waiting: #{an_rpc}")
  noop = proc { |x| x }
  c = ActiveCall.new(an_rpc.call, @cq, noop, noop, an_rpc.deadline)
  c.send_status(StatusCodes::UNAVAILABLE, '')
  nil
end
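The admission check above compares the number of waiting jobs against max_waiting_requests. A minimal sketch of that comparison, using a stand-in pool, might look like the following. SketchPool and admit are hypothetical names for this example, and the symbols returned stand in for the status the real server sends.

```ruby
# Hypothetical stand-in for the server's thread pool: it only tracks how
# many jobs are queued and not yet picked up by a worker.
class SketchPool
  def initialize
    @jobs = Queue.new
  end

  def schedule(&blk)
    @jobs << blk
  end

  def jobs_waiting
    @jobs.size
  end
end

# Returns :accept while the backlog is within the limit, otherwise
# :unavailable (the real server sends StatusCodes::UNAVAILABLE instead).
def admit(pool, max_waiting_requests)
  pool.jobs_waiting <= max_waiting_requests ? :accept : :unavailable
end

# pool = SketchPool.new
# 3.times { pool.schedule { :work } }
# admit(pool, 3) # => :accept, a backlog equal to the limit still passes
# admit(pool, 2) # => :unavailable
```

Because the comparison is `<=`, a request is rejected only once the backlog is strictly larger than the limit.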
#found?(an_rpc) ⇒ Boolean
Sends NOT_FOUND if the method can’t be found
# File 'lib/grpc/generic/rpc_server.rb', line 394
def found?(an_rpc)
  mth = an_rpc.method.to_sym
  return an_rpc if rpc_descs.key?(mth)
  logger.warn("NOT_FOUND: #{an_rpc}")
  noop = proc { |x| x }
  c = ActiveCall.new(an_rpc.call, @cq, noop, noop, an_rpc.deadline)
  c.send_status(StatusCodes::NOT_FOUND, '')
  nil
end
#handle(service) ⇒ Object
handle registration of classes
service is either a class that includes GRPC::GenericService and whose #new function can be called without arguments, or any instance of such a class.
E.g., after
class Divider
  include GRPC::GenericService
  rpc :div, DivArgs, DivReply  # single request, single response
  def initialize(optional_arg = 'default option')  # no args required
    ...
  end
end
srv = GRPC::RpcServer.new(…)
# Either of these works
srv.handle(Divider)
# or
srv.handle(Divider.new('replace optional arg'))
It raises RuntimeError:
-
if service is not a valid service class or object
-
if its handler methods are already registered
-
if the server is already running or has been stopped
# File 'lib/grpc/generic/rpc_server.rb', line 351
def handle(service)
  fail 'cannot add services if the server is running' if running?
  fail 'cannot add services if the server is stopped' if stopped?
  cls = service.is_a?(Class) ? service : service.class
  assert_valid_service_class(cls)
  add_rpc_descs_for(service)
end
#loop_handle_server_calls ⇒ Object
handles calls to the server
# File 'lib/grpc/generic/rpc_server.rb', line 405
def loop_handle_server_calls
  fail 'not running' unless @running
  request_call_tag = Object.new
  until stopped?
    deadline = from_relative_time(@poll_period)
    an_rpc = @server.request_call(@cq, request_call_tag, deadline)
    c = new_active_server_call(an_rpc)
    unless c.nil?
      mth = an_rpc.method.to_sym
      @pool.schedule(c) do |call|
        rpc_descs[mth].run_server_method(call, rpc_handlers[mth])
      end
    end
  end
end
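The loop above looks up a handler by method symbol and hands the call to a thread pool. Since the Pool class is not shown on this page, the sketch below is only an assumption about how such a pool could behave. SketchWorkerPool and dispatch_all are hypothetical names, and plain lambdas stand in for the registered RPC handlers.

```ruby
# SketchWorkerPool and dispatch_all are hypothetical stand-ins; the real
# Pool class is not shown on this page.
class SketchWorkerPool
  def initialize(size)
    @size = size
    @jobs = Queue.new
    @workers = []
  end

  # Each worker pops jobs until it sees a nil shutdown marker.
  def start
    @workers = Array.new(@size) do
      Thread.new do
        while (job = @jobs.pop)
          arg, blk = job
          blk.call(arg)
        end
      end
    end
  end

  def schedule(arg, &blk)
    @jobs << [arg, blk]
  end

  # Queues one marker per worker, then waits for queued jobs to drain.
  def stop
    @size.times { @jobs << nil }
    @workers.each(&:join)
  end
end

# Looks each method name up in the handler table and runs matches on the pool.
def dispatch_all(method_names, handlers, pool_size: 3)
  results = Queue.new
  pool = SketchWorkerPool.new(pool_size)
  pool.start
  method_names.each do |name|
    mth = name.to_sym
    next unless handlers.key?(mth) # the real server replies NOT_FOUND here
    pool.schedule(name) { |n| results << handlers[mth].call(n) }
  end
  pool.stop
  Array.new(results.size) { results.pop }.sort
end
```

For example, `dispatch_all(%w[div missing], { div: ->(n) { "handled #{n}" } })` returns `["handled div"]`: the unknown name is skipped, and the known one runs on a worker thread.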
#new_active_server_call(an_rpc) ⇒ Object
# File 'lib/grpc/generic/rpc_server.rb', line 421
def new_active_server_call(an_rpc)
  return nil if an_rpc.nil? || an_rpc.call.nil?

  # allow the metadata to be accessed from the call
  handle_call_tag = Object.new
  an_rpc.call.metadata = an_rpc.metadata  # attaches md to call for handlers
  connect_md = nil
  unless @connect_md_proc.nil?
    connect_md = @connect_md_proc.call(an_rpc.method, an_rpc.metadata)
  end
  an_rpc.call.run_batch(@cq, handle_call_tag, INFINITE_FUTURE,
                        SEND_INITIAL_METADATA => connect_md)
  return nil unless available?(an_rpc)
  return nil unless found?(an_rpc)

  # Create the ActiveCall
  logger.info("deadline is #{an_rpc.deadline}; (now=#{Time.now})")
  rpc_desc = rpc_descs[an_rpc.method.to_sym]
  ActiveCall.new(an_rpc.call, @cq,
                 rpc_desc.marshal_proc, rpc_desc.unmarshal_proc(:input),
                 an_rpc.deadline)
end
#run ⇒ Object
runs the server
-
if no rpc_descs are registered, this exits immediately; otherwise it keeps running and does not return until #stop is called or the program exits.
-
#running? returns true after this is called, until #stop causes the server to stop.
# File 'lib/grpc/generic/rpc_server.rb', line 366
def run
  if rpc_descs.size == 0
    logger.warn('did not run as no services were present')
    return
  end
  @run_mutex.synchronize do
    @running = true
    @run_cond.signal
  end
  @pool.start
  @server.start
  loop_handle_server_calls
  @running = false
end
#run_till_terminated ⇒ Object
Runs the server in its own thread, then waits for signal INT or TERM on the current thread to terminate it.
# File 'lib/grpc/generic/rpc_server.rb', line 302
def run_till_terminated
  GRPC.trap_signals
  t = Thread.new { run }
  wait_till_running
  loop do
    sleep SIGNAL_CHECK_PERIOD
    break unless GRPC.handle_signals
  end
  stop
  t.join
end
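GRPC.trap_signals and GRPC.handle_signals are not shown on this page, so the following is only a sketch of the general trap-then-poll pattern, written under the assumption that the trap handler merely records the signal and the polling loop reacts to it. SignalFlag and wait_for_termination are hypothetical names, not gRPC APIs.

```ruby
# Hypothetical sketch of the trap-then-poll pattern; SignalFlag and
# wait_for_termination are illustrative names, not gRPC APIs.
module SignalFlag
  @pending = []

  class << self
    attr_reader :pending

    # Handlers only record the signal name; the poll loop does the rest.
    def trap_signals(*names)
      names.each { |name| Signal.trap(name) { @pending << name } }
    end
  end
end

# Sleeps in short steps until a trapped signal has been recorded, then
# returns the name of the first one.
def wait_for_termination(check_period: 0.25)
  sleep check_period while SignalFlag.pending.empty?
  SignalFlag.pending.first
end

# Typical use (commented out so that loading this file does not block):
#   SignalFlag.trap_signals('INT', 'TERM')
#   server_thread = Thread.new { run_the_server } # hypothetical method
#   wait_for_termination
#   # ...stop the server, then server_thread.join
```

Keeping the handler this small avoids doing work, such as taking locks, that Ruby does not allow inside a trap context.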
#running? ⇒ Boolean
determines if the server is currently running
# File 'lib/grpc/generic/rpc_server.rb', line 281
def running?
  @running ||= false
end
#stop ⇒ Object
stops a running server
the call has no impact if the server is already stopped; otherwise the server's current call loop is its last.
# File 'lib/grpc/generic/rpc_server.rb', line 267
def stop
  return unless @running
  @stopped = true
  @pool.stop

  # TODO: uncomment this:
  #
  # This segfaults in the c layer, so its commented out for now. Shutdown
  # still occurs, but the c layer has to do the cleanup.
  #
  # @server.close
end
#stopped? ⇒ Boolean
Determines if the server is currently stopped
# File 'lib/grpc/generic/rpc_server.rb', line 315
def stopped?
  @stopped ||= false
end
#wait_till_running(timeout = 0.1) ⇒ Object
Is called from other threads to wait for #run to start up the server.
If run has not been called, this returns immediately.
# File 'lib/grpc/generic/rpc_server.rb', line 291
def wait_till_running(timeout = 0.1)
  end_time, sleep_period = Time.now + timeout, (1.0 * timeout) / 100
  while Time.now < end_time
    @run_mutex.synchronize { @run_cond.wait(@run_mutex) } unless running?
    sleep(sleep_period)
  end
  running?
end
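The startup handshake between #run and #wait_till_running can be tried out in isolation with a stand-in class. SketchServer below is hypothetical and is not GRPC::RpcServer. It also departs from the listing above in one respect: it swaps the sleep-and-retry loop for a single timed ConditionVariable#wait performed while holding the mutex.

```ruby
# SketchServer is a hypothetical stand-in, not GRPC::RpcServer. It keeps
# only the startup handshake: #run signals a ConditionVariable once it has
# marked itself running, and #wait_till_running blocks on that signal.
class SketchServer
  def initialize
    @run_mutex = Mutex.new
    @run_cond = ConditionVariable.new
    @running = false
    @stopped = false
  end

  def running?
    @running
  end

  def run
    @run_mutex.synchronize do
      @running = true
      @run_cond.signal
    end
    sleep 0.01 until @stopped # stands in for the call-handling loop
    @running = false
  end

  def stop
    @stopped = true
  end

  # Waits up to timeout seconds, then reports whether the server is running.
  def wait_till_running(timeout = 0.1)
    @run_mutex.synchronize do
      @run_cond.wait(@run_mutex, timeout) unless @running
    end
    running?
  end
end

# srv = SketchServer.new
# t = Thread.new { srv.run }
# srv.wait_till_running(2) # true once #run has started
# srv.stop
# t.join
```

Checking the flag while holding the mutex means a signal sent before the waiter arrives is not lost: the waiter simply sees that the flag is already set and skips the wait.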