Class: GrpcKit::Session::RecvBuffer

Inherits:
Object
  • Object
show all
Defined in:
lib/grpc_kit/session/recv_buffer.rb

Defined Under Namespace

Classes: Closed

Instance Method Summary collapse

Constructor Details

#initialize ⇒ RecvBuffer

Returns a new instance of RecvBuffer.



8
9
10
11
12
# File 'lib/grpc_kit/session/recv_buffer.rb', line 8

# Sets up an empty receive buffer.
#
# State created here:
# - @end    flag reported by end_read?; starts out false
# - @buffer mutable, empty binary (ASCII-8BIT) string holding bytes already taken off the queue
# - @queue  Queue that write pushes chunks onto and read drains
def initialize
  @end = false
  @buffer = String.new(encoding: Encoding::BINARY)
  @queue = Queue.new
end

Instance Method Details

#close ⇒ void

This method returns an undefined value.



33
34
35
# File 'lib/grpc_kit/session/recv_buffer.rb', line 33

# Closes the underlying queue (@queue).
#
# @return [void]
def close
  @queue.close
end

#closed? ⇒ Boolean

Returns:

  • (Boolean)


28
29
30
# File 'lib/grpc_kit/session/recv_buffer.rb', line 28

# Reports whether the underlying queue (@queue) has been closed.
#
# @return [Boolean] the result of @queue.closed?
def closed?
  @queue.closed?
end

#empty? ⇒ Boolean

Returns:

  • (Boolean)


23
24
25
# File 'lib/grpc_kit/session/recv_buffer.rb', line 23

# Reports whether the underlying queue (@queue) currently holds no chunks.
#
# Only the queue is consulted; bytes held in @buffer are not taken into account.
#
# @return [Boolean] the result of @queue.empty?
def empty?
  @queue.empty?
end

#end_read ⇒ void

This method returns an undefined value.



81
82
83
# File 'lib/grpc_kit/session/recv_buffer.rb', line 81

# Sets the @end flag to true; end_read? reports this flag.
#
# @return [void]
def end_read
  @end = true
end

#end_read? ⇒ Boolean

Returns:

  • (Boolean)


76
77
78
# File 'lib/grpc_kit/session/recv_buffer.rb', line 76

# Returns the @end flag, which end_read sets to true.
#
# @return [Boolean] the current value of @end
def end_read?
  @end
end

#read(size = nil, last: false, blocking:) ⇒ String, ...

This method is not thread-safe (RecvBuffer is designed as a multi-producer/single-consumer buffer).

Parameters:

  • size (Integer, nil) (defaults to: nil)
  • last (Boolean) (defaults to: false)
  • blocking (Boolean)

Returns:

  • (String, Symbol, nil)


42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
# File 'lib/grpc_kit/session/recv_buffer.rb', line 42

# Hands out buffered bytes, refilling the local buffer from the queue when it is empty.
#
# Not thread safe: RecvBuffer is meant to be used by many producers and a single consumer.
#
# @param size [Integer, nil] number of bytes to return; with nil, or when fewer than
#   +size+ bytes are buffered, everything buffered is returned
# @param last [Boolean] when true, end_read is called once data has been taken from the buffer
#   (the early nil / :wait_readable returns do not reach that call)
# @param blocking [Boolean] when true, the queue is popped in blocking mode; when false, an empty
#   queue yields :wait_readable instead of waiting
# @return [String] the bytes removed from the buffer
# @return [Symbol] :wait_readable when nothing is buffered, the queue is empty and +blocking+ is false
# @return [nil] when nothing is buffered and the queue is both empty and closed
def read(size = nil, last: false, blocking:)
  if @buffer.empty?
    return nil if empty? && closed?
    return :wait_readable if empty? && !blocking

    # Pull in every chunk that is already queued so the caller can make as much progress as
    # possible in one call (important on clients where single-threaded).
    non_block = !blocking
    loop do
      begin
        chunk = @queue.shift(non_block)
        @buffer << chunk if chunk
      rescue ThreadError
        # A non-blocking shift raises ThreadError on an empty queue: nothing left to take.
        break
      end

      break if empty?
    end
  end

  result =
    if !size.nil? && @buffer.bytesize >= size
      # NOTE(review): presumably frozen so byteslice can share the underlying bytes
      # instead of copying them — confirm.
      @buffer.freeze
      head = @buffer.byteslice(0, size)
      @buffer = @buffer.byteslice(size, @buffer.bytesize)
      head
    else
      # No size limit, or fewer bytes than requested: give away the whole buffer.
      whole = @buffer
      @buffer = ''.b
      whole
    end

  end_read if last
  result
end

#write(data) ⇒ void

This method returns an undefined value.

Parameters:

  • data (String)


16
17
18
19
20
# File 'lib/grpc_kit/session/recv_buffer.rb', line 16

# Enqueues a chunk of received data onto @queue.
#
# @param data [String] chunk to append to the queue
# @return [void]
# @raise [Closed] when the queue has already been closed (ClosedQueueError is translated)
def write(data)
  begin
    @queue.push(data)
  rescue ClosedQueueError
    raise Closed, "[BUG] write to closed queue"
  end
end