Class: GrpcKit::Stream::ServerStream
- Inherits:
-
Object
- Object
- GrpcKit::Stream::ServerStream
- Defined in:
- lib/grpc_kit/stream/server_stream.rb
Instance Method Summary collapse
-
#initialize(transport) ⇒ ServerStream
constructor
A new instance of ServerStream.
- #invoke(rpc) ⇒ void
-
#recv_msg(codec, last: false, limit_size: nil) ⇒ Object
This method is not thread-safe; never call it from multiple threads at once.
- #send_msg(data, codec, last: false, limit_size: nil, initial_metadata: {}, trailing_metadata: {}) ⇒ void
- #send_status(data: nil, status: GrpcKit::StatusCodes::OK, msg: nil, metadata: {}) ⇒ void
Constructor Details
#initialize(transport) ⇒ ServerStream
Returns a new instance of ServerStream.
9 10 11 12 |
# File 'lib/grpc_kit/stream/server_stream.rb', line 9

# Creates a server stream bound to the given transport.
#
# @param transport [Object] object the other methods of this class use to
#   read request data and write response data
def initialize(transport)
  @transport = transport
  # Read by #send_msg and #send_status to choose how to write the response.
  @started = false
end
Instance Method Details
#invoke(rpc) ⇒ void
This method returns an undefined value.
15 16 17 18 19 20 21 22 23 |
# File 'lib/grpc_kit/stream/server_stream.rb', line 15

# Runs the given RPC against this stream, passing it the metadata taken from
# the received headers. Any error raised by the RPC is logged as a warning and
# reported to the peer through #send_status instead of propagating.
#
# @param rpc [#invoke] handler called as `rpc.invoke(stream, metadata: ...)`
# @return [void]
def invoke(rpc)
  rpc.invoke(self, metadata: @transport.recv_headers.metadata)
rescue GrpcKit::Errors::BadStatus => e
  # The error already carries the status code and reason to report.
  GrpcKit.logger.warn(e)
  send_status(status: e.code, msg: e.reason, metadata: {}) # TODO: metadata should be set
rescue StandardError => e
  # Anything else is reported as UNKNOWN with the exception message.
  GrpcKit.logger.warn(e)
  send_status(status: GrpcKit::StatusCodes::UNKNOWN, msg: e.message, metadata: {})
end
#recv_msg(codec, last: false, limit_size: nil) ⇒ Object
This method is not thread-safe; never call it from multiple threads at once.
59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 |
# File 'lib/grpc_kit/stream/server_stream.rb', line 59

# Reads one message from the transport and decodes it with the given codec.
#
# @param codec [#decode] decoder applied to the received payload
# @param last [Boolean] forwarded to the transport's `read_data`
# @param limit_size [Integer, nil] maximum accepted message size; nil disables the check
# @return [Object] whatever `codec.decode` returns
# @raise [StopIteration] when the transport has no data to return
# @raise [RuntimeError] when the declared size does not match the payload size,
#   or when the message is flagged as compressed
# @raise [GrpcKit::Errors::ResourceExhausted] when the message exceeds limit_size
# @raise [GrpcKit::Errors::Internal] when the codec raises ArgumentError
def recv_msg(codec, last: false, limit_size: nil)
  data = @transport.read_data(last: last)
  raise StopIteration if data.nil?

  compressed, size, buf = *data

  raise "inconsistent data: #{buf}" unless size == buf.size

  if limit_size && size > limit_size
    raise GrpcKit::Errors::ResourceExhausted, "Receiving message is too large: received=#{size}, max=#{limit_size}"
  end

  raise 'compress option is unsupported' if compressed

  begin
    codec.decode(buf)
  rescue ArgumentError => e
    raise GrpcKit::Errors::Internal, "Error while decoding in server: #{e}"
  end
end
#send_msg(data, codec, last: false, limit_size: nil, initial_metadata: {}, trailing_metadata: {}) ⇒ void
This method returns an undefined value.
32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 |
# File 'lib/grpc_kit/stream/server_stream.rb', line 32

# Encodes a message with the given codec and writes it to the peer.
#
# Which write path is used depends on `last` and on whether the response has
# already been started:
# - `last` is true: the message is handed to #send_status together with the
#   trailing metadata;
# - response already started: the message is written to the transport;
# - otherwise: the response is started with the message and the initial metadata.
#
# @param data [Object] message passed to `codec.encode`
# @param codec [#encode] encoder producing the bytes to send
# @param last [Boolean] whether this message completes the stream
# @param limit_size [Integer, nil] maximum allowed encoded size in bytes; nil disables the check
# @param initial_metadata [Hash] metadata used when the response is started by this call
# @param trailing_metadata [Hash] metadata used when this call completes the stream
# @return [void]
# @raise [GrpcKit::Errors::Internal] when the codec raises ArgumentError or TypeError
# @raise [GrpcKit::Errors::ResourceExhausted] when the encoded message exceeds limit_size
def send_msg(data, codec, last: false, limit_size: nil, initial_metadata: {}, trailing_metadata: {})
  buf =
    begin
      codec.encode(data)
    rescue ArgumentError, TypeError => e
      raise GrpcKit::Errors::Internal, "Error while encoding in server: #{e}"
    end

  if limit_size && buf.bytesize > limit_size
    # Report the size of the encoded buffer that was just checked.
    raise GrpcKit::Errors::ResourceExhausted, "Sending message is too large: send=#{buf.bytesize}, max=#{limit_size}"
  end

  if last
    send_status(data: buf, metadata: trailing_metadata)
  elsif @started
    @transport.write_data(buf)
  else
    start_response(buf, metadata: initial_metadata)
  end
end
#send_status(data: nil, status: GrpcKit::StatusCodes::OK, msg: nil, metadata: {}) ⇒ void
This method returns an undefined value.
89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 |
# File 'lib/grpc_kit/stream/server_stream.rb', line 89

# Completes the stream by sending a status, optionally preceded by a final
# data payload.
#
# @param data [Object, nil] final payload; when given it is written to the
#   transport with `last: true` before the trailers
# @param status [Integer] status code passed to `build_trailers`
# @param msg [String, nil] status message passed to `build_trailers`
# @param metadata [Hash] metadata passed to `build_trailers`
# @return [void]
def send_status(data: nil, status: GrpcKit::StatusCodes::OK, msg: nil, metadata: {})
  t = build_trailers(status, msg, metadata)
  @transport.write_data(data, last: true) if data

  if @started
    # Complete stream
    @transport.write_trailers(t)
    @transport.end_write
  elsif data
    # Complete stream with a data
    @transport.write_trailers(t)
    @transport.end_write

    start_response # will send queued data and trailer.
  else
    # return status (likely non-200) and immediately complete stream.
    @transport.end_write
    send_headers(trailers: t)
  end
end