Class: Ccrpc::RpcConnection
- Inherits:
-
Object
- Object
- Ccrpc::RpcConnection
- Defined in:
- lib/ccrpc/rpc_connection.rb
Defined Under Namespace
Classes: Call, CallAlreadyReturned, CallbackReceiver, ConnectionDetached, DoubleResultError, InvalidResponse, NoCallbackDefined, ReceivedCallData, ReceiverAlreadyDefined
Instance Attribute Summary collapse
-
#read_io ⇒ Object
The kind of
IO object used to receive calls and answers. -
#write_io ⇒ Object
The kind of
IO object used to send calls and answers.
Instance Method Summary collapse
-
#call(func = nil, params = {}) {|block| ... } ⇒ Hash, ...
Do an RPC call and/or wait for an RPC call from the other side.
-
#detach ⇒ Object
Disable reception of data from the #read_io object.
-
#initialize(read_io, write_io, lazy_answers: false, protocol: :text) ⇒ RpcConnection
constructor
Create an RPC connection.
Constructor Details
#initialize(read_io, write_io, lazy_answers: false, protocol: :text) ⇒ RpcConnection
Create an RPC connection.
100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 |
# File 'lib/ccrpc/rpc_connection.rb', line 100
#
# Create a RPC connection.
#
# Stores the two IO objects, sets up the mutexes and bookkeeping hashes, and
# builds the lazily evaluated reader (+@read_enum+) that parses incoming data
# from +read_io+ in either the text or the binary wire format.
#
# @param read_io [IO] object used to receive calls and answers.
# @param write_io [IO] object used to send calls and answers. If it responds
#   to +setsockopt+, TCP_NODELAY is enabled on it.
# @param lazy_answers [Boolean] when true, 'ccrpc/lazy' is loaded and
#   +maybe_lazy+ is bound to +do_lazy+ for this connection, otherwise to
#   +dont_lazy+.
# @param protocol [Symbol] one of:
#   * +:binary+        - write binary; a protocol switch request is sent
#     immediately from the constructor.
#   * +:text+          - write text; a protocol switch request from the peer
#     still switches the read side to binary.
#   * +:only_text+     - write text and do not honor protocol switch requests
#     (simulates a ccrpc-0.4.0 peer).
#   * +:prefer_binary+ - write mode is left undecided (+nil+) here; presumably
#     it is negotiated later by code outside this method.
# @raise [ArgumentError] if +protocol+ is none of the values above.
def initialize(read_io, write_io, lazy_answers: false, protocol: :text)
  super()

  @read_io = read_io
  @write_io = write_io
  @read_binary = false
  @write_binary = case protocol
    when :binary
      true
    when :text, :only_text # only_text is to simulate ccrpc-0.4.0 peer
      false
    when :prefer_binary
      nil
    else
      raise ArgumentError, "invalid protocol: #{protocol.inspect}"
  end

  # Bind the answer strategy on this instance only. A bare `alias` inside a
  # method body rebinds the name on the enclosing class, so the most recently
  # created connection would change the strategy of every other connection.
  if lazy_answers
    require 'ccrpc/lazy'
    singleton_class.send(:alias_method, :maybe_lazy, :do_lazy)
  else
    singleton_class.send(:alias_method, :maybe_lazy, :dont_lazy)
  end

  if @write_io.respond_to?(:setsockopt)
    @write_io.setsockopt(Socket::IPPROTO_TCP, Socket::TCP_NODELAY, true)
  end

  @id_mutex = Mutex.new
  @read_mutex = Mutex.new
  @write_mutex = Mutex.new
  @answers = {}
  @receivers = {}
  @answers_mutex = Mutex.new
  @proto_ack_mutex = Mutex.new
  @new_answer = ConditionVariable.new

  # The reader yields, per received item, one of:
  #   [key, value]      - a key/value pair (callback parameter or return value)
  #   ReceivedCallData  - an incoming call / callback
  #   Integer           - the id of a finished call (return event)
  #   Exception         - any StandardError raised while reading or parsing
  # The loop ends when #detach clears @read_enum or the peer closes the stream.
  @read_enum = Enumerator.new do |y|
    begin
      while @read_enum
        if @read_binary
          # Binary framing: one type byte followed by big-endian 32 bit fields.
          t = @read_io.read(1)&.getbyte(0)
          # p @read_io=>t
          case t
            when 1
              keysize, valsize = @read_io.read(8).unpack("NN")
              key = @read_io.read(keysize).force_encoding(Encoding::UTF_8)
              value = @read_io.read(valsize).force_encoding(Encoding::UTF_8)
              y << [key, value]
            when 2
              id, funcsize = @read_io.read(8).unpack("NN")
              func = @read_io.read(funcsize)
              y << ReceivedCallData.new(func.force_encoding(Encoding::UTF_8), id)
            when 3
              id, recv_id, funcsize = @read_io.read(12).unpack("NNN")
              func = @read_io.read(funcsize)
              y << ReceivedCallData.new(func.force_encoding(Encoding::UTF_8), id, recv_id)
            when 4
              id = @read_io.read(4).unpack1("N")
              y << id
            when 79 # "O"
              # A text formatted "O\tK\n\a1\n" answer arriving while already
              # reading in binary mode; it is translated to the equivalent
              # key/value pair plus return event for call id 1.
              l = @read_io.read(6)
              unless l == "\tK\n\a1\n"
                raise InvalidResponse, "invalid binary response #{l.inspect}"
              end
              y << ["O", "K"]
              y << 1
            when NilClass
              # connection closed
              break
            else
              raise InvalidResponse, "invalid binary response #{t.inspect}"
          end
        else
          l = @read_io.gets&.force_encoding(Encoding::BINARY)
          # p @read_io=>l
          case
            when l=="\r\0\a1\n" && protocol != :only_text
              # protocol switch request: read binary from now on
              @read_binary = true
            when l=="\r\1\a1\n" && protocol != :only_text
              # protocol switch request that is answered with O=K for call id 1
              @read_binary = true
              send_answer({O: :K}, 1)
            when l=~/\A([^\t\a\n]+)\t(.*?)\r?\n\z/mn
              # received key/value pair used for either callback parameters or return values
              y << [Escape.unescape($1).force_encoding(Encoding::UTF_8), Escape.unescape($2.force_encoding(Encoding::UTF_8))]
            when l=~/\A([^\t\a\n]+)(?:\a(\d+))?(?:\a(\d+))?\r?\n\z/mn
              # received callback
              y << ReceivedCallData.new(Escape.unescape($1.force_encoding(Encoding::UTF_8)), $2&.to_i, $3&.to_i)
            when l=~/\A\a(\d+)\r?\n\z/mn
              # received return event
              y << $1.to_i
            when l.nil?
              # connection closed
              break
            else
              raise InvalidResponse, "invalid text response #{l.inspect}"
          end
        end
      end
    rescue => err
      # Hand the error to the consumer of the enumerator instead of raising here.
      y << err
    end
  end

  if @write_binary == true # immediate binary mode
    # Use ID 1 for proto change request to have a fixed string over the wire
    register_call("\r", 1)
    @write_io.write "\r\0\a1\n"
    @write_io.flush
  end

  # A random number as start call ID is not technically required, but makes transferred data more readable.
  @id = rand(1000) + 1
end
Instance Attribute Details
#read_io ⇒ Object
The kind of IO object used to receive calls and answers. Set by #initialize.
82 83 84 |
# File 'lib/ccrpc/rpc_connection.rb', line 82
#
# Reader for the IO object used to receive calls and answers.
#
# @return [Object] the current value of +@read_io+ (set by #initialize).
def read_io
  @read_io
end
#write_io ⇒ Object
The kind of IO object used to send calls and answers. Set by #initialize.
84 85 86 |
# File 'lib/ccrpc/rpc_connection.rb', line 84
#
# Reader for the IO object used to send calls and answers.
#
# @return [Object] the current value of +@write_io+ (set by #initialize).
def write_io
  @write_io
end
Instance Method Details
#call(func = nil, params = {}) {|block| ... } ⇒ Hash, ...
Do an RPC call and/or wait for an RPC call from the other side.
#call must be called with either a function name (and optional parameters) or with a block or with both.
-
If called with a function name, the block on the other end of the RPC connection is called with that function name.
-
If called with a block only, then it receives these kinds of calls, which are called anonymous callbacks.
-
If called with a function name and a block, then the RPC function on the other side is called and it is possible to call back to this dedicated block by invoking Ccrpc::RpcConnection::Call#call_back.
265 266 267 |
# File 'lib/ccrpc/rpc_connection.rb', line 265
#
# Do a RPC call and/or wait for a RPC call from the other side.
#
# Public entry point that forwards all of its arguments unchanged to
# +call_intern+.
#
# @param func [Object, nil] name of the function to call on the other side,
#   or +nil+ when only a block is given.
# @param params [Hash] parameters passed along with the call.
# @yield optional block, forwarded as-is to +call_intern+.
# @return whatever +call_intern+ returns.
def call(func=nil, params={}, &block)
  call_intern(func, params, &block)
end
#detach ⇒ Object
235 236 237 |
# File 'lib/ccrpc/rpc_connection.rb', line 235
#
# Disable reception of data from the #read_io object.
#
# Clears +@read_enum+; the reader loop built in #initialize runs only
# +while @read_enum+ is set, so it stops on its next iteration.
#
# @return [nil]
def detach
  @read_enum = nil
end