Class: Avro::IPC::Responder

Inherits:
Object
  • Object
show all
Defined in:
lib/avro/ipc.rb

Overview

Base class for the server side of a protocol interaction.

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(local_protocol) ⇒ Responder

Returns a new instance of Responder.



240
241
242
243
244
245
# File 'lib/avro/ipc.rb', line 240

def initialize(local_protocol)
  @local_protocol = local_protocol
  @local_hash = self.local_protocol.md5
  @protocol_cache = {}
  protocol_cache[local_hash] = local_protocol
end

Instance Attribute Details

#local_hashObject (readonly)

Returns the value of attribute local_hash.



239
240
241
# File 'lib/avro/ipc.rb', line 239

def local_hash
  @local_hash
end

#local_protocolObject (readonly)

Returns the value of attribute local_protocol.



239
240
241
# File 'lib/avro/ipc.rb', line 239

def local_protocol
  @local_protocol
end

#protocol_cacheObject (readonly)

Returns the value of attribute protocol_cache.



239
240
241
# File 'lib/avro/ipc.rb', line 239

def protocol_cache
  @protocol_cache
end

Instance Method Details

#call(_local_message, _request) ⇒ Object

Raises:

  • (NotImplementedError)


357
358
359
360
# File 'lib/avro/ipc.rb', line 357

def call(_local_message, _request)
  # Actual work done by server: cf. handler in thrift.
  raise NotImplementedError
end

#process_handshake(decoder, encoder, connection = nil) ⇒ Object



310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
# File 'lib/avro/ipc.rb', line 310

def process_handshake(decoder, encoder, connection=nil)
  if connection && connection.is_connected?
    return connection.protocol
  end
  handshake_request = HANDSHAKE_RESPONDER_READER.read(decoder)
  handshake_response = {}

  # determine the remote protocol
  client_hash = handshake_request['clientHash']
  client_protocol = handshake_request['clientProtocol']
  remote_protocol = protocol_cache[client_hash]

  if !remote_protocol && client_protocol
    remote_protocol = Avro::Protocol.parse(client_protocol)
    protocol_cache[client_hash] = remote_protocol
  end

  # evaluate remote's guess of the local protocol
  server_hash = handshake_request['serverHash']
  if local_hash == server_hash
    if !remote_protocol
      handshake_response['match'] = 'NONE'
    else
      handshake_response['match'] = 'BOTH'
    end
  else
    if !remote_protocol
      handshake_response['match'] = 'NONE'
    else
      handshake_response['match'] = 'CLIENT'
    end
  end

  if handshake_response['match'] != 'BOTH'
    handshake_response['serverProtocol'] = local_protocol.to_s
    handshake_response['serverHash'] = local_hash
  end

  HANDSHAKE_RESPONDER_WRITER.write(handshake_response, encoder)

  if connection && handshake_response['match'] != 'NONE'
    connection.protocol = remote_protocol
  end

  remote_protocol
end

#read_request(writers_schema, readers_schema, decoder) ⇒ Object



362
363
364
365
# File 'lib/avro/ipc.rb', line 362

def read_request(writers_schema, readers_schema, decoder)
  datum_reader = Avro::IO::DatumReader.new(writers_schema, readers_schema)
  datum_reader.read(decoder)
end

#respond(call_request, transport = nil) ⇒ Object

Called by a server to deserialize a request, compute and serialize a response or error. Compare to ‘handle()’ in Thrift.



249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
# File 'lib/avro/ipc.rb', line 249

def respond(call_request, transport=nil)
  buffer_decoder = Avro::IO::BinaryDecoder.new(StringIO.new(call_request))
  buffer_writer = StringIO.new(String.new('', encoding: 'BINARY'))
  buffer_encoder = Avro::IO::BinaryEncoder.new(buffer_writer)
  error = nil
   = {}

  begin
    remote_protocol = process_handshake(buffer_decoder, buffer_encoder, transport)
    # handshake failure
    unless remote_protocol
      return buffer_writer.string
    end

    # read request using remote protocol
     = META_READER.read(buffer_decoder)
    remote_message_name = buffer_decoder.read_string

    # get remote and local request schemas so we can do
    # schema resolution (one fine day)
    remote_message = remote_protocol.messages[remote_message_name]
    unless remote_message
      raise AvroError.new("Unknown remote message: #{remote_message_name}")
    end
    local_message = local_protocol.messages[remote_message_name]
    unless local_message
      raise AvroError.new("Unknown local message: #{remote_message_name}")
    end
    writers_schema = remote_message.request
    readers_schema = local_message.request
    request = read_request(writers_schema, readers_schema, buffer_decoder)
    # perform server logic
    begin
      response = call(local_message, request)
    rescue AvroRemoteError => e
      error = e
    rescue Exception => e # rubocop:disable Lint/RescueException
      error = AvroRemoteError.new(e.to_s)
    end

    # write response using local protocol
    META_WRITER.write(, buffer_encoder)
    buffer_encoder.write_boolean(!!error)
    if error.nil?
      writers_schema = local_message.response
      write_response(writers_schema, response, buffer_encoder)
    else
      writers_schema = local_message.errors || SYSTEM_ERROR_SCHEMA
      write_error(writers_schema, error, buffer_encoder)
    end
  rescue Avro::AvroError => e
    error = AvroRemoteException.new(e.to_s)
    # TODO does the stuff written here ever get used?
    buffer_encoder = Avro::IO::BinaryEncoder.new(StringIO.new)
    META_WRITER.write(, buffer_encoder)
    buffer_encoder.write_boolean(true)
    self.write_error(SYSTEM_ERROR_SCHEMA, error, buffer_encoder)
  end
  buffer_writer.string
end

#write_error(writers_schema, error_exception, encoder) ⇒ Object



372
373
374
375
# File 'lib/avro/ipc.rb', line 372

def write_error(writers_schema, error_exception, encoder)
  datum_writer = Avro::IO::DatumWriter.new(writers_schema)
  datum_writer.write(error_exception.to_s, encoder)
end

#write_response(writers_schema, response_datum, encoder) ⇒ Object



367
368
369
370
# File 'lib/avro/ipc.rb', line 367

def write_response(writers_schema, response_datum, encoder)
  datum_writer = Avro::IO::DatumWriter.new(writers_schema)
  datum_writer.write(response_datum, encoder)
end