Class: Ccrpc::RpcConnection

Inherits:
Object
  • Object
show all
Defined in:
lib/ccrpc/rpc_connection.rb

Defined Under Namespace

Classes: Call, CallAlreadyReturned, CallbackReceiver, ConnectionDetached, DoubleResultError, InvalidResponse, NoCallbackDefined, ReceivedCallData, ReceiverAlreadyDefined

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(read_io, write_io, lazy_answers: false, protocol: :text) ⇒ RpcConnection

Create a RPC connection

Parameters:

  • read_io (IO)

    readable IO object for reception of data

  • write_io (IO)

    writable IO object for transmission of data

  • lazy_answers (Boolean) (defaults to: false)

    Enable or disable lazy results. If enabled the return value of #call is always a Ccrpc::Promise object. It behaves like an ordinary nil or Hash object, but the actual IO blocking operation is delayed to the first method call on the Promise object. See #call for more description.

  • protocol (Symbol) (defaults to: :text)

    Select the protocol which is used to send calls.

    • The :text protocol is the classic default.

    • The :binary protocol is faster, but not so readable for human.

    • The :prefer_binary is the same as :binary, but with an initial round-trip to check that the other end is binary-capable (means ccrpc >= 0.5).

    The protocol used to receive calls is selected by the protocol option on the other end. A connection could use different protocols for both directions, although this has no advantage.



100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
# File 'lib/ccrpc/rpc_connection.rb', line 100

def initialize(read_io, write_io, lazy_answers: false, protocol: :text)
  super()

  @read_io = read_io
  @write_io = write_io
  @read_binary = false
  @write_binary = case protocol
    when :binary
      true
    when :text, :only_text # only_text is to simulate ccrpc-0.4.0 peer
      false
    when :prefer_binary
      nil
    else
      raise ArgumentError, "invalid protocol: #{protocol.inspect}"
  end
  if lazy_answers
    require 'ccrpc/lazy'
    alias maybe_lazy do_lazy
  else
    alias maybe_lazy dont_lazy
  end

  if @write_io.respond_to?(:setsockopt)
    @write_io.setsockopt(Socket::IPPROTO_TCP, Socket::TCP_NODELAY, true)
  end

  @id_mutex = Mutex.new
  @read_mutex = Mutex.new
  @write_mutex = Mutex.new
  @answers = {}
  @receivers = {}
  @answers_mutex = Mutex.new
  @proto_ack_mutex = Mutex.new
  @new_answer = ConditionVariable.new

  @read_enum = Enumerator.new do |y|
    begin
      while @read_enum
        if @read_binary
          t = @read_io.read(1)&.getbyte(0)
          # p @read_io=>t
          case t
            when 1
              keysize, valsize = @read_io.read(8).unpack("NN")
              key = @read_io.read(keysize).force_encoding(Encoding::UTF_8)
              value = @read_io.read(valsize).force_encoding(Encoding::UTF_8)
              y << [key, value]
            when 2
              id, funcsize = @read_io.read(8).unpack("NN")
              func = @read_io.read(funcsize)
              y << ReceivedCallData.new(func.force_encoding(Encoding::UTF_8), id)
            when 3
              id, recv_id, funcsize = @read_io.read(12).unpack("NNN")
              func = @read_io.read(funcsize)
              y << ReceivedCallData.new(func.force_encoding(Encoding::UTF_8), id, recv_id)
            when 4
              id = @read_io.read(4).unpack1("N")
              y << id
            when 79 # "O"
              l = @read_io.read(6)
              unless l == "\tK\n\a1\n"
                raise InvalidResponse, "invalid binary response #{l.inspect}"
              end
              y << ["O", "K"]
              y << 1

            when NilClass
              # connection closed
              break

            else
              raise InvalidResponse, "invalid binary response #{t.inspect}"
          end

        else

          l = @read_io.gets&.force_encoding(Encoding::BINARY)
          # p @read_io=>l
          case
            when l=="\r\0\a1\n" && protocol != :only_text
              @read_binary = true
            when l=="\r\1\a1\n" && protocol != :only_text
              @read_binary = true
              send_answer({O: :K}, 1)

            when l=~/\A([^\t\a\n]+)\t(.*?)\r?\n\z/mn
              # received key/value pair used for either callback parameters or return values
              y << [Escape.unescape($1).force_encoding(Encoding::UTF_8), Escape.unescape($2.force_encoding(Encoding::UTF_8))]

            when l=~/\A([^\t\a\n]+)(?:\a(\d+))?(?:\a(\d+))?\r?\n\z/mn
              # received callback
              y << ReceivedCallData.new(Escape.unescape($1.force_encoding(Encoding::UTF_8)), $2&.to_i, $3&.to_i)

            when l=~/\A\a(\d+)\r?\n\z/mn
              # received return event
              y << $1.to_i

            when l.nil?
              # connection closed
              break

            else
              raise InvalidResponse, "invalid text response #{l.inspect}"
          end
        end
      end
    rescue => err
      y << err
    end
  end

  if @write_binary == true # immediate binary mode
    # Use ID 1 for proto change request to have a fixed string over the wire
    register_call("\r", 1)
    @write_io.write "\r\0\a1\n"
    @write_io.flush
  end

  # A random number as start call ID is not technically required, but makes transferred data more readable.
  @id = rand(1000) + 1
end

Instance Attribute Details

#read_ioObject

The kind of IO object used to receive calls and answers. Set by #initialize.



82
83
84
# File 'lib/ccrpc/rpc_connection.rb', line 82

def read_io
  @read_io
end

#write_ioObject

The kind of IO object used to send calls and answers. Set by #initialize.



84
85
86
# File 'lib/ccrpc/rpc_connection.rb', line 84

def write_io
  @write_io
end

Instance Method Details

#call(func = nil, params = {}) {|block| ... } ⇒ Hash, ...

Do a RPC call and/or wait for a RPC call from the other side.

#call must be called with either a function name (and optional parameters) or with a block or with both.

  • If called with a function name, the block on the other end of the RPC connection is called with that function name.

  • If called with a block only, than it receives these kind of calls, which are called anonymous callbacks.

  • If called with a function name and a block, then the RPC function on the other side is called and it is possible to call back to this dedicated block by invoking Ccrpc::RpcConnection::Call#call_back .

Parameters:

  • func (String, Symbol) (defaults to: nil)

    The RPC function to be called on the other side. The other side must wait for calls through #call without arguments but with a block.

  • params (Hash{Symbol, String => Symbol, String}) (defaults to: {})

    Optional parameters passed with the RPC call. They can be retrieved through Ccrpc::RpcConnection::Call#params on the receiving side.

Yield Parameters:

  • block (Ccrpc::Call)

    The context of the received call.

Yield Returns:

  • (Hash{String, Symbol => String, Symbol})

    The answer parameters to be sent back to the caller.

  • (Array<Hash>)

    Two element array with the answer parameters as the first element and true as the second. By this answer type the answer is sent to other side but the reception of further calls or callbacks is stopped subsequently and the local corresponding #call method returns with nil.

Returns:

  • (Hash)

    Received answer parameters.

  • (Promise)

    Received answer parameters enveloped by a Promise. This type of answers can be enabled by RpcConnection#new(lazy_answers: true) The Promise object is returned as soon as the RPC call is sent and a callback receiver is registered, but before waiting for the corresponding answer. This way several calls can be send in parallel without using threads. As soon as any method is called on the Promise object, this method is blocked until the RPC answer was received. When the RPC answer has been received, the Promise object then behaves like an ordinary Hash object or nil in case of connection end. It is recommended to use Promise#itself to trigger explicit waiting for call answers or callbacks (although any other method triggers waiting as well).

  • (NilClass)

    Waiting for further answers was stopped gracefully by either returning [hash, true] from the block or because the connection was closed.



265
266
267
# File 'lib/ccrpc/rpc_connection.rb', line 265

def call(func=nil, params={}, &block)
  call_intern(func, params, &block)
end

#detachObject

Disable reception of data from the #read_io object.

This function doesn’t close the IO objects. A waiting reception is not aborted by this call. It can be aborted by calling IO#close on the underlying #read_io and #write_io objects.



235
236
237
# File 'lib/ccrpc/rpc_connection.rb', line 235

def detach
  @read_enum = nil
end