Class: GrpcKit::Calls::Client::BidiStreamer

Inherits:
GrpcKit::Call show all
Includes:
Enumerable
Defined in:
lib/grpc_kit/calls/client_bidi_streamer.rb

Instance Attribute Summary

Attributes inherited from GrpcKit::Call

#metadata, #method, #method_name, #service_name

Instance Method Summary collapse

Methods inherited from GrpcKit::Call

#deadline

Constructor Details

#initialize ⇒ BidiStreamer

Returns a new instance of BidiStreamer.



13
14
15
16
17
18
19
20
# File 'lib/grpc_kit/calls/client_bidi_streamer.rb', line 13

# Builds the streamer. All keyword arguments are forwarded unchanged to the
# superclass initializer, after which the synchronization state used by the
# send and receive paths is created.
def initialize(**)
  super

  # Send side: the flag starts out false; recv waits on the condition
  # variable/mutex pair and send_msg signals it.
  @send_mutex = Mutex.new
  @send_cv = Thread::ConditionVariable.new
  @send = false

  # Receive side: held by #each for the duration of an iteration.
  @recv_mutex = Mutex.new
end

Instance Method Details

#close_and_send ⇒ Object



50
51
52
53
54
# File 'lib/grpc_kit/calls/client_bidi_streamer.rb', line 50

# Delegates to the underlying stream's close_and_send while holding
# @send_mutex, the same lock send_msg takes, so the two never interleave.
#
# @return [Object] whatever the underlying stream's close_and_send returns
def close_and_send
  @send_mutex.synchronize { @stream.close_and_send }
end

#each {|response| ... } ⇒ Object

Yield Parameters:

  • response (Object)

    each response object of bidi streaming RPC



57
58
59
60
61
# File 'lib/grpc_kit/calls/client_bidi_streamer.rb', line 57

# Yields every response obtained from recv, in order, while holding
# @recv_mutex for the whole iteration. Iteration stops when recv raises
# StopIteration, which Kernel#loop rescues; any other error raised by recv
# propagates to the caller.
#
# @yieldparam response [Object] each response object of bidi streaming RPC
# @return [Enumerator] when called without a block
# @return [Object] otherwise, the value Kernel#loop returns once iteration stops
def each
  # Without a block, return an enumerator up front. Otherwise recv would take
  # a message off the stream and the following yield would fail, losing it.
  return enum_for(__method__) unless block_given?

  @recv_mutex.synchronize do
    loop { yield(recv) }
  end
end

#recv ⇒ Object

Receive a message from the peer. This method is not thread-safe; never call it from multiple threads at once.

Returns:

  • (Object)

    response object

Raises:

  • (StopIteration)


39
40
41
42
43
44
45
46
47
48
# File 'lib/grpc_kit/calls/client_bidi_streamer.rb', line 39

# Receives one message from the underlying stream.
#
# If send_msg has not yet set @send, the caller first waits on @send_cv
# until it does. A BadStatus error is remembered in @reason (which send_msg
# checks) before being re-raised.
#
# @return [Object] the received message
# @raise [StopIteration] when the stream returns no message (nil/false)
# @raise [GrpcKit::Errors::BadStatus] re-raised after being stored in @reason
def recv
  unless @send
    @send_mutex.synchronize do
      # Re-check under the lock so a signal sent between the check above
      # and acquiring the mutex is not missed.
      @send_cv.wait(@send_mutex) until @send
    end
  end

  response = @stream.recv_msg(blocking: true)
  raise StopIteration unless response

  response
rescue GrpcKit::Errors::BadStatus => status_error
  @reason = status_error
  raise
end

#send_msg(data) ⇒ void

This method returns an undefined value.

Parameters:

  • data (Object)

    request message



24
25
26
27
28
29
30
31
32
33
34
# File 'lib/grpc_kit/calls/client_bidi_streamer.rb', line 24

# Sends one request message on the underlying stream, then marks the call
# as started so that a receiver waiting in recv is released.
#
# @param data [Object] request message
# @return [void]
# @raise [RuntimeError] if recv previously stored an error status in @reason
def send_msg(data)
  raise "Upstream returns an error status: #{@reason}" if @reason

  @send_mutex.synchronize do
    # The metadata reader is passed explicitly rather than through the
    # value-omitted `metadata:` shorthand, which only parses on Ruby >= 3.1.
    @stream.send_msg(data, metadata: metadata)
    @send = true
    @send_cv.broadcast
  end
end