Class: Avro::IPC::Responder
- Inherits:
-
Object
- Object
- Avro::IPC::Responder
- Defined in:
- lib/avro/ipc.rb
Overview
Base class for the server side of a protocol interaction.
Instance Attribute Summary collapse
-
#local_hash ⇒ Object
readonly
Returns the value of attribute local_hash.
-
#local_protocol ⇒ Object
readonly
Returns the value of attribute local_protocol.
-
#protocol_cache ⇒ Object
readonly
Returns the value of attribute protocol_cache.
Instance Method Summary collapse
- #call(local_message, request) ⇒ Object
-
#initialize(local_protocol) ⇒ Responder
constructor
A new instance of Responder.
- #process_handshake(decoder, encoder, connection = nil) ⇒ Object
- #read_request(writers_schema, readers_schema, decoder) ⇒ Object
-
#respond(call_request, transport = nil) ⇒ Object
Called by a server to deserialize a request, compute and serialize a response or error.
- #write_error(writers_schema, error_exception, encoder) ⇒ Object
- #write_response(writers_schema, response_datum, encoder) ⇒ Object
Constructor Details
#initialize(local_protocol) ⇒ Responder
Returns a new instance of Responder.
236 237 238 239 240 241 |
# File 'lib/avro/ipc.rb', line 236
# Builds a responder around the given local protocol.
#
# Stores the protocol, records the value of its +md5+ as the local hash, and
# seeds the protocol cache with one entry mapping that hash to the protocol.
#
# @param local_protocol [Object] protocol object; must respond to +md5+
def initialize(local_protocol)
  @local_protocol = local_protocol
  @protocol_cache = {}
  @local_hash = self.local_protocol.md5
  # Register the local protocol under its own hash.
  protocol_cache.store(local_hash, local_protocol)
end
Instance Attribute Details
#local_hash ⇒ Object (readonly)
Returns the value of attribute local_hash.
235 236 237 |
# File 'lib/avro/ipc.rb', line 235
# Reader for the +@local_hash+ instance variable.
#
# @return [Object] the current value of +@local_hash+
def local_hash
  @local_hash
end
#local_protocol ⇒ Object (readonly)
Returns the value of attribute local_protocol.
235 236 237 |
# File 'lib/avro/ipc.rb', line 235
# Reader for the +@local_protocol+ instance variable.
#
# @return [Object] the current value of +@local_protocol+
def local_protocol
  @local_protocol
end
#protocol_cache ⇒ Object (readonly)
Returns the value of attribute protocol_cache.
235 236 237 |
# File 'lib/avro/ipc.rb', line 235
# Reader for the +@protocol_cache+ instance variable.
#
# @return [Object] the current value of +@protocol_cache+
def protocol_cache
  @protocol_cache
end
Instance Method Details
#call(local_message, request) ⇒ Object
353 354 355 356 |
# File 'lib/avro/ipc.rb', line 353
# Hook for the actual work done by the server (cf. handler in Thrift).
#
# This base implementation does no work and always raises.
#
# @param local_message [Object] the local message being invoked
# @param request [Object] the decoded request datum
# @raise [NotImplementedError] always, in this base implementation
def call(local_message, request)
  # Actual work done by server: cf. handler in thrift.
  raise NotImplementedError
end
#process_handshake(decoder, encoder, connection = nil) ⇒ Object
306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 |
# File 'lib/avro/ipc.rb', line 306
# Reads a handshake request from +decoder+, writes the handshake response to
# +encoder+, and returns the remote (client) protocol.
#
# When +connection+ is given and reports +is_connected?+, nothing is read or
# written and the connection's stored protocol is returned directly.
#
# The 'match' field of the response is:
# * 'NONE'   - the remote protocol is neither cached nor sent in the request
# * 'BOTH'   - the remote protocol is known and the request's 'serverHash'
#              equals +local_hash+
# * 'CLIENT' - the remote protocol is known but 'serverHash' differs
#
# Unless the match is 'BOTH', the response also carries 'serverProtocol' and
# 'serverHash'.
#
# @param decoder [Object] decoder the handshake request is read from
# @param encoder [Object] encoder the handshake response is written to
# @param connection [Object, nil] optional connection; its +protocol+ is set
#   to the remote protocol when the match is not 'NONE'
# @return [Object, nil] the remote protocol, or nil/false when it is unknown
def process_handshake(decoder, encoder, connection=nil)
  return connection.protocol if connection && connection.is_connected?

  request = HANDSHAKE_RESPONDER_READER.read(decoder)

  # determine the remote protocol
  client_hash = request['clientHash']
  client_protocol = request['clientProtocol']
  remote_protocol = protocol_cache[client_hash]
  if client_protocol && !remote_protocol
    # Not cached yet, but the client sent its protocol text: parse and cache it.
    remote_protocol = Avro::Protocol.parse(client_protocol)
    protocol_cache[client_hash] = remote_protocol
  end

  # evaluate remote's guess of the local protocol
  server_hash = request['serverHash']
  match =
    if !remote_protocol
      'NONE'
    elsif local_hash == server_hash
      'BOTH'
    else
      'CLIENT'
    end

  response = { 'match' => match }
  unless match == 'BOTH'
    response['serverProtocol'] = local_protocol.to_s
    response['serverHash'] = local_hash
  end

  HANDSHAKE_RESPONDER_WRITER.write(response, encoder)

  connection.protocol = remote_protocol if connection && match != 'NONE'

  remote_protocol
end
#read_request(writers_schema, readers_schema, decoder) ⇒ Object
358 359 360 361 |
# File 'lib/avro/ipc.rb', line 358
# Decodes a request datum from +decoder+ using an Avro::IO::DatumReader built
# from the two schemas.
#
# @param writers_schema [Object] schema passed as the reader's first argument
# @param readers_schema [Object] schema passed as the reader's second argument
# @param decoder [Object] decoder to read from
# @return [Object] whatever the datum reader's +read+ returns
def read_request(writers_schema, readers_schema, decoder)
  Avro::IO::DatumReader.new(writers_schema, readers_schema).read(decoder)
end
#respond(call_request, transport = nil) ⇒ Object
Called by a server to deserialize a request, compute and serialize a response or error. Compare to ‘handle()’ in Thrift.
245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 |
# File 'lib/avro/ipc.rb', line 245
# Called by a server to deserialize a request, compute and serialize a
# response or error. Compare to 'handle()' in Thrift.
#
# @param call_request [String] serialized call; wrapped in a StringIO for decoding
# @param transport [Object, nil] forwarded to #process_handshake as its
#   +connection+ argument
# @return [String] the contents of the binary response buffer
def respond(call_request, transport=nil)
  buffer_decoder = Avro::IO::BinaryDecoder.new(StringIO.new(call_request))
  buffer_writer = StringIO.new(''.force_encoding('BINARY'))
  buffer_encoder = Avro::IO::BinaryEncoder.new(buffer_writer)
  error = nil
  # Assigned before the begin block so the outer rescue can also see it.
  response_metadata = {}

  begin
    remote_protocol = process_handshake(buffer_decoder, buffer_encoder, transport)
    # handshake failure: return only what the handshake wrote
    return buffer_writer.string unless remote_protocol

    # read request using remote protocol
    # The metadata is not used further here, but it still has to be consumed
    # so the decoder is positioned at the message name.
    request_metadata = META_READER.read(buffer_decoder)
    remote_message_name = buffer_decoder.read_string

    # get remote and local request schemas so we can do
    # schema resolution (one fine day)
    remote_message = remote_protocol.messages[remote_message_name]
    unless remote_message
      raise AvroError, "Unknown remote message: #{remote_message_name}"
    end
    local_message = local_protocol.messages[remote_message_name]
    unless local_message
      raise AvroError, "Unknown local message: #{remote_message_name}"
    end
    writers_schema = remote_message.request
    readers_schema = local_message.request
    request = read_request(writers_schema, readers_schema, buffer_decoder)

    # perform server logic
    begin
      response = call(local_message, request)
    rescue AvroRemoteError => e
      error = e
    rescue Exception => e
      # Deliberately broad: the base #call raises NotImplementedError, which is
      # not a StandardError, and it must still be reported as a remote error.
      error = AvroRemoteError.new(e.to_s)
    end

    # write response using local protocol
    META_WRITER.write(response_metadata, buffer_encoder)
    buffer_encoder.write_boolean(!!error)
    if error.nil?
      writers_schema = local_message.response
      write_response(writers_schema, response, buffer_encoder)
    else
      writers_schema = local_message.errors || SYSTEM_ERROR_SCHEMA
      write_error(writers_schema, error, buffer_encoder)
    end
  rescue Avro::AvroError => e
    # Uses AvroRemoteError, the same class as the inner rescue above; only its
    # #to_s is consumed by #write_error.
    error = AvroRemoteError.new(e.to_s)
    # TODO does the stuff written here ever get used?
    # NOTE(review): this encoder wraps a fresh StringIO rather than
    # buffer_writer, so these bytes are not part of the returned string --
    # confirm whether that is intended.
    buffer_encoder = Avro::IO::BinaryEncoder.new(StringIO.new)
    META_WRITER.write(response_metadata, buffer_encoder)
    buffer_encoder.write_boolean(true)
    write_error(SYSTEM_ERROR_SCHEMA, error, buffer_encoder)
  end
  buffer_writer.string
end
#write_error(writers_schema, error_exception, encoder) ⇒ Object
368 369 370 371 |
# File 'lib/avro/ipc.rb', line 368
# Serializes an error to +encoder+ as the string form of +error_exception+,
# using an Avro::IO::DatumWriter built from +writers_schema+.
#
# @param writers_schema [Object] schema handed to the datum writer
# @param error_exception [Object] error to report; only its +to_s+ is written
# @param encoder [Object] encoder to write to
# @return [Object] whatever the datum writer's +write+ returns
def write_error(writers_schema, error_exception, encoder)
  Avro::IO::DatumWriter.new(writers_schema).write(error_exception.to_s, encoder)
end
#write_response(writers_schema, response_datum, encoder) ⇒ Object
363 364 365 366 |
# File 'lib/avro/ipc.rb', line 363
# Serializes +response_datum+ to +encoder+ using an Avro::IO::DatumWriter
# built from +writers_schema+.
#
# @param writers_schema [Object] schema handed to the datum writer
# @param response_datum [Object] response value to write
# @param encoder [Object] encoder to write to
# @return [Object] whatever the datum writer's +write+ returns
def write_response(writers_schema, response_datum, encoder)
  Avro::IO::DatumWriter.new(writers_schema).write(response_datum, encoder)
end