Class: GrpcKit::Session::ServerSession

Inherits:
DS9::Server
  • Object
show all
Extended by:
Forwardable
Defined in:
lib/grpc_kit/session/server_session.rb

Instance Method Summary collapse

Constructor Details

#initialize(io, dispatcher) ⇒ ServerSession

Returns a new instance of ServerSession.

Parameters:



18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
# File 'lib/grpc_kit/session/server_session.rb', line 18

# Builds a server-side HTTP/2 session bound to the given IO wrapper.
#
# @param io the session IO object; it must respond to +wake!+, whose bound
#   method is handed to the control queue as its waker
# @param dispatcher stored as-is in +@dispatcher+ (not used in this method)
def initialize(io, dispatcher)
  # https://github.com/nghttp2/nghttp2/issues/810
  # grpc_kit doesn't need to retain closed streams, so ask the session
  # not to keep them around. This should decrease memory usage.
  session_option = DS9::Option.new
  session_option.set_no_closed_streams
  super(option: session_option) # initialize DS9::Session

  @io = io
  @dispatcher = dispatcher
  @stop = false
  @streams = {}
  @inflights = []
  @drain_controller = GrpcKit::Session::DrainController.new
  # The queue receives @io.wake! as its waker callback.
  @control_queue = GrpcKit::Session::ControlQueue.new(waker: @io.method(:wake!))
end

Instance Method Details

#drain ⇒ void

This method returns an undefined value.



85
86
87
# File 'lib/grpc_kit/session/server_session.rb', line 85

# Asks the drain controller to begin draining this session.
#
# @return [void]
def drain
  controller = @drain_controller
  controller.start_draining
end

#run_once ⇒ bool

Returns whether the session can continue.

Returns:

  • (bool)

    whether the session can continue



56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
# File 'lib/grpc_kit/session/server_session.rb', line 56

# Runs a single iteration of the session's I/O loop.
#
# When the session is stopped, or wants neither to read nor to write, every
# known stream is closed and +false+ is returned. Otherwise the drain
# controller is advanced if draining has started, the IO is polled for up to
# 5 seconds, and reads/writes are performed as readiness allows.
#
# Errno::ECONNRESET and IOError are logged, trigger +shutdown+, and make the
# method return +false+.
#
# @return [bool] whether the session can continue
def run_once
  if @stop || (!want_read? && !want_write?)
    # it could be called twice
    @streams.each_value(&:close)
    return false
  end

  @drain_controller.next(self) if @drain_controller.start_draining?

  readable, writable = @io.select(timeout: 5, write: want_write?)

  do_read if !readable.empty? && want_read?

  # NOTE(review): presumably the session's own #send (flushing pending
  # frames), not Object#send — confirm against DS9::Session.
  send if !writable.empty? && want_write?

  true
rescue Errno::ECONNRESET, IOError => error
  GrpcKit.logger.error(error.message)
  shutdown
  false
end

#shutdown ⇒ void

This method returns an undefined value.



90
91
92
93
94
95
# File 'lib/grpc_kit/session/server_session.rb', line 90

# Stops the session and closes the underlying IO.
#
# Any StandardError raised by either step is logged through GrpcKit.logger
# rather than propagated; if +stop+ raises, the IO is not closed.
#
# @return [void]
def shutdown
  stop
  @io.close
rescue StandardError => error
  GrpcKit.logger.error(error)
end

#start ⇒ void

This method returns an undefined value.



37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
# File 'lib/grpc_kit/session/server_session.rb', line 37

# Drives the session until it can no longer continue.
#
# Each iteration calls +invoke+, then — only when no streams are tracked —
# waits for the IO to become readable; a falsy wait result shuts the session
# down and ends the loop. Otherwise +run_once+ is executed and the loop ends
# as soon as it returns a falsy value. A debug line is always logged on exit.
#
# @return [void]
def start
  loop do
    invoke

    # With no streams tracked, wait for the IO to become readable;
    # a falsy result means there is nothing left to serve.
    if @streams.empty? && !@io.wait_readable
      shutdown
      break
    end

    break unless run_once
  end
ensure
  GrpcKit.logger.debug('Finish server session')
end