Class: Avro::IPC::SocketTransport
- Inherits:
-
Object
- Object
- Avro::IPC::SocketTransport
- Defined in:
- lib/avro/ipc.rb
Instance Attribute Summary collapse
-
#protocol ⇒ Object
Returns the value of attribute protocol.
-
#remote_name ⇒ Object
readonly
Returns the value of attribute remote_name (part of this simple socket-based Transport implementation).
-
#sock ⇒ Object
readonly
Returns the value of attribute sock (part of this simple socket-based Transport implementation).
Instance Method Summary collapse
- #close ⇒ Object
-
#initialize(sock) ⇒ SocketTransport
constructor
A new instance of SocketTransport.
- #is_connected? ⇒ Boolean
- #read_buffer_length ⇒ Object
- #read_framed_message ⇒ Object
- #transceive(request) ⇒ Object
- #write_buffer(chunk) ⇒ Object
- #write_buffer_length(n) ⇒ Object
- #write_framed_message(message) ⇒ Object
Constructor Details
#initialize(sock) ⇒ SocketTransport
Returns a new instance of SocketTransport.
380 381 382 383 |
# File 'lib/avro/ipc.rb', line 380
# Builds a transport around an already-created socket-like object.
#
# @param sock [Object] object kept as the transport's socket; the other
#   methods of this class call +read+, +write+ and +close+ on it
# @return [SocketTransport] a new instance of SocketTransport
def initialize(sock)
  @sock = sock
  # No protocol is known yet, so is_connected? answers false for now.
  @protocol = nil
end
Instance Attribute Details
#protocol ⇒ Object
Returns the value of attribute protocol.
378 379 380 |
# File 'lib/avro/ipc.rb', line 378
# Reader for the +protocol+ attribute.
#
# @return [Object, nil] the current value of +@protocol+; the constructor
#   sets it to +nil+
def protocol
  @protocol
end
#remote_name ⇒ Object (readonly)
Returns the value of attribute remote_name (part of this simple socket-based Transport implementation).
377 378 379 |
# File 'lib/avro/ipc.rb', line 377
# Read-only accessor for the +remote_name+ attribute.
#
# NOTE(review): nothing shown on this page assigns +@remote_name+, so this
# presumably returns +nil+ unless it is set elsewhere -- confirm.
#
# @return [Object, nil] the current value of +@remote_name+
def remote_name
  @remote_name
end
#sock ⇒ Object (readonly)
Returns the value of attribute sock (part of this simple socket-based Transport implementation).
377 378 379 |
# File 'lib/avro/ipc.rb', line 377
# Read-only accessor for the socket handed to the constructor.
#
# @return [Object] the value stored in +@sock+
def sock
  @sock
end
Instance Method Details
#close ⇒ Object
457 458 459 |
# File 'lib/avro/ipc.rb', line 457
# Closes the underlying socket by delegating to +sock.close+.
#
# @return [Object] whatever +sock.close+ returns
def close
  sock.close
end
#is_connected? ⇒ Boolean
385 386 387 |
# File 'lib/avro/ipc.rb', line 385
# Reports whether a protocol has been set on this transport.
#
# @return [Boolean] +true+ when +@protocol+ is truthy, +false+ when it is
#   +nil+ or +false+
def is_connected?
  @protocol ? true : false
end
#read_buffer_length ⇒ Object
449 450 451 452 453 454 455 |
# File 'lib/avro/ipc.rb', line 449
# Reads one buffer header from the socket and decodes it as a big-endian
# unsigned 32-bit integer ('N').
#
# @return [Integer] the decoded buffer length
# @raise [ConnectionClosedException] when the socket yields no data, or
#   fewer than BUFFER_HEADER_LENGTH bytes, for the header
def read_buffer_length
  header = sock.read(BUFFER_HEADER_LENGTH)
  if header.nil? || header.empty?
    raise ConnectionClosedException, "Socket read 0 bytes."
  end
  # A short read would make unpack('N') return nil and fail later in the
  # caller with an unrelated error, so report the closed connection here.
  if header.bytesize < BUFFER_HEADER_LENGTH
    raise ConnectionClosedException,
          "Socket read #{header.bytesize} of #{BUFFER_HEADER_LENGTH} header bytes."
  end
  header.unpack('N')[0]
end
#read_framed_message ⇒ Object
394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 |
# File 'lib/avro/ipc.rb', line 394
# Reads a complete framed message from the socket.
#
# Each frame is a length header (see read_buffer_length) followed by that
# many payload bytes. Frames are collected until a zero-length frame is
# read, which ends the message.
#
# @return [String] the payloads of all frames joined together
# @raise [ConnectionClosedException] when the socket stops delivering data
#   in the middle of a frame
def read_framed_message
  message = []
  loop do
    buffer = StringIO.new(String.new.force_encoding('BINARY'))
    buffer_length = read_buffer_length
    return message.join if buffer_length == 0

    while buffer.tell < buffer_length
      chunk = sock.read(buffer_length - buffer.tell)
      # read(n) returns nil (not '') at EOF; without the nil check the
      # loop would never advance and would spin forever.
      if chunk.nil? || chunk.empty?
        raise ConnectionClosedException, "Socket read 0 bytes."
      end
      buffer.write(chunk)
    end
    message << buffer.string
  end
end
#transceive(request) ⇒ Object
389 390 391 392 |
# File 'lib/avro/ipc.rb', line 389
# Sends a request as a framed message and then reads the framed reply.
#
# @param request [String] payload handed to write_framed_message
# @return [String] the value returned by read_framed_message
def transceive(request)
  write_framed_message(request)
  read_framed_message
end
#write_buffer(chunk) ⇒ Object
429 430 431 432 433 434 435 436 437 438 439 440 |
# File 'lib/avro/ipc.rb', line 429
# Writes one frame: the length header followed by the bytes of +chunk+.
#
# The payload is re-sent from the first unsent byte until +sock.write+ has
# accepted every byte.
#
# @param chunk [String] payload for this frame
# @return [nil]
# @raise [ConnectionClosedException] when +sock.write+ reports 0 bytes sent
def write_buffer(chunk)
  buffer_length = chunk.bytesize
  write_buffer_length(buffer_length)
  total_bytes_sent = 0
  while total_bytes_sent < buffer_length
    # Offsets are counted in bytes, so slice by bytes; character indexing
    # would corrupt multibyte strings after a partial write.
    remaining = chunk.byteslice(total_bytes_sent, buffer_length - total_bytes_sent)
    bytes_sent = sock.write(remaining)
    if bytes_sent == 0
      raise ConnectionClosedException, "Socket sent 0 bytes."
    end
    total_bytes_sent += bytes_sent
  end
end
#write_buffer_length(n) ⇒ Object
442 443 444 445 446 447 |
# File 'lib/avro/ipc.rb', line 442
# Writes a buffer header: +n+ packed as a big-endian unsigned 32-bit
# integer ('N').
#
# @param n [Integer] length to announce
# @return [nil]
# @raise [ConnectionClosedException] when +sock.write+ reports 0 bytes sent
def write_buffer_length(n)
  header = [n].pack('N')
  bytes_sent = sock.write(header)
  raise ConnectionClosedException, "socket sent 0 bytes" if bytes_sent == 0
end
#write_framed_message(message) ⇒ Object
413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 |
# File 'lib/avro/ipc.rb', line 413
# Sends +message+ as a sequence of frames of at most BUFFER_SIZE bytes each,
# followed by a zero-length frame.
#
# @param message [String] payload to send
# @return [Object] whatever the final write_buffer_length(0) call returns
def write_framed_message(message)
  message_size = message.bytesize
  total_bytes_sent = 0
  while total_bytes_sent < message_size
    buffer_length = [message_size - total_bytes_sent, BUFFER_SIZE].min
    # Sizes are measured in bytes, so slice by bytes; character indexing
    # would mis-split (or return nil for) multibyte strings.
    write_buffer(message.byteslice(total_bytes_sent, buffer_length))
    total_bytes_sent += buffer_length
  end
  # A message is always terminated by a zero-length buffer.
  write_buffer_length(0)
end