Class: GrpcKit::Session::ClientSession
- Inherits: DS9::Client
  - Object
  - DS9::Client
  - GrpcKit::Session::ClientSession
- Extended by: Forwardable
- Defined in: lib/grpc_kit/session/client_session.rb
Defined Under Namespace
Classes: ConnectionClosing
Constant Summary
- MAX_STREAM_ID = 2**31 - 1
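HTTP/2 stream identifiers are 31-bit unsigned integers, so 2**31 - 1 is the largest ID a stream can have, and client-initiated streams use odd IDs. This page does not show where the class uses the constant. The helper below, next_client_stream_id, is a hypothetical name that is not part of GrpcKit; it only sketches what the bound means for a client allocating IDs.

```ruby
# Largest stream ID allowed by HTTP/2 (31-bit identifier space).
MAX_STREAM_ID = 2**31 - 1

# Hypothetical helper, not part of GrpcKit: returns the next odd
# (client-initiated) stream ID, or nil once the ID space is exhausted.
def next_client_stream_id(last_id)
  candidate = last_id.nil? ? 1 : last_id + 2
  candidate > MAX_STREAM_ID ? nil : candidate
end

first_id = next_client_stream_id(nil)
second_id = next_client_stream_id(first_id)
exhausted = next_client_stream_id(MAX_STREAM_ID)
```

When the helper returns nil, a real client would have to open a new connection, since stream IDs cannot be reused on the same connection.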
Instance Method Summary
- #initialize(io, **opts) ⇒ ClientSession (constructor): A new instance of ClientSession.
- #run_once ⇒ void
- #send_request(headers) ⇒ void
- #start(stream_id) ⇒ void
Constructor Details
#initialize(io, **opts) ⇒ ClientSession
Returns a new instance of ClientSession.
# File 'lib/grpc_kit/session/client_session.rb', line 21
def initialize(io, **opts)
  super() # initialize DS9::Session

  @io = io
  @streams = {}
  @opts = opts
  @draining = false
  @stop = false
  @no_write_data = false
  @mutex = Mutex.new
end
Instance Method Details
#run_once ⇒ void
This method returns an undefined value.
# File 'lib/grpc_kit/session/client_session.rb', line 67
def run_once
  @mutex.synchronize do
    return if @stop

    if @draining && @drain_time < Time.now
      raise 'trasport is closing'
    end

    if @no_write_data && !@streams.empty?
      @io.wait_readable

      if want_read?
        do_read
      end
    else
      rs, ws = @io.select

      if !rs.empty? && want_read?
        do_read
      end

      if !ws.empty? && want_write?
        send
      end
    end
  end
end
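The pattern in #run_once (take the mutex, bail out if stopped, then read or write depending on which direction the socket is ready for) can be tried out without GrpcKit or DS9. The following is a minimal sketch using only Ruby's standard library. ToySession, its enqueue and stop! methods, and its return symbols are hypothetical stand-ins, and IO.select replaces the @io.select wrapper used by the real class.

```ruby
require 'socket'

# Hypothetical stand-in for a session; this is not GrpcKit code.
class ToySession
  attr_reader :received

  def initialize(io)
    @io = io
    @outbox = []        # data waiting to be written
    @received = +''     # everything read so far
    @stop = false
    @mutex = Mutex.new
  end

  def enqueue(data)
    @outbox << data
  end

  def stop!
    @stop = true
  end

  def want_write?
    !@outbox.empty?
  end

  # One pass of the loop: wait for readiness, then read and/or write.
  def run_once(timeout = 0.1)
    @mutex.synchronize do
      return :stopped if @stop

      # Only ask about writability when there is something to send.
      ready = IO.select([@io], want_write? ? [@io] : [], nil, timeout)
      return :idle unless ready # timed out, nothing to do

      readers, writers = ready
      actions = []
      unless readers.empty?
        @received << @io.read_nonblock(1024)
        actions << :read
      end
      unless writers.empty?
        @io.write(@outbox.shift)
        actions << :write
      end
      actions
    end
  end
end

local, peer = UNIXSocket.pair
session = ToySession.new(local)
session.enqueue('ping')
session.run_once            # socket is writable, so 'ping' is sent
peer.write('pong')
session.run_once            # socket is readable, so 'pong' is received
```

Skipping the writability check when the outbox is empty plays the same role as the @no_write_data branch above: a connected socket is almost always writable, so selecting on it with nothing to send would make the loop spin.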
#send_request(headers) ⇒ void
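This method is documented as returning an undefined value, although the last expression in the implementation below is the newly created stream.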
# File 'lib/grpc_kit/session/client_session.rb', line 35
def send_request(headers)
  if @draining
    raise ConnectionClosing, "You can't send new request. becuase this connection will shuting down"
  end

  stream = GrpcKit::Session::Stream.new(stream_id: 0) # set later
  stream_id = submit_request(headers, stream.pending_send_data).to_i
  stream.stream_id = stream_id
  @streams[stream_id] = stream
  @no_write_data = false
  stream
end
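The bookkeeping in #send_request follows a common shape: refuse new work while draining, create a stream with a placeholder ID, learn the real ID when the request is submitted, then index the stream by that ID. A minimal sketch of that shape, assuming a fake submit step that hands out odd IDs. ToyRegistry, ToyStream, ToyConnectionClosing, and drain! are hypothetical names, not GrpcKit or DS9 APIs.

```ruby
# Hypothetical stand-ins; none of these are GrpcKit classes.
class ToyConnectionClosing < StandardError; end
ToyStream = Struct.new(:stream_id)

class ToyRegistry
  attr_reader :streams

  def initialize
    @streams = {}
    @draining = false
    @next_id = 1
  end

  def drain!
    @draining = true
  end

  def send_request(_headers)
    raise ToyConnectionClosing, 'connection is shutting down' if @draining

    stream = ToyStream.new(0)    # placeholder ID, set below
    stream_id = fake_submit      # the real class asks the HTTP/2 layer here
    stream.stream_id = stream_id
    @streams[stream_id] = stream
    stream
  end

  private

  # Assumption for this sketch: IDs are handed out as 1, 3, 5, ...
  def fake_submit
    id = @next_id
    @next_id += 2
    id
  end
end

registry = ToyRegistry.new
first = registry.send_request({ ':path' => '/a' })
second = registry.send_request({ ':path' => '/b' })
registry.drain!
rejected = begin
  registry.send_request({ ':path' => '/c' })
  false
rescue ToyConnectionClosing
  true
end
```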
#start(stream_id) ⇒ void
This method returns an undefined value.
# File 'lib/grpc_kit/session/client_session.rb', line 50
def start(stream_id)
  stream = @streams[stream_id]
  return unless stream # stream might have already closed

  loop do
    if (!want_read? && !want_write?) || stream.close?
      break
    end

    run_once
  end
rescue Errno::ECONNRESET, IOError => e
  GrpcKit.logger.debug(e.message)
  shutdown
end
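The control flow of #start (look up the stream, drive the loop until the stream closes, and shut down on a transport error) can be exercised in isolation. The sketch below simulates it with plain Ruby objects. ToyDriver, DrivenStream, and the close_after and raise_at parameters are hypothetical and exist only to make the loop observable.

```ruby
# Hypothetical stand-ins; none of these are GrpcKit classes.
DrivenStream = Struct.new(:closed) do
  def close?
    closed
  end
end

class ToyDriver
  attr_reader :ticks, :shut_down

  def initialize(stream, close_after:, raise_at: nil)
    @streams = { 1 => stream }
    @close_after = close_after # tick on which the stream closes
    @raise_at = raise_at       # tick on which the transport "breaks"
    @ticks = 0
    @shut_down = false
  end

  def start(stream_id)
    stream = @streams[stream_id]
    return unless stream # unknown or already removed stream

    loop do
      break if stream.close?

      run_once
    end
  rescue Errno::ECONNRESET, IOError
    shutdown
  end

  private

  # Simulated loop pass: counts ticks, optionally fails, eventually closes.
  def run_once
    @ticks += 1
    raise IOError, 'simulated broken transport' if @ticks == @raise_at

    @streams.each_value { |s| s.closed = true } if @ticks >= @close_after
  end

  def shutdown
    @shut_down = true
  end
end

healthy = ToyDriver.new(DrivenStream.new(false), close_after: 3)
healthy.start(1)

broken = ToyDriver.new(DrivenStream.new(false), close_after: 5, raise_at: 2)
broken.start(1)
```

Because the rescue clause is attached to the method body rather than to the loop, a single transport error ends the whole call and triggers shutdown, which matches the structure of the original method.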