Class: Ccrpc::RpcConnection

Inherits:
Object
  • Object
show all
Defined in:
lib/ccrpc/rpc_connection.rb

Defined Under Namespace

Classes: Call, CallAlreadyReturned, CallbackReceiver, ConnectionDetached, DoubleResultError, InvalidResponse, NoCallbackDefined, ReceiverAlreadyDefined

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(read_io, write_io, lazy_answers: false) ⇒ RpcConnection

Create a RPC connection

Parameters:

  • read_io (IO)

    readable IO object for reception of data

  • write_io (IO)

    writable IO object for transmission of data

  • lazy_answers (Boolean) (defaults to: false)

    Enable or disable lazy results. If enabled the return value of #call is always a Ccrpc::Promise object. It behaves like an ordinary nil or Hash object, but the actual IO blocking operation is delayed to the first method call on the Promise object. See #call for more description.


91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
# File 'lib/ccrpc/rpc_connection.rb', line 91

def initialize(read_io, write_io, lazy_answers: false)
  super()

  @read_io = read_io
  @write_io = write_io
  if lazy_answers
    require 'ccrpc/lazy'
    alias maybe_lazy do_lazy
  else
    alias maybe_lazy dont_lazy
  end

  if @write_io.respond_to?(:setsockopt)
    @write_io.setsockopt(Socket::IPPROTO_TCP, Socket::TCP_NODELAY, true)
  end

  # A random number as start call ID is not technically required, but makes transferred data more readable.
  @id = rand(1000)
  @id_mutex = Mutex.new
  @read_mutex = Mutex.new
  @write_mutex = Mutex.new
  @answers = {}
  @receivers = {}
  @answers_mutex = Mutex.new
  @new_answer = ConditionVariable.new

  @read_enum = Enumerator.new do |y|
    begin
      while @read_enum
        l = @read_io.gets&.force_encoding(Encoding::BINARY)
        break if l.nil?
        y << l
      end
    rescue => err
      y << err
    end
  end
end

Instance Attribute Details

#read_ioObject

Returns the value of attribute read_io.


80
81
82
# File 'lib/ccrpc/rpc_connection.rb', line 80

def read_io
  @read_io
end

#write_ioObject

Returns the value of attribute write_io.


81
82
83
# File 'lib/ccrpc/rpc_connection.rb', line 81

def write_io
  @write_io
end

Instance Method Details

#call(func = nil, params = {}) {|block| ... } ⇒ Hash, ...

Do a RPC call and/or wait for a RPC call from the other side.

#call must be called with either a function name (and optional parameters) or with a block or with both. If #call is called with a function name, the block on the other side of the RPC connection is called with that function name. If #call is called with a block only, than it receives these kind of calls, which are called anonymous callbacks. If #call is called with a function name and a block, then the RPC function on the other side is called and it is possible to call back to this dedicated block by invoking Ccrpc::RpcConnection::Call#call_back .

Parameters:

  • func (String, Symbol) (defaults to: nil)

    The RPC function to be called on the other side. The other side must wait for calls through #call without arguments but with a block.

  • params (Hash{Symbol, String => Symbol, String}) (defaults to: {})

    Optional parameters passed with the RPC call. They can be retrieved through Ccrpc::RpcConnection::Call#params on the receiving side.

Yield Parameters:

  • block (Ccrpc::Call)

    The context of the received call.

Yield Returns:

  • (Hash{String, Symbol => String, Symbol})

    The answer parameters to be sent back to the caller.

  • (Array<Hash>)

    Two element array with the answer parameters as the first element and true as the second. By this answer type the answer is sent to other side but the reception of further calls or callbacks is stopped subsequently and the local corresponding #call method returns with nil.

Returns:

  • (Hash)

    Received answer parameters.

  • (Promise)

    Received answer parameters enveloped by a Promise. This type of answers can be enabled by RpcConnection#new(lazy_answers: true) The Promise object is returned as soon as the RPC call is sent and a callback receiver is registered, but before waiting for the corresponding answer. This way several calls can be send in parallel without using threads. As soon as a method is called on the Promise object, this method is blocked until the RPC answer was received. The Promise object then behaves like a Hash or nil object. It is recommended to use Promise#itself to trigger waiting for call answers or callbacks (although any other method triggers waiting as well).

  • (NilClass)

    Waiting for further answers was stopped gracefully by either returning [hash, true] from the block or because the connection was closed.


172
173
174
# File 'lib/ccrpc/rpc_connection.rb', line 172

def call(func=nil, params={}, &block)
  call_intern(func, params, &block)
end

#detachObject

Disable reception of data from the read_io object.

This function doesn’t close the IO objects. A waiting reception is not aborted by this call. It can be aborted by calling IO#close on the underlying read_io and write_io objects.


142
143
144
# File 'lib/ccrpc/rpc_connection.rb', line 142

def detach
  @read_enum = nil
end