Class: Avro::IPC::SocketTransport
- Inherits:
-
Object
- Object
- Avro::IPC::SocketTransport
- Defined in:
- lib/avro/ipc.rb
Instance Attribute Summary collapse
-
#protocol ⇒ Object
Returns the value of attribute protocol.
-
#remote_name ⇒ Object
readonly
A simple socket-based Transport implementation.
-
#sock ⇒ Object
readonly
A simple socket-based Transport implementation.
Instance Method Summary collapse
- #close ⇒ Object
-
#initialize(sock) ⇒ SocketTransport
constructor
A new instance of SocketTransport.
- #is_connected? ⇒ Boolean
- #read_buffer_length ⇒ Object
- #read_framed_message ⇒ Object
- #transceive(request) ⇒ Object
- #write_buffer(chunk) ⇒ Object
- #write_buffer_length(n) ⇒ Object
- #write_framed_message(message) ⇒ Object
Constructor Details
#initialize(sock) ⇒ SocketTransport
Returns a new instance of SocketTransport.
379 380 381 382 |
# File 'lib/avro/ipc.rb', line 379 def initialize(sock) @sock = sock @protocol = nil end |
Instance Attribute Details
#protocol ⇒ Object
Returns the value of attribute protocol.
377 378 379 |
# File 'lib/avro/ipc.rb', line 377 def protocol @protocol end |
#remote_name ⇒ Object (readonly)
A simple socket-based Transport implementation.
376 377 378 |
# File 'lib/avro/ipc.rb', line 376 def remote_name @remote_name end |
#sock ⇒ Object (readonly)
A simple socket-based Transport implementation.
376 377 378 |
# File 'lib/avro/ipc.rb', line 376 def sock @sock end |
Instance Method Details
#close ⇒ Object
456 457 458 |
# File 'lib/avro/ipc.rb', line 456 def close sock.close end |
#is_connected? ⇒ Boolean
384 385 386 |
# File 'lib/avro/ipc.rb', line 384 def is_connected?() !!@protocol end |
#read_buffer_length ⇒ Object
448 449 450 451 452 453 454 |
# File 'lib/avro/ipc.rb', line 448 def read_buffer_length read = sock.read(BUFFER_HEADER_LENGTH) if read == '' || read == nil raise ConnectionClosedException.new("Socket read 0 bytes.") end read.unpack('N')[0] end |
#read_framed_message ⇒ Object
393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 |
# File 'lib/avro/ipc.rb', line 393 def = [] loop do buffer = StringIO.new buffer_length = read_buffer_length if buffer_length == 0 return .join end while buffer.tell < buffer_length chunk = sock.read(buffer_length - buffer.tell) if chunk == '' raise ConnectionClosedException.new("Socket read 0 bytes.") end buffer.write(chunk) end << buffer.string end end |
#transceive(request) ⇒ Object
388 389 390 391 |
# File 'lib/avro/ipc.rb', line 388 def transceive(request) (request) end |
#write_buffer(chunk) ⇒ Object
428 429 430 431 432 433 434 435 436 437 438 439 |
# File 'lib/avro/ipc.rb', line 428 def write_buffer(chunk) buffer_length = chunk.size write_buffer_length(buffer_length) total_bytes_sent = 0 while total_bytes_sent < buffer_length bytes_sent = self.sock.write(chunk[total_bytes_sent..-1]) if bytes_sent == 0 raise ConnectionClosedException.new("Socket sent 0 bytes.") end total_bytes_sent += bytes_sent end end |
#write_buffer_length(n) ⇒ Object
441 442 443 444 445 446 |
# File 'lib/avro/ipc.rb', line 441 def write_buffer_length(n) bytes_sent = sock.write([n].pack('N')) if bytes_sent == 0 raise ConnectionClosedException.new("socket sent 0 bytes") end end |
#write_framed_message(message) ⇒ Object
412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 |
# File 'lib/avro/ipc.rb', line 412 def () = .size total_bytes_sent = 0 while - total_bytes_sent > 0 if - total_bytes_sent > BUFFER_SIZE buffer_length = BUFFER_SIZE else buffer_length = - total_bytes_sent end write_buffer([total_bytes_sent,buffer_length]) total_bytes_sent += buffer_length end # A message is always terminated by a zero-length buffer. write_buffer_length(0) end |