Class: Riak::Client::BeefcakeProtobuffsBackend::Protocol

Inherits:
Object
Includes:
Util::Translation
Defined in:
lib/riak/client/beefcake/protocol.rb

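Instance Attribute Summary

  • #read_timeout ⇒ Object (readonly)
  • #socket ⇒ Object (readonly)
  • #write_timeout ⇒ Object (readonly)

Instance Method Summary

  • #expect(code, decoder_class = nil, options = { }) ⇒ Beefcake::Message, :empty
  • #initialize(socket, options = {}) ⇒ Protocol (constructor)
  • #receive ⇒ Array<Symbol, String>
  • #write(code, message = nil) ⇒ Object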

Methods included from Util::Translation

#i18n_scope, #t

Constructor Details

#initialize(socket, options = {}) ⇒ Protocol

Returns a new instance of Protocol.

Parameters:

  • socket (Socket)
  • options (Hash) (defaults to: {})

Options Hash (options):

  • :read_timeout (Numeric) — default: nil

    The read timeout, in seconds

  • :write_timeout (Numeric) — default: nil

    The write timeout, in seconds



# File 'lib/riak/client/beefcake/protocol.rb', line 17

def initialize(socket, options = {})
  @socket = socket
  @read_timeout = options[:read_timeout]
  @write_timeout = options[:write_timeout]
end

Instance Attribute Details

#read_timeout ⇒ Object (readonly)

Returns the value of attribute read_timeout.



# File 'lib/riak/client/beefcake/protocol.rb', line 11

def read_timeout
  @read_timeout
end

#socket ⇒ Object (readonly)

Returns the value of attribute socket.



# File 'lib/riak/client/beefcake/protocol.rb', line 11

def socket
  @socket
end

#write_timeout ⇒ Object (readonly)

Returns the value of attribute write_timeout.



# File 'lib/riak/client/beefcake/protocol.rb', line 11

def write_timeout
  @write_timeout
end

Instance Method Details

#expect(code, decoder_class = nil, options = { }) ⇒ Beefcake::Message, :empty

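Receives a Riak-formatted message and checks its symbolic name against the given code. If no decoder class is given, returns true on a match; otherwise decodes the payload with the decoder class. When the `:empty_body_acceptable` option is set and the body is empty, returns `:empty` instead of decoding.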

Parameters:

  • code (Symbol)

    the code for the message

  • decoder_class (Class, nil) (defaults to: nil)

    the class to attempt to decode the payload with

  • options (Hash) (defaults to: { })

Options Hash (options):

  • :empty_body_acceptable (Boolean)

    Whether to accept an empty body and not attempt decoding. In this case, this method will return the symbol `:empty` instead of a `Beefcake::Message` instance.

Returns:

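  • (Beefcake::Message, :empty, true)

    the decoded message, `:empty` for an accepted empty body, or true when no decoder class is given

Raises:

  • (ProtobuffsErrorResponse)

    if the received message is an ErrorResp

  • (ProtobuffsUnexpectedResponse)

    if the name of the received message differs from the expected code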



# File 'lib/riak/client/beefcake/protocol.rb', line 101

def expect(code, decoder_class = nil, options = { })
  code = BeefcakeMessageCodes[code] unless code.is_a? Symbol
  name, body = receive

  if name == :ErrorResp
    raise ProtobuffsErrorResponse.new RpbErrorResp.decode(body)
  end

  if name != code
    raise ProtobuffsUnexpectedResponse.new name, code
  end

  return true if decoder_class.nil?

  return :empty if body.nil? && options[:empty_body_acceptable]

  return decoder_class.decode body
end
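
The decision order in the listing above can be sketched without a socket. This is a minimal illustration, not the library's code: `check_response`, `SketchErrorResponse`, `SketchUnexpectedResponse`, and `UpcaseDecoder` are hypothetical names introduced here, and the already-received name and body are passed in directly instead of being read from the network.

```ruby
# Hypothetical stand-ins for the library's error classes.
class SketchErrorResponse < StandardError; end
class SketchUnexpectedResponse < StandardError; end

# Hypothetical decoder: anything that responds to .decode(body) will do.
class UpcaseDecoder
  def self.decode(body)
    body.upcase
  end
end

# Mirrors the order of checks in #expect, given a received name and body.
def check_response(name, body, expected, decoder_class = nil, options = {})
  # An error response always wins, whatever was expected.
  raise SketchErrorResponse, body.to_s if name == :ErrorResp
  # Any other mismatch is an unexpected response.
  raise SketchUnexpectedResponse, "got #{name}, expected #{expected}" if name != expected
  # Without a decoder, a matching name is all that is checked.
  return true if decoder_class.nil?
  # An empty body is only tolerated when the caller opted in.
  return :empty if body.nil? && options[:empty_body_acceptable]
  decoder_class.decode(body)
end

p check_response(:PingResp, nil, :PingResp)
p check_response(:GetResp, nil, :GetResp, UpcaseDecoder, { empty_body_acceptable: true })
```

The name check happens before the empty-body check, so `:empty` is only ever returned for a message of the expected type.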

#receive ⇒ Array<Symbol, String>

Receives a Riak-formatted message, and returns the symbolic name of the message along with the string payload from the network.

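Returns:

  • (Array<Symbol, String>)

    the symbolic message name and the payload, which is nil when the message has no body

Raises:

  • (Errno::ETIMEDOUT)

    if a read timeout is set and the socket does not become readable in time

  • (ProtobuffsFailedHeader)

    if no header could be read from the socket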



# File 'lib/riak/client/beefcake/protocol.rb', line 67

def receive
  if read_timeout && !IO.select([socket], nil, nil, read_timeout)
    raise Errno::ETIMEDOUT, 'read timeout'
  end
  header = socket.read 5

  raise ProtobuffsFailedHeader.new if header.nil?
  message_length, code = header.unpack 'NC'
  body_length = message_length - 1
  body = nil
  body = socket.read body_length if body_length > 0

  name = BeefcakeMessageCodes[code]

  return name, body
end
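
The framing that the listing above parses is a 4-byte big-endian length, a 1-byte message code, and then the body, where the length counts the code byte plus the body. A minimal round-trip sketch follows, using `StringIO` in place of a socket. `CODES`, `build_frame`, and `read_frame` are hypothetical names for illustration; the small code table stands in for `BeefcakeMessageCodes`, and `EOFError` stands in for `ProtobuffsFailedHeader`.

```ruby
require 'stringio'

# Illustrative subset of a code table; a stand-in for BeefcakeMessageCodes.
CODES = { 0 => :ErrorResp, 2 => :PingResp }.freeze

# Builds a frame: length (code byte + body bytes), code, then the body.
def build_frame(code, serialized = '')
  [serialized.bytesize + 1, code].pack('NC') + serialized.b
end

# Reads one frame from any IO-like object and returns [name, body].
def read_frame(io)
  header = io.read(5)
  # Stand-in for the library's ProtobuffsFailedHeader.
  raise EOFError, 'no header' if header.nil?
  message_length, code = header.unpack('NC')
  body_length = message_length - 1
  # A message with only a code byte has no body at all.
  body = body_length > 0 ? io.read(body_length) : nil
  [CODES[code], body]
end

p read_frame(StringIO.new(build_frame(2)))
p read_frame(StringIO.new(build_frame(0, 'bad')))
```

Because the length field includes the code byte, a bodiless message carries a length of 1, which is why the body length is computed as the message length minus one.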

#write(code, message = nil) ⇒ Object

Encodes and writes a Riak-formatted message, including protocol buffer payload if given.

Parameters:

  • code (Symbol, Integer)

    the symbolic or numeric code for the message

  • message (Beefcake::Message, nil) (defaults to: nil)

    the protocol buffer message payload, or nil if the message carries no payload



# File 'lib/riak/client/beefcake/protocol.rb', line 30

def write(code, message = nil)
  if code.is_a? Symbol
    code = BeefcakeMessageCodes.index code
  end

  serialized = serialize message

  header = [serialized.length + 1, code].pack 'NC'

  payload = header + serialized

  if write_timeout
    begin
      loop do
        bytes_written = socket.write_nonblock(payload)
        # write_nonblock doesn't guarantee to write all data at once,
        # so check if there are bytes left to be written
        break if bytes_written >= payload.bytesize
        payload.slice!(0, bytes_written)
      end
    rescue IO::WaitWritable, Errno::EINTR
      # wait with the retry until socket is writable again
      unless IO.select(nil, [socket], nil, write_timeout)
        raise Errno::ETIMEDOUT, 'write timeout'
      end
      retry
    end
  else
    socket.write(payload)
  end
  socket.flush
end
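
The timed write path above can be tried in isolation. The sketch below is one possible arrangement of the same idea, not the library's implementation: `write_all` is a hypothetical helper, it works on a copy of the payload instead of slicing it in place, and an `IO.pipe` stands in for the socket.

```ruby
# Writes the whole payload, waiting up to `timeout` seconds each time the
# descriptor is not ready to accept more data.
def write_all(io, payload, timeout)
  remaining = payload.b
  until remaining.empty?
    begin
      # write_nonblock may accept only part of the data.
      written = io.write_nonblock(remaining)
      remaining = remaining.byteslice(written..-1)
    rescue IO::WaitWritable, Errno::EINTR
      # Block until writable again, or give up after the timeout.
      unless IO.select(nil, [io], nil, timeout)
        raise Errno::ETIMEDOUT, 'write timeout'
      end
      retry
    end
  end
  io.flush
end

reader, writer = IO.pipe
write_all(writer, 'hello', 1)
writer.close
puts reader.read
```

Note that the timeout applies to each wait for writability, not to the write as a whole, so a slow peer that keeps accepting small chunks can take longer than the timeout in total. The same holds for the listing above.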