Class: Avro::IPC::SocketTransport

Inherits:
Object
  • Object
show all
Defined in:
lib/avro/ipc.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(sock) ⇒ SocketTransport

Returns a new instance of SocketTransport.



384
385
386
387
# File 'lib/avro/ipc.rb', line 384

def initialize(sock)
  @sock = sock
  @protocol = nil
end

Instance Attribute Details

#protocolObject

Returns the value of attribute protocol.



382
383
384
# File 'lib/avro/ipc.rb', line 382

def protocol
  @protocol
end

#remote_nameObject (readonly)

A simple socket-based Transport implementation.



381
382
383
# File 'lib/avro/ipc.rb', line 381

def remote_name
  @remote_name
end

#sockObject (readonly)

A simple socket-based Transport implementation.



381
382
383
# File 'lib/avro/ipc.rb', line 381

def sock
  @sock
end

Instance Method Details

#closeObject



461
462
463
# File 'lib/avro/ipc.rb', line 461

def close
  sock.close
end

#is_connected?Boolean

Returns:

  • (Boolean)


389
390
391
# File 'lib/avro/ipc.rb', line 389

def is_connected?()
  !!@protocol
end

#read_buffer_lengthObject



453
454
455
456
457
458
459
# File 'lib/avro/ipc.rb', line 453

def read_buffer_length
  read = sock.read(BUFFER_HEADER_LENGTH)
  if read == '' || read == nil
    raise ConnectionClosedException.new("Socket read 0 bytes.")
  end
  read.unpack('N')[0]
end

#read_framed_messageObject



398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
# File 'lib/avro/ipc.rb', line 398

def read_framed_message
  message = []
  loop do
    buffer = StringIO.new(String.new('', encoding: 'BINARY'))
    buffer_length = read_buffer_length
    if buffer_length == 0
      return message.join
    end
    while buffer.tell < buffer_length
      chunk = sock.read(buffer_length - buffer.tell)
      if chunk == ''
        raise ConnectionClosedException.new("Socket read 0 bytes.")
      end
      buffer.write(chunk)
    end
    message << buffer.string
  end
end

#transceive(request) ⇒ Object



393
394
395
396
# File 'lib/avro/ipc.rb', line 393

def transceive(request)
  write_framed_message(request)
  read_framed_message
end

#write_buffer(chunk) ⇒ Object



433
434
435
436
437
438
439
440
441
442
443
444
# File 'lib/avro/ipc.rb', line 433

def write_buffer(chunk)
  buffer_length = chunk.bytesize
  write_buffer_length(buffer_length)
  total_bytes_sent = 0
  while total_bytes_sent < buffer_length
    bytes_sent = self.sock.write(chunk[total_bytes_sent..-1])
    if bytes_sent == 0
      raise ConnectionClosedException.new("Socket sent 0 bytes.")
    end
    total_bytes_sent += bytes_sent
  end
end

#write_buffer_length(n) ⇒ Object



446
447
448
449
450
451
# File 'lib/avro/ipc.rb', line 446

def write_buffer_length(n)
  bytes_sent = sock.write([n].pack('N'))
  if bytes_sent == 0
    raise ConnectionClosedException.new("socket sent 0 bytes")
  end
end

#write_framed_message(message) ⇒ Object



417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
# File 'lib/avro/ipc.rb', line 417

def write_framed_message(message)
  message_length = message.bytesize
  total_bytes_sent = 0
  while message_length - total_bytes_sent > 0
    if message_length - total_bytes_sent > BUFFER_SIZE
      buffer_length = BUFFER_SIZE
    else
      buffer_length = message_length - total_bytes_sent
    end
    write_buffer(message[total_bytes_sent,buffer_length])
    total_bytes_sent += buffer_length
  end
  # A message is always terminated by a zero-length buffer.
  write_buffer_length(0)
end