Class: Oxblood::RSocket

Inherits:
Object
Defined in:
lib/oxblood/rsocket.rb

Overview

Thin socket wrapper built for resilience. On any error, including timeout errors, the socket is closed and automatically recreated so that the connection is never left in an inconsistent state.
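The close-and-recreate behavior described above can be sketched roughly as follows. This is a simplified illustration, not Oxblood's implementation: `ResilientIO`, its `with_io` method, and the factory block are hypothetical names, and the handle is created lazily here, whereas RSocket creates its socket in the constructor.

```ruby
require 'stringio'

# Hypothetical wrapper (not part of Oxblood): drops the handle on any
# error and builds a fresh one on the next use.
class ResilientIO
  def initialize(&factory)
    @factory = factory
    @io = nil
  end

  def connected?
    !!@io
  end

  def close
    @io && @io.close
  rescue IOError
    nil
  ensure
    @io = nil
  end

  # Yields the current handle, creating one if needed. Any error closes
  # the handle before the error is re-raised to the caller.
  def with_io
    @io ||= @factory.call
    yield @io
  rescue StandardError
    close
    raise
  end
end

created = 0
wrapper = ResilientIO.new { created += 1; StringIO.new('hello') }

first = wrapper.with_io { |io| io.read(2) }
begin
  wrapper.with_io { |_io| raise IOError, 'simulated failure' }
rescue IOError
  # The error still reaches the caller; the handle is already gone.
end
after_failure = wrapper.connected?
second = wrapper.with_io { |io| io.read(2) }
```

After the simulated failure the wrapper reports no connection, and the next call reads from a freshly created handle rather than from a half-consumed one.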

Constant Summary

TimeoutError = Class.new(RuntimeError)


Constructor Details

#initialize(opts = {}) ⇒ RSocket

Creates the underlying socket and an empty binary read buffer.

Parameters:

  • opts (Hash) (defaults to: {})

    Connection options

Options Hash (opts):

  • :timeout (Float) — default: 1.0

    Socket read/write timeout, in seconds

  • :host (String) — default: 'localhost'

    Hostname or IP address to connect to

  • :port (Integer) — default: 6379

    Port Redis server listens on

  • :connect_timeout (Float) — default: 1.0

    Socket connect timeout

  • :path (String)

    UNIX socket path



# File 'lib/oxblood/rsocket.rb', line 36

def initialize(opts = {})
  @opts = opts
  @timeout = opts.fetch(:timeout, 1.0)
  @socket = create_socket(opts)
  @buffer = String.new.encode!('ASCII-8BIT')
end
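As a rough illustration of how the documented defaults could be resolved with `Hash#fetch`, consider the sketch below. `describe_target` is a hypothetical helper, not part of Oxblood, and the rule that `:path` takes precedence over `:host` and `:port` is an assumption; the page does not show how `create_socket` chooses between a TCP and a UNIX socket.

```ruby
# Hypothetical helper (not part of Oxblood): resolves connection options
# using the defaults listed in the options table above.
def describe_target(opts = {})
  common = {
    timeout: opts.fetch(:timeout, 1.0),
    connect_timeout: opts.fetch(:connect_timeout, 1.0)
  }

  if opts.key?(:path)
    # Assumption: a UNIX socket path wins over host/port when given.
    common.merge(kind: :unix, path: opts[:path])
  else
    common.merge(
      kind: :tcp,
      host: opts.fetch(:host, 'localhost'),
      port: opts.fetch(:port, 6379)
    )
  end
end

tcp_target = describe_target
unix_target = describe_target(path: '/tmp/redis.sock', timeout: 0.5)
```

Using `fetch` rather than `opts[:timeout] || 1.0` keeps an explicitly passed `nil` or `false` distinct from a missing key.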

Instance Attribute Details

#timeout ⇒ Numeric

Returns timeout in seconds.

Returns:

  • (Numeric)

    timeout in seconds



# File 'lib/oxblood/rsocket.rb', line 23

def timeout
  @timeout
end

Instance Method Details

#close ⇒ nil

Close connection to server

Returns:

  • (nil)

    always returns nil



# File 'lib/oxblood/rsocket.rb', line 88

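def close
  # Drop any buffered but unread data along with the connection.
  @buffer.clear
  @socket && @socket.close
rescue IOError
  # Ignore errors raised while closing; the socket is discarded either way.
ensure
  @socket = nil
end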

#connected? ⇒ Boolean

True if socket exists

Returns:

  • (Boolean)

    socket exists or not



# File 'lib/oxblood/rsocket.rb', line 99

def connected?
  !!@socket
end

#gets(separator, timeout = @timeout) ⇒ String

Read until separator

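Parameters:

  • separator (String)

    byte sequence that terminates the read

  • timeout (Numeric) (defaults to: @timeout)

    timeout in seconds, passed to each underlying socket read

Returns:

  • (String)

    data read, up to and including the separator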



# File 'lib/oxblood/rsocket.rb', line 59

def gets(separator, timeout = @timeout)
  while (crlf = @buffer.index(separator)).nil?
    @buffer << readpartial(1024, timeout)
  end

  @buffer.slice!(0, crlf + separator.bytesize)
end
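The buffering loop in `#gets` can be tried without a socket. The sketch below is a minimal stand-in: `LineBuffer` is a hypothetical class, and it pulls from an array of pre-split chunks where RSocket reads from its socket, so timeouts play no part here.

```ruby
# Hypothetical stand-in (not part of Oxblood) for the read-until-separator
# loop: chunks arrive in arbitrary pieces and are appended to a buffer
# until the separator shows up.
class LineBuffer
  def initialize(chunks)
    @chunks = chunks.dup
    @buffer = String.new(encoding: Encoding::BINARY)
  end

  def gets(separator)
    while (index = @buffer.index(separator)).nil?
      @buffer << next_chunk
    end

    # Return everything up to and including the separator; the rest
    # stays buffered for the next call.
    @buffer.slice!(0, index + separator.bytesize)
  end

  private

  def next_chunk
    chunk = @chunks.shift or raise EOFError, 'no more data'
    chunk.b
  end
end

lines = LineBuffer.new(["+OK\r", "\n$5\r\nhe", "llo\r\n"])
status = lines.gets("\r\n")
length_header = lines.gets("\r\n")
payload = lines.gets("\r\n")
```

A separator split across two chunks (`"\r"` then `"\n"`) is handled because the search always runs over the accumulated buffer, not over the latest chunk alone.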

#read(nbytes, timeout = @timeout) ⇒ String

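Read exactly nbytes bytes, taking buffered data first

Parameters:

  • nbytes (Integer)

    number of bytes to read

  • timeout (Numeric) (defaults to: @timeout)

    timeout in seconds, passed to each underlying socket read

Returns:

  • (String)

    exactly nbytes bytes of data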



# File 'lib/oxblood/rsocket.rb', line 46

def read(nbytes, timeout = @timeout)
  result = @buffer.slice!(0, nbytes)

  while result.bytesize < nbytes
    result << readpartial(nbytes - result.bytesize, timeout)
  end

  result
end
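The way `#read` drains the buffer before touching the socket can be illustrated with an in-memory source. In this sketch `read_exactly` is a hypothetical free function, and a `StringIO` stands in for the socket; RSocket's own `readpartial` is private and also takes a timeout, which is omitted here.

```ruby
require 'stringio'

# Hypothetical helper (not part of Oxblood): take what the buffer already
# holds, then keep reading from the source until nbytes are collected.
def read_exactly(buffer, source, nbytes)
  result = buffer.slice!(0, nbytes)

  while result.bytesize < nbytes
    # Never ask for more than is still missing, so nothing is over-read.
    result << source.readpartial(nbytes - result.bytesize)
  end

  result
end

buffer = "$5\r\nhe".b                    # leftovers from an earlier read
source = StringIO.new("llo\r\n+OK\r\n")  # data still "on the wire"

header = read_exactly(buffer, source, 4)
body = read_exactly(buffer, source, 7)
remaining = source.read
```

The second call needs seven bytes but finds only two in the buffer, so it fetches the missing five from the source and leaves the following reply untouched.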

#write(data, timeout = @timeout) ⇒ Integer

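Write all of data to the socket

Parameters:

  • data (String)

    data to write

  • timeout (Numeric) (defaults to: @timeout)

    timeout in seconds to wait for the socket to become writable

Returns:

  • (Integer)

    the number of bytes written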



# File 'lib/oxblood/rsocket.rb', line 70

def write(data, timeout = @timeout)
  full_size = data.bytesize

  while data.bytesize > 0
    written = socket.write_nonblock(data, exception: false)

    if written == :wait_writable
      socket.wait_writable(timeout) or fail_with_timeout!
    else
      data = data.byteslice(written..-1)
    end
  end

  full_size
end
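The same non-blocking write loop can be exercised against a pipe instead of a Redis connection. The sketch below makes several assumptions: `write_all` and `WriteTimeout` are hypothetical names, the error is raised directly where RSocket calls its private `fail_with_timeout!`, and the IO object is passed in explicitly.

```ruby
require 'io/wait'

# Hypothetical error class standing in for Oxblood::RSocket::TimeoutError.
WriteTimeout = Class.new(RuntimeError)

# Hypothetical helper (not part of Oxblood): writes all of data, waiting
# up to timeout seconds whenever the descriptor is not yet writable.
def write_all(io, data, timeout)
  full_size = data.bytesize

  while data.bytesize > 0
    written = io.write_nonblock(data, exception: false)

    if written == :wait_writable
      io.wait_writable(timeout) or raise WriteTimeout, 'write timed out'
    else
      # Partial write: keep only the bytes that were not accepted yet.
      data = data.byteslice(written..-1)
    end
  end

  full_size
end

reader, writer = IO.pipe
message = "PING\r\n" * 1000

consumer = Thread.new { reader.read(message.bytesize) }
sent = write_all(writer, message, 1.0)
received = consumer.value

writer.close
reader.close
```

Returning the original byte count rather than the last partial count matches the documented return value: the method only returns once every byte has been accepted.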