Class: NATS::IO::Socket
- Inherits:
-
Object
- Object
- NATS::IO::Socket
- Defined in:
- lib/nats/io/client.rb
Overview
Implementation adapted from github.com/redis/redis-rb
Direct Known Subclasses
Instance Attribute Summary collapse
-
#socket ⇒ Object
Returns the value of attribute socket.
Instance Method Summary collapse
- #close ⇒ Object
- #closed? ⇒ Boolean
- #connect ⇒ Object
-
#initialize(options = {}) ⇒ Socket
constructor
A new instance of Socket.
- #read(max_bytes, deadline = nil) ⇒ Object
- #read_line(deadline = nil) ⇒ Object
-
#setup_tls! ⇒ Object
(Re-)connect using secure connection if server and client agreed on using it.
- #write(data, deadline = nil) ⇒ Object
Constructor Details
#initialize(options = {}) ⇒ Socket
Returns a new instance of Socket.
1873 1874 1875 1876 1877 1878 1879 1880 |
# File 'lib/nats/io/client.rb', line 1873 def initialize(={}) @uri = [:uri] @connect_timeout = [:connect_timeout] @write_timeout = [:write_timeout] @read_timeout = [:read_timeout] @socket = nil @tls = [:tls] end |
Instance Attribute Details
#socket ⇒ Object
Returns the value of attribute socket.
1871 1872 1873 |
# File 'lib/nats/io/client.rb', line 1871 def socket @socket end |
Instance Method Details
#close ⇒ Object
1979 1980 1981 |
# File 'lib/nats/io/client.rb', line 1979 def close @socket.close end |
#closed? ⇒ Boolean
1983 1984 1985 |
# File 'lib/nats/io/client.rb', line 1983 def closed? @socket.closed? end |
#connect ⇒ Object
1882 1883 1884 1885 1886 1887 1888 1889 1890 1891 1892 1893 1894 1895 1896 |
# File 'lib/nats/io/client.rb', line 1882 def connect addrinfo = ::Socket.getaddrinfo(@uri.hostname, nil, ::Socket::AF_UNSPEC, ::Socket::SOCK_STREAM) addrinfo.each_with_index do |ai, i| begin @socket = connect_addrinfo(ai, @uri.port, @connect_timeout) break rescue SystemCallError => e # Give up if no more available raise e if addrinfo.length == i+1 end end # Set TCP no delay by default @socket.setsockopt(::Socket::IPPROTO_TCP, ::Socket::TCP_NODELAY, 1) end |
#read(max_bytes, deadline = nil) ⇒ Object
1922 1923 1924 1925 1926 1927 1928 1929 1930 1931 1932 1933 1934 1935 1936 1937 1938 1939 1940 1941 1942 1943 1944 1945 1946 1947 |
# File 'lib/nats/io/client.rb', line 1922 def read(max_bytes, deadline=nil) begin return @socket.read_nonblock(max_bytes) rescue ::IO::WaitReadable if ::IO.select([@socket], nil, nil, deadline) retry else raise NATS::IO::SocketTimeoutError end rescue ::IO::WaitWritable if ::IO.select(nil, [@socket], nil, deadline) retry else raise NATS::IO::SocketTimeoutError end end rescue EOFError => e if RUBY_ENGINE == 'jruby' and e. == 'No message available' # FIXME: <EOFError: No message available> can happen in jruby # even though seems it is temporary and eventually possible # to read from socket. return nil end raise Errno::ECONNRESET end |
#read_line(deadline = nil) ⇒ Object
1914 1915 1916 1917 1918 1919 1920 |
# File 'lib/nats/io/client.rb', line 1914 def read_line(deadline=nil) # FIXME: Should accumulate and read in a non blocking way instead unless ::IO.select([@socket], nil, nil, deadline) raise NATS::IO::SocketTimeoutError end @socket.gets end |
#setup_tls! ⇒ Object
(Re-)connect using secure connection if server and client agreed on using it.
1899 1900 1901 1902 1903 1904 1905 1906 1907 1908 1909 1910 1911 1912 |
# File 'lib/nats/io/client.rb', line 1899 def setup_tls! # Setup TLS connection by rewrapping the socket tls_socket = OpenSSL::SSL::SSLSocket.new(@socket, @tls.fetch(:context)) # Close TCP socket after closing TLS socket as well. tls_socket.sync_close = true # Required to enable hostname verification if Ruby runtime supports it (>= 2.4): # https://github.com/ruby/openssl/commit/028e495734e9e6aa5dba1a2e130b08f66cf31a21 tls_socket.hostname = @tls[:hostname] tls_socket.connect @socket = tls_socket end |
#write(data, deadline = nil) ⇒ Object
1949 1950 1951 1952 1953 1954 1955 1956 1957 1958 1959 1960 1961 1962 1963 1964 1965 1966 1967 1968 1969 1970 1971 1972 1973 1974 1975 1976 1977 |
# File 'lib/nats/io/client.rb', line 1949 def write(data, deadline=nil) length = data.bytesize total_written = 0 loop do begin written = @socket.write_nonblock(data) total_written += written break total_written if total_written >= length data = data.byteslice(written..-1) rescue ::IO::WaitWritable if ::IO.select(nil, [@socket], nil, deadline) retry else raise NATS::IO::SocketTimeoutError end rescue ::IO::WaitReadable if ::IO.select([@socket], nil, nil, deadline) retry else raise NATS::IO::SocketTimeoutError end end end rescue EOFError raise Errno::ECONNRESET end |