Class: Avro::IPC::Requestor
- Inherits:
- Object
- Inheritance chain:
- Object → Avro::IPC::Requestor
- Defined in:
- lib/avro/ipc.rb
Instance Attribute Summary collapse
-
#local_protocol ⇒ Object
readonly
Returns the value of attribute local_protocol.
-
#remote_hash ⇒ Object
Returns the value of attribute remote_hash.
-
#remote_protocol ⇒ Object
Returns the value of attribute remote_protocol.
-
#send_protocol ⇒ Object
Returns the value of attribute send_protocol.
-
#transport ⇒ Object
readonly
Returns the value of attribute transport.
Instance Method Summary collapse
-
#initialize(local_protocol, transport) ⇒ Requestor
constructor
A new instance of Requestor.
- #read_call_response(message_name, decoder) ⇒ Object
- #read_error(writers_schema, readers_schema, decoder) ⇒ Object
- #read_handshake_response(decoder) ⇒ Object
- #read_response(writers_schema, readers_schema, decoder) ⇒ Object
- #request(message_name, request_datum) ⇒ Object
- #write_call_request(message_name, request_datum, encoder) ⇒ Object
- #write_handshake_request(encoder) ⇒ Object
- #write_request(request_schema, request_datum, encoder) ⇒ Object
Constructor Details
#initialize(local_protocol, transport) ⇒ Requestor
Returns a new instance of Requestor.
82 83 84 85 86 87 88 |
# File 'lib/avro/ipc.rb', line 82
#
# Builds a requestor for one local protocol over one transport.
#
# local_protocol - stored as-is in @local_protocol.
# transport      - stored as-is in @transport.
#
# The remote protocol, remote hash and send-protocol flag all start out nil.
def initialize(local_protocol, transport)
  @local_protocol, @transport = local_protocol, transport
  @remote_protocol = @remote_hash = @send_protocol = nil
end
Instance Attribute Details
#local_protocol ⇒ Object (readonly)
Returns the value of attribute local_protocol.
79 80 81 |
# File 'lib/avro/ipc.rb', line 79
#
# Reader for @local_protocol, the protocol object handed to the constructor.
def local_protocol
  @local_protocol
end
#remote_hash ⇒ Object
Returns the value of attribute remote_hash.
80 81 82 |
# File 'lib/avro/ipc.rb', line 80
#
# Reader for @remote_hash. The value is nil after construction and is
# replaced with the 'serverHash' field when a handshake response is read.
def remote_hash
  @remote_hash
end
#remote_protocol ⇒ Object
Returns the value of attribute remote_protocol.
80 81 82 |
# File 'lib/avro/ipc.rb', line 80
#
# Reader for @remote_protocol. The value is nil after construction; the
# handshake code assigns either the local protocol or the parsed
# 'serverProtocol' from a handshake response.
def remote_protocol
  @remote_protocol
end
#send_protocol ⇒ Object
Returns the value of attribute send_protocol.
80 81 82 |
# File 'lib/avro/ipc.rb', line 80
#
# Reader for @send_protocol. When truthy, the next handshake request
# includes the local protocol text under 'clientProtocol'.
def send_protocol
  @send_protocol
end
#transport ⇒ Object (readonly)
Returns the value of attribute transport.
79 80 81 |
# File 'lib/avro/ipc.rb', line 79
#
# Reader for @transport, the transport object handed to the constructor.
def transport
  @transport
end
Instance Method Details
#read_call_response(message_name, decoder) ⇒ Object
190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 |
# File 'lib/avro/ipc.rb', line 190
#
# Decodes the call response that follows the handshake response.
#
# message_name - key used to look the message up in both the remote and the
#                local protocol's messages.
# decoder      - decoder positioned at the start of the call response.
#
# Returns the datum produced by read_response when the error flag is false.
# Raises AvroError when the message is unknown to the remote or the local
# protocol, and raises whatever read_error builds when the error flag is true.
def read_call_response(message_name, decoder)
  # The format of a call response is:
  #   * response metadata, a map with values of type bytes
  #   * a one-byte error flag boolean, followed by either:
  #     * if the error flag is false,
  #       the message response, serialized per the message's response schema.
  #     * if the error flag is true,
  #       the error, serialized per the message's error union schema.

  # The metadata is not used, but it must still be read so the decoder
  # advances to the error flag.
  _response_metadata = META_READER.read(decoder)

  # remote response schema
  remote_message_schema = remote_protocol.messages[message_name]
  raise AvroError.new("Unknown remote message: #{message_name}") unless remote_message_schema

  # local response schema
  local_message_schema = local_protocol.messages[message_name]
  unless local_message_schema
    raise AvroError.new("Unknown local message: #{message_name}")
  end

  # error flag
  if !decoder.read_boolean
    writers_schema = remote_message_schema.response
    readers_schema = local_message_schema.response
    read_response(writers_schema, readers_schema, decoder)
  else
    # Fall back to the system error schema when a side declares no errors.
    writers_schema = remote_message_schema.errors || SYSTEM_ERROR_SCHEMA
    readers_schema = local_message_schema.errors || SYSTEM_ERROR_SCHEMA
    raise read_error(writers_schema, readers_schema, decoder)
  end
end
#read_error(writers_schema, readers_schema, decoder) ⇒ Object
227 228 229 230 |
# File 'lib/avro/ipc.rb', line 227
#
# Decodes an error datum and wraps it in an AvroRemoteError.
#
# The error object is returned, not raised; raising is left to the caller.
def read_error(writers_schema, readers_schema, decoder)
  error_datum = Avro::IO::DatumReader.new(writers_schema, readers_schema).read(decoder)
  AvroRemoteError.new(error_datum)
end
#read_handshake_response(decoder) ⇒ Object
164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 |
# File 'lib/avro/ipc.rb', line 164
#
# Reads a handshake response and updates the requestor's handshake state.
#
# decoder - decoder positioned at the start of the handshake response.
#
# Returns true when the 'match' field is 'BOTH' or 'CLIENT' (the call
# response can be read next), and false when it is 'NONE' (the caller must
# repeat the request, this time sending the local protocol).
#
# Raises AvroError when 'match' is 'CLIENT' or 'NONE' although the protocol
# had already been sent, or when 'match' holds any other value.
def read_handshake_response(decoder)
  handshake_response = HANDSHAKE_REQUESTOR_READER.read(decoder)
  # Keep the match value in a local so the error message below can report it.
  match = handshake_response['match']

  case match
  when 'BOTH'
    self.send_protocol = false
    true
  when 'CLIENT'
    raise AvroError, 'Handshake failure. match == CLIENT' if send_protocol
    self.remote_protocol = Avro::Protocol.parse(handshake_response['serverProtocol'])
    self.remote_hash = handshake_response['serverHash']
    self.send_protocol = false
    true
  when 'NONE'
    raise AvroError, 'Handshake failure. match == NONE' if send_protocol
    self.remote_protocol = Avro::Protocol.parse(handshake_response['serverProtocol'])
    self.remote_hash = handshake_response['serverHash']
    # Ask the next handshake request to include the local protocol text.
    self.send_protocol = true
    false
  else
    raise AvroError, "Unexpected match: #{match}"
  end
end
#read_response(writers_schema, readers_schema, decoder) ⇒ Object
222 223 224 225 |
# File 'lib/avro/ipc.rb', line 222
#
# Decodes a successful response datum, resolving the writer's schema
# against the reader's schema, and returns the decoded datum.
def read_response(writers_schema, readers_schema, decoder)
  Avro::IO::DatumReader.new(writers_schema, readers_schema).read(decoder)
end
#request(message_name, request_datum) ⇒ Object
100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 |
# File 'lib/avro/ipc.rb', line 100
#
# Writes a request message and reads a response or error message.
#
# message_name  - name of the message to call; passed to write_call_request
#                 and read_call_response.
# request_datum - the message parameters; passed to write_call_request.
#
# Returns whatever read_call_response returns. When the handshake response
# reports no usable match, the whole request is sent again by calling this
# method recursively.
def request(message_name, request_datum)
  # build handshake and call request
  # String.new yields an unfrozen, binary-encoded buffer, so encoded bytes
  # are never appended to a UTF-8 or frozen literal.
  buffer_writer = StringIO.new(String.new, 'w+')
  buffer_encoder = Avro::IO::BinaryEncoder.new(buffer_writer)
  write_handshake_request(buffer_encoder)
  write_call_request(message_name, request_datum, buffer_encoder)

  # send the handshake and call request; block until call response
  call_request = buffer_writer.string
  call_response = transport.transceive(call_request)

  # process the handshake and call response
  buffer_decoder = Avro::IO::BinaryDecoder.new(StringIO.new(call_response))
  if read_handshake_response(buffer_decoder)
    read_call_response(message_name, buffer_decoder)
  else
    request(message_name, request_datum)
  end
end
#write_call_request(message_name, request_datum, encoder) ⇒ Object
139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 |
# File 'lib/avro/ipc.rb', line 139
#
# Encodes a call request: metadata, message name, then message parameters.
#
# message_name  - key used to look the message up in the local protocol.
# request_datum - the message parameters to serialize.
# encoder       - encoder that receives the call request.
#
# Raises AvroError when the local protocol has no message of that name.
def write_call_request(message_name, request_datum, encoder)
  # The format of a call request is:
  #   * request metadata, a map with values of type bytes
  #   * the message name, an Avro string, followed by
  #   * the message parameters. Parameters are serialized according to
  #     the message's request declaration.

  # TODO request metadata (not yet implemented)
  request_metadata = {}
  META_WRITER.write(request_metadata, encoder)

  message = local_protocol.messages[message_name]
  unless message
    raise AvroError, "Unknown message: #{message_name}"
  end
  encoder.write_string(message.name)

  write_request(message.request, request_datum, encoder)
end
#write_handshake_request(encoder) ⇒ Object
121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 |
# File 'lib/avro/ipc.rb', line 121
#
# Encodes the handshake request that precedes every call request.
#
# The request carries the local protocol's MD5 as 'clientHash' and, as
# 'serverHash', the hash recorded in REMOTE_HASHES for the transport's
# remote name. When no hash is recorded, the local hash is sent instead and
# the remote protocol is set to the local protocol. The local protocol
# text is added as 'clientProtocol' only while send_protocol is truthy.
def write_handshake_request(encoder)
  client_hash = local_protocol.md5
  server_hash = REMOTE_HASHES[transport.remote_name]

  unless server_hash
    # Nothing recorded for this remote: treat it as speaking our protocol.
    self.remote_protocol = local_protocol
    server_hash = client_hash
  end

  handshake = { 'clientHash' => client_hash, 'serverHash' => server_hash }
  handshake['clientProtocol'] = local_protocol.to_s if send_protocol
  HANDSHAKE_REQUESTOR_WRITER.write(handshake, encoder)
end
#write_request(request_schema, request_datum, encoder) ⇒ Object
159 160 161 162 |
# File 'lib/avro/ipc.rb', line 159
#
# Serializes the message parameters with a datum writer built for the
# message's request schema, writing them to the given encoder.
def write_request(request_schema, request_datum, encoder)
  Avro::IO::DatumWriter.new(request_schema).write(request_datum, encoder)
end