Class: Avro::IPC::SocketTransport
- Inherits:
-
Object
- Object
- Avro::IPC::SocketTransport
- Defined in:
- lib/avro/ipc.rb
Instance Attribute Summary collapse
-
#protocol ⇒ Object
Returns the value of attribute protocol.
-
#remote_name ⇒ Object
readonly
Returns the value of attribute remote_name. (SocketTransport is a simple socket-based Transport implementation.)
-
#sock ⇒ Object
readonly
Returns the value of attribute sock. (SocketTransport is a simple socket-based Transport implementation.)
Instance Method Summary collapse
- #close ⇒ Object
-
#initialize(sock) ⇒ SocketTransport
constructor
A new instance of SocketTransport.
- #is_connected? ⇒ Boolean
- #read_buffer_length ⇒ Object
- #read_framed_message ⇒ Object
- #transceive(request) ⇒ Object
- #write_buffer(chunk) ⇒ Object
- #write_buffer_length(n) ⇒ Object
- #write_framed_message(message) ⇒ Object
Constructor Details
#initialize(sock) ⇒ SocketTransport
Returns a new instance of SocketTransport.
384 385 386 387 |
# File 'lib/avro/ipc.rb', line 384
#
# Builds a transport around the given socket object. The protocol starts
# out as nil, so #is_connected? is false until a protocol is assigned.
#
# @param sock [Object] socket object stored in @sock; the other methods of
#   this class call #read, #write and #close on it
def initialize(sock)
  @protocol = nil
  @sock = sock
end
Instance Attribute Details
#protocol ⇒ Object
Returns the value of attribute protocol.
382 383 384 |
# File 'lib/avro/ipc.rb', line 382
#
# Reader for the protocol attribute.
#
# @return [Object, nil] whatever is currently stored in @protocol
def protocol
  @protocol
end
#remote_name ⇒ Object (readonly)
Returns the value of attribute remote_name. (SocketTransport is a simple socket-based Transport implementation.)
381 382 383 |
# File 'lib/avro/ipc.rb', line 381
#
# Read-only accessor for the remote_name attribute.
#
# @return [Object, nil] whatever is currently stored in @remote_name
#   (NOTE(review): nothing in the visible code assigns it — confirm where
#   it is set)
def remote_name
  @remote_name
end
#sock ⇒ Object (readonly)
Returns the value of attribute sock. (SocketTransport is a simple socket-based Transport implementation.)
381 382 383 |
# File 'lib/avro/ipc.rb', line 381
#
# Read-only accessor for the socket handed to the constructor.
#
# @return [Object] the object stored in @sock
def sock
  @sock
end
Instance Method Details
#close ⇒ Object
461 462 463 |
# File 'lib/avro/ipc.rb', line 461
#
# Closes the underlying socket by delegating to its #close method.
#
# @return [Object] whatever the socket's #close returns
def close
  sock.close
end
#is_connected? ⇒ Boolean
389 390 391 |
# File 'lib/avro/ipc.rb', line 389
#
# Reports whether a protocol has been set on this transport.
#
# @return [Boolean] true when @protocol is neither nil nor false
def is_connected?
  # Double negation coerces the stored value to a strict true/false.
  !!@protocol
end
#read_buffer_length ⇒ Object
453 454 455 456 457 458 459 |
# File 'lib/avro/ipc.rb', line 453
#
# Reads the length header of the next buffer from the socket and decodes it
# as a 32-bit big-endian unsigned integer (pack format 'N').
#
# @return [Integer] the decoded buffer length
# @raise [ConnectionClosedException] when the socket yields no data, or
#   fewer bytes than a complete header
def read_buffer_length
  header = sock.read(BUFFER_HEADER_LENGTH)
  if header.nil? || header.empty?
    raise ConnectionClosedException, "Socket read 0 bytes."
  end
  # A truncated header would make unpack('N') return nil and surface later
  # as a confusing comparison error, so treat it as a closed connection.
  if header.bytesize < BUFFER_HEADER_LENGTH
    raise ConnectionClosedException,
          "Socket read #{header.bytesize} of #{BUFFER_HEADER_LENGTH} header bytes."
  end
  header.unpack('N')[0]
end
#read_framed_message ⇒ Object
398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 |
# File 'lib/avro/ipc.rb', line 398
#
# Reads one framed message from the socket. A framed message is a sequence
# of length-prefixed buffers; a buffer length of zero ends the message.
#
# @return [String] the contents of all buffers joined in arrival order
# @raise [ConnectionClosedException] when the socket stops delivering data
#   before the announced buffer length has been read
def read_framed_message
  message = []
  loop do
    buffer = StringIO.new(String.new('', encoding: 'BINARY'))
    buffer_length = read_buffer_length
    # A zero-length buffer is the end-of-message marker.
    return message.join if buffer_length == 0

    while buffer.tell < buffer_length
      chunk = sock.read(buffer_length - buffer.tell)
      # read(n) returns nil at end of stream; without the nil check the
      # loop would never advance and spin forever.
      if chunk.nil? || chunk.empty?
        raise ConnectionClosedException, "Socket read 0 bytes."
      end
      buffer.write(chunk)
    end
    message << buffer.string
  end
end
#transceive(request) ⇒ Object
393 394 395 396 |
# File 'lib/avro/ipc.rb', line 393
#
# Sends a request as a framed message and then reads the framed reply.
#
# @param request [String] the payload handed to #write_framed_message
# @return [String] the message returned by #read_framed_message
def transceive(request)
  write_framed_message(request)
  read_framed_message
end
#write_buffer(chunk) ⇒ Object
433 434 435 436 437 438 439 440 441 442 443 444 |
# File 'lib/avro/ipc.rb', line 433
#
# Writes one buffer to the socket: first its length header (via
# #write_buffer_length), then the payload, retrying until every byte has
# been accepted by the socket.
#
# @param chunk [String] the payload; its size is measured in bytes
# @return [nil]
# @raise [ConnectionClosedException] when the socket accepts zero bytes
def write_buffer(chunk)
  buffer_length = chunk.bytesize
  write_buffer_length(buffer_length)
  total_bytes_sent = 0
  while total_bytes_sent < buffer_length
    # The counters are byte counts, so the remainder must be cut by byte
    # offset; character indexing would resend or skip data for multibyte
    # strings after a partial write.
    bytes_sent = sock.write(chunk.byteslice(total_bytes_sent..-1))
    if bytes_sent == 0
      raise ConnectionClosedException, "Socket sent 0 bytes."
    end
    total_bytes_sent += bytes_sent
  end
end
#write_buffer_length(n) ⇒ Object
446 447 448 449 450 451 |
# File 'lib/avro/ipc.rb', line 446
#
# Writes a buffer length header to the socket, encoded as a 32-bit
# big-endian unsigned integer (pack format 'N').
#
# @param n [Integer] the buffer length to announce
# @return [nil]
# @raise [ConnectionClosedException] when the socket accepts zero bytes
def write_buffer_length(n)
  header = [n].pack('N')
  total_bytes_sent = 0
  # Keep writing until the whole header is out; a partially written header
  # would desynchronize the framing for the reader.
  while total_bytes_sent < header.bytesize
    bytes_sent = sock.write(header.byteslice(total_bytes_sent..-1))
    if bytes_sent == 0
      raise ConnectionClosedException, "socket sent 0 bytes"
    end
    total_bytes_sent += bytes_sent
  end
end
#write_framed_message(message) ⇒ Object
417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 |
# File 'lib/avro/ipc.rb', line 417
#
# Writes a message to the socket as a series of buffers of at most
# BUFFER_SIZE bytes each, followed by a zero-length terminator.
#
# @param message [String] the payload; it is split on byte offsets
# @return [Object] whatever the final #write_buffer_length call returns
def write_framed_message(message)
  message_length = message.bytesize
  # Offsets and sizes are in bytes, so slice by bytes; byteslice truncates
  # the final piece to whatever remains.
  (0...message_length).step(BUFFER_SIZE) do |offset|
    write_buffer(message.byteslice(offset, BUFFER_SIZE))
  end
  # A message is always terminated by a zero-length buffer.
  write_buffer_length(0)
end