Class: Cosmos::TcpipSocketStream

Inherits:
Stream show all
Defined in:
lib/cosmos/streams/tcpip_socket_stream.rb

Overview

Data Stream which reads and writes from Tcpip Sockets.

Direct Known Subclasses

TcpipClientStream

Instance Attribute Summary collapse

Attributes inherited from Stream

#raw_logger_pair

Instance Method Summary collapse

Constructor Details

#initialize(write_socket, read_socket, write_timeout, read_timeout) ⇒ TcpipSocketStream



29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
# File 'lib/cosmos/streams/tcpip_socket_stream.rb', line 29

def initialize(write_socket, read_socket, write_timeout, read_timeout)
  super()

  @write_socket  = write_socket
  @read_socket   = read_socket
  @write_timeout = ConfigParser.handle_nil(write_timeout)
  @write_timeout = @write_timeout.to_f if @write_timeout
  @read_timeout  = ConfigParser.handle_nil(read_timeout)
  @read_timeout  = @read_timeout.to_f if @read_timeout
  @connected = true

  # Mutex on write is needed to protect from commands coming in from more
  # than one tool
  @write_mutex = Mutex.new
end

Instance Attribute Details

#write_socketObject (readonly)

Returns the value of attribute write_socket.



21
22
23
# File 'lib/cosmos/streams/tcpip_socket_stream.rb', line 21

def write_socket
  @write_socket
end

Instance Method Details

#connected?Boolean



124
125
126
# File 'lib/cosmos/streams/tcpip_socket_stream.rb', line 124

def connected?
  @connected
end

#disconnectObject

Disconnect by closing the sockets



129
130
131
132
133
134
135
# File 'lib/cosmos/streams/tcpip_socket_stream.rb', line 129

def disconnect
  if @connected
    @write_socket.close if @write_socket and !@write_socket.closed?
    @read_socket.close if @read_socket and !@read_socket.closed?
    @connected = false
  end
end

#readString



46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
# File 'lib/cosmos/streams/tcpip_socket_stream.rb', line 46

def read
  raise "Attempt to read from write only stream" unless @read_socket

  # No read mutex is needed because there is only one stream procesor
  # reading
  begin
    data = @read_socket.recv_nonblock(65535)
    @raw_logger_pair.read_logger.write(data) if @raw_logger_pair
  rescue Errno::EAGAIN, Errno::EWOULDBLOCK
    # Wait for the socket to be ready for reading or for the timeout
    begin
      result = IO.fast_select([@read_socket], nil, nil, @read_timeout)

      # If select returns something it means the socket is now available for
      # reading so retry the read. If it returns nil it means we timed out.
      if result
        retry
      else
        raise Timeout::Error, "Read Timeout"
      end
    rescue IOError, Errno::ENOTSOCK
      # These can happen with the socket being closed while waiting on select
      data = ''
    end
  rescue Errno::ECONNRESET, Errno::ECONNABORTED
    data = ''
  end

  data
end

#read_nonblockString



78
79
80
81
82
83
84
85
86
87
88
89
# File 'lib/cosmos/streams/tcpip_socket_stream.rb', line 78

def read_nonblock
  # No read mutex is needed because there is only one stream procesor
  # reading
  begin
    data = @read_socket.recv_nonblock(65535)
    @raw_logger_pair.read_logger.write(data) if @raw_logger_pair
  rescue Errno::EAGAIN, Errno::EWOULDBLOCK, Errno::ECONNRESET, Errno::ECONNABORTED
    data = ''
  end

  data
end

#write(data) ⇒ Object



92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
# File 'lib/cosmos/streams/tcpip_socket_stream.rb', line 92

def write(data)
  raise "Attempt to write to read only stream" unless @write_socket

  @write_mutex.synchronize do
    num_bytes_to_send = data.length
    total_bytes_sent = 0
    bytes_sent = 0
    data_to_send = data

    loop do
      begin
        bytes_sent = @write_socket.write_nonblock(data_to_send)
        @raw_logger_pair.write_logger.write(data_to_send[0..(bytes_sent - 1)]) if @raw_logger_pair and bytes_sent > 0
      rescue Errno::EAGAIN, Errno::EWOULDBLOCK
        # Wait for the socket to be ready for writing or for the timeout
        result = IO.fast_select(nil, [@write_socket], nil, @write_timeout)
        # If select returns something it means the socket is now available for
        # writing so retry the write. If it returns nil it means we timed out.
        if result
          retry
        else
          raise Timeout::Error, "Write Timeout"
        end
      end
      total_bytes_sent += bytes_sent
      break if total_bytes_sent >= num_bytes_to_send
      data_to_send = data[total_bytes_sent..-1]
    end
  end
end