Class: GrpcKit::Session::ClientSession

Inherits:
DS9::Client
  • Object
show all
Extended by:
Forwardable
Defined in:
lib/grpc_kit/session/client_session.rb

Defined Under Namespace

Classes: ConnectionClosing

Constant Summary collapse

MAX_STREAM_ID =
2**31 - 1

Instance Method Summary collapse

Constructor Details

#initialize(io, **opts) ⇒ ClientSession

Returns a new instance of ClientSession.

Parameters:



21
22
23
24
25
26
27
28
29
30
31
# File 'lib/grpc_kit/session/client_session.rb', line 21

# Sets up the bookkeeping state of a new client session.
#
# @param io [Object] transport object the session reads from and writes to;
#   other methods call +wait_readable+ and +select+ on it
# @param opts [Hash] extra keyword options, stored unchanged in +@opts+
def initialize(io, **opts)
  # Explicit empty parens: +io+ and +opts+ must not be forwarded to the
  # superclass initializer (DS9::Session).
  super()

  @io = io
  @streams = {} # populated by #send_request, keyed by stream id
  @opts = opts

  # All lifecycle flags start cleared. The chain assigns right-to-left, so
  # @draining is set first, then @stop, then @no_write_data.
  @no_write_data = @stop = @draining = false

  @mutex = Mutex.new # held by #run_once for the duration of one iteration
end

Instance Method Details

#run_once ⇒ void

This method returns an undefined value.



67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
# File 'lib/grpc_kit/session/client_session.rb', line 67

# Performs a single iteration of session I/O while holding +@mutex+.
#
# Does nothing once +@stop+ is set. Otherwise it either waits for the
# transport to become readable (when there is nothing to write but streams
# are still registered) or asks the transport which directions are ready and
# services each ready direction the session is interested in.
#
# @return [void]
# @raise [RuntimeError] when the session is draining and +@drain_time+ has
#   already passed
def run_once
  @mutex.synchronize do
    return if @stop

    # NOTE(review): @drain_time is not assigned in the visible constructor;
    # presumably it is set together with @draining elsewhere - confirm.
    raise 'transport is closing' if @draining && @drain_time < Time.now

    if @no_write_data && !@streams.empty?
      # Nothing to send but streams are still registered: only wait for
      # incoming data.
      @io.wait_readable
      do_read if want_read?
    else
      rs, ws = @io.select
      do_read if !rs.empty? && want_read?
      # NOTE(review): the bare `send` is presumably the session's own write
      # method inherited from the superclass, not Object#send - confirm.
      send if !ws.empty? && want_write?
    end
  end
end

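#send_request(headers) ⇒ GrpcKit::Session::Stream

Returns the stream created and registered for the new request. (The source annotation declares the return as void, but the method's last expression is the new stream.)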

Parameters:

  • headers (Hash<String,String>)


35
36
37
38
39
40
41
42
43
44
45
46
# File 'lib/grpc_kit/session/client_session.rb', line 35

# Submits a new request and registers the stream that carries it.
#
# @param headers [Hash<String,String>] request headers passed to
#   +submit_request+
# @return [GrpcKit::Session::Stream] the stream created for this request,
#   also stored in +@streams+ under its stream id
# @raise [ConnectionClosing] when the session is draining
def send_request(headers)
  if @draining
    raise ConnectionClosing, "You can't send a new request because this connection is shutting down"
  end

  # The real id is only known after the request is submitted, so the stream
  # starts with a placeholder id of 0.
  stream = GrpcKit::Session::Stream.new(stream_id: 0)
  stream_id = submit_request(headers, stream.pending_send_data).to_i
  stream.stream_id = stream_id
  @streams[stream_id] = stream

  # Clearing this flag steers #run_once back to its select branch, which can
  # also write.
  @no_write_data = false
  stream
end

#start(stream_id) ⇒ void

This method returns an undefined value.

Parameters:

  • stream_id (Integer)


50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
# File 'lib/grpc_kit/session/client_session.rb', line 50

# Drives the session with repeated #run_once calls on behalf of one stream.
#
# Iteration ends when the session wants neither to read nor to write, or
# when the stream reports that it is closed. A connection reset or IO error
# is logged at debug level and followed by +shutdown+.
#
# @param stream_id [Integer] key of the stream in +@streams+
# @return [void]
def start(stream_id)
  target = @streams[stream_id]
  return unless target # the stream may already have been closed and removed

  loop do
    idle = !(want_read? || want_write?)
    break if idle || target.close?

    run_once
  end
rescue Errno::ECONNRESET, IOError => e
  GrpcKit.logger.debug(e.message)
  shutdown
end