Class: Avro::IPC::Responder
- Inherits:
-
Object
- Object
- Avro::IPC::Responder
- Defined in:
- lib/avro/ipc.rb
Overview
Base class for the server side of a protocol interaction.
Instance Attribute Summary collapse
-
#local_hash ⇒ Object
readonly
Returns the value of attribute local_hash.
-
#local_protocol ⇒ Object
readonly
Returns the value of attribute local_protocol.
-
#protocol_cache ⇒ Object
readonly
Returns the value of attribute protocol_cache.
Instance Method Summary collapse
- #call(_local_message, _request) ⇒ Object
-
#initialize(local_protocol) ⇒ Responder
constructor
A new instance of Responder.
- #process_handshake(decoder, encoder, connection = nil) ⇒ Object
- #read_request(writers_schema, readers_schema, decoder) ⇒ Object
-
#respond(call_request, transport = nil) ⇒ Object
Called by a server to deserialize a request, compute and serialize a response or error.
- #write_error(writers_schema, error_exception, encoder) ⇒ Object
- #write_response(writers_schema, response_datum, encoder) ⇒ Object
Constructor Details
#initialize(local_protocol) ⇒ Responder
Returns a new instance of Responder.
240 241 242 243 244 245 |
# File 'lib/avro/ipc.rb', line 240 def initialize(local_protocol) @local_protocol = local_protocol @local_hash = self.local_protocol.md5 @protocol_cache = {} protocol_cache[local_hash] = local_protocol end |
Instance Attribute Details
#local_hash ⇒ Object (readonly)
Returns the value of attribute local_hash.
239 240 241 |
# File 'lib/avro/ipc.rb', line 239 def local_hash @local_hash end |
#local_protocol ⇒ Object (readonly)
Returns the value of attribute local_protocol.
239 240 241 |
# File 'lib/avro/ipc.rb', line 239 def local_protocol @local_protocol end |
#protocol_cache ⇒ Object (readonly)
Returns the value of attribute protocol_cache.
239 240 241 |
# File 'lib/avro/ipc.rb', line 239 def protocol_cache @protocol_cache end |
Instance Method Details
#call(_local_message, _request) ⇒ Object
357 358 359 360 |
# File 'lib/avro/ipc.rb', line 357 def call(, _request) # Actual work done by server: cf. handler in thrift. raise NotImplementedError end |
#process_handshake(decoder, encoder, connection = nil) ⇒ Object
310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 |
# File 'lib/avro/ipc.rb', line 310 def process_handshake(decoder, encoder, connection=nil) if connection && connection.is_connected? return connection.protocol end handshake_request = HANDSHAKE_RESPONDER_READER.read(decoder) handshake_response = {} # determine the remote protocol client_hash = handshake_request['clientHash'] client_protocol = handshake_request['clientProtocol'] remote_protocol = protocol_cache[client_hash] if !remote_protocol && client_protocol remote_protocol = Avro::Protocol.parse(client_protocol) protocol_cache[client_hash] = remote_protocol end # evaluate remote's guess of the local protocol server_hash = handshake_request['serverHash'] if local_hash == server_hash if !remote_protocol handshake_response['match'] = 'NONE' else handshake_response['match'] = 'BOTH' end else if !remote_protocol handshake_response['match'] = 'NONE' else handshake_response['match'] = 'CLIENT' end end if handshake_response['match'] != 'BOTH' handshake_response['serverProtocol'] = local_protocol.to_s handshake_response['serverHash'] = local_hash end HANDSHAKE_RESPONDER_WRITER.write(handshake_response, encoder) if connection && handshake_response['match'] != 'NONE' connection.protocol = remote_protocol end remote_protocol end |
#read_request(writers_schema, readers_schema, decoder) ⇒ Object
362 363 364 365 |
# File 'lib/avro/ipc.rb', line 362 def read_request(writers_schema, readers_schema, decoder) datum_reader = Avro::IO::DatumReader.new(writers_schema, readers_schema) datum_reader.read(decoder) end |
#respond(call_request, transport = nil) ⇒ Object
Called by a server to deserialize a request, compute and serialize a response or error. Compare to ‘handle()’ in Thrift.
249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 |
# File 'lib/avro/ipc.rb', line 249 def respond(call_request, transport=nil) buffer_decoder = Avro::IO::BinaryDecoder.new(StringIO.new(call_request)) buffer_writer = StringIO.new(String.new('', encoding: 'BINARY')) buffer_encoder = Avro::IO::BinaryEncoder.new(buffer_writer) error = nil = {} begin remote_protocol = process_handshake(buffer_decoder, buffer_encoder, transport) # handshake failure unless remote_protocol return buffer_writer.string end # read request using remote protocol = META_READER.read(buffer_decoder) = buffer_decoder.read_string # get remote and local request schemas so we can do # schema resolution (one fine day) = remote_protocol.[] unless raise AvroError.new("Unknown remote message: #{}") end = local_protocol.[] unless raise AvroError.new("Unknown local message: #{}") end writers_schema = .request readers_schema = .request request = read_request(writers_schema, readers_schema, buffer_decoder) # perform server logic begin response = call(, request) rescue AvroRemoteError => e error = e rescue Exception => e # rubocop:disable Lint/RescueException error = AvroRemoteError.new(e.to_s) end # write response using local protocol META_WRITER.write(, buffer_encoder) buffer_encoder.write_boolean(!!error) if error.nil? writers_schema = .response write_response(writers_schema, response, buffer_encoder) else writers_schema = .errors || SYSTEM_ERROR_SCHEMA write_error(writers_schema, error, buffer_encoder) end rescue Avro::AvroError => e error = AvroRemoteException.new(e.to_s) # TODO does the stuff written here ever get used? buffer_encoder = Avro::IO::BinaryEncoder.new(StringIO.new) META_WRITER.write(, buffer_encoder) buffer_encoder.write_boolean(true) self.write_error(SYSTEM_ERROR_SCHEMA, error, buffer_encoder) end buffer_writer.string end |
#write_error(writers_schema, error_exception, encoder) ⇒ Object
372 373 374 375 |
# File 'lib/avro/ipc.rb', line 372 def write_error(writers_schema, error_exception, encoder) datum_writer = Avro::IO::DatumWriter.new(writers_schema) datum_writer.write(error_exception.to_s, encoder) end |
#write_response(writers_schema, response_datum, encoder) ⇒ Object
367 368 369 370 |
# File 'lib/avro/ipc.rb', line 367 def write_response(writers_schema, response_datum, encoder) datum_writer = Avro::IO::DatumWriter.new(writers_schema) datum_writer.write(response_datum, encoder) end |