Class: Riak::Client::BeefcakeProtobuffsBackend::Protocol

Inherits:
Object
Includes:
Util::Translation
Defined in:
lib/riak/client/beefcake/protocol.rb

Methods included from Util::Translation

#i18n_scope, #t

Constructor Details

#initialize(socket, options = {}) ⇒ Protocol

Returns a new instance of Protocol.

Parameters:

  • socket (Socket)
  • options (Hash) (defaults to: {})

Options Hash (options):

  • :read_timeout (Numeric) — default: nil

    The read timeout, in seconds

  • :write_timeout (Numeric) — default: nil

    The write timeout, in seconds



# File 'lib/riak/client/beefcake/protocol.rb', line 31

def initialize(socket, options = {})
  @socket = socket
  @read_timeout = options[:read_timeout]
  @write_timeout = options[:write_timeout]
end

Instance Attribute Details

#read_timeout ⇒ Object (readonly)

Returns the value of attribute read_timeout.



# File 'lib/riak/client/beefcake/protocol.rb', line 25

def read_timeout
  @read_timeout
end

#socket ⇒ Object (readonly)

Returns the value of attribute socket.



# File 'lib/riak/client/beefcake/protocol.rb', line 25

def socket
  @socket
end

#write_timeout ⇒ Object (readonly)

Returns the value of attribute write_timeout.



# File 'lib/riak/client/beefcake/protocol.rb', line 25

def write_timeout
  @write_timeout
end

Instance Method Details

#expect(code, decoder_class = nil, options = { }) ⇒ Beefcake::Message, :empty

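Receives a Riak-formatted message and checks its symbolic name against the given code. If the name matches, the payload is decoded with decoder_class; when no decoder_class is given, the method returns true instead. With :empty_body_acceptable set, an empty payload yields :empty rather than a decode attempt.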

Parameters:

  • code (Symbol)

    the code for the message

  • decoder_class (Class, nil) (defaults to: nil)

    the class to attempt to decode the payload with

  • options (Hash) (defaults to: { })

Options Hash (options):

  • :empty_body_acceptable (Boolean)

    Whether to accept an empty body and not attempt decoding. In this case, this method will return the symbol `:empty` instead of a `Beefcake::Message` instance.

Returns:

  • (Beefcake::Message, :empty)

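Raises:

  • (ProtobuffsErrorResponse)

    if the server replies with an :ErrorResp message

  • (ProtobuffsUnexpectedResponse)

    if the name of the received message does not match code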



# File 'lib/riak/client/beefcake/protocol.rb', line 115

def expect(code, decoder_class = nil, options = { })
  code = BeefcakeMessageCodes[code] unless code.is_a? Symbol
  name, body = receive

  if name == :ErrorResp
    raise ProtobuffsErrorResponse.new RpbErrorResp.decode(body)
  end

  if name != code
    raise ProtobuffsUnexpectedResponse.new name, code
  end

  return true if decoder_class.nil?

  return :empty if body.nil? && options[:empty_body_acceptable]

  return decoder_class.decode body
end
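The three possible return values (true, :empty, or a decoded message) are easier to see in isolation. The following is a minimal sketch of the same decision order, not the library's code: `interpret`, `UpcaseDecoder`, and the two `Sketch...` error classes are hypothetical stand-ins, and the name and body are passed in directly instead of being read from a socket.

```ruby
# Hypothetical error classes standing in for the library's exceptions.
class SketchErrorResponse < StandardError; end
class SketchUnexpectedResponse < StandardError; end

# Hypothetical decoder exposing the class-level `decode` that expect relies on.
class UpcaseDecoder
  def self.decode(body)
    body.upcase
  end
end

# Mirrors the order of checks in expect: server error first, then name
# mismatch, then the three success outcomes.
def interpret(name, body, expected, decoder_class = nil, options = {})
  raise SketchErrorResponse, body.to_s if name == :ErrorResp
  raise SketchUnexpectedResponse, "expected #{expected}, got #{name}" if name != expected

  return true if decoder_class.nil?
  return :empty if body.nil? && options[:empty_body_acceptable]

  decoder_class.decode(body)
end

no_decoder = interpret(:PingResp, nil, :PingResp)                 # true
empty_ok   = interpret(:GetResp, nil, :GetResp, UpcaseDecoder,
                       empty_body_acceptable: true)               # :empty
decoded    = interpret(:GetResp, 'abc', :GetResp, UpcaseDecoder)  # "ABC"
```

Because the nil check on decoder_class comes first, :empty_body_acceptable only matters when a decoder class is supplied.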

#receive ⇒ Array<Symbol, String>

Receives a Riak-formatted message, and returns the symbolic name of the message along with the string payload from the network.

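Returns:

  • (Array<Symbol, String>)

    the symbolic message name and the raw payload; the payload is nil when the message has no body

Raises:

  • (ProtobuffsFailedHeader)

    if no header could be read from the socket

  • (Errno::ETIMEDOUT)

    if read_timeout is set and the socket does not become readable in time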



# File 'lib/riak/client/beefcake/protocol.rb', line 81

def receive
  if read_timeout && !IO.select([socket], nil, nil, read_timeout)
    raise Errno::ETIMEDOUT, 'read timeout'
  end
  header = socket.read 5

  raise ProtobuffsFailedHeader.new if header.nil?
  message_length, code = header.unpack 'NC'
  body_length = message_length - 1
  body = nil
  body = socket.read body_length if body_length > 0

  name = BeefcakeMessageCodes[code]

  return name, body
end
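The framing that receive parses is a 4-byte big-endian length, a 1-byte message code, and then the payload, where the length counts the code byte plus the payload. Below is a minimal sketch of that parsing against an in-memory StringIO rather than a socket. `read_frame` and `SKETCH_CODES` are hypothetical names, and the two-entry table is an illustrative stand-in for BeefcakeMessageCodes. Unlike the method above, the sketch also rejects a header shorter than 5 bytes.

```ruby
require 'stringio'

# Illustrative stand-in for the library's code table (assumption, two entries only).
SKETCH_CODES = { 1 => :PingReq, 2 => :PingResp }.freeze

# Reads one frame from io and returns [name, body]; body is nil when the
# frame carries only the code byte.
def read_frame(io)
  header = io.read(5)
  raise EOFError, 'missing or truncated header' if header.nil? || header.bytesize < 5

  # 'N' = 32-bit unsigned big-endian length, 'C' = 8-bit unsigned code
  message_length, code = header.unpack('NC')
  body_length = message_length - 1
  body = body_length > 0 ? io.read(body_length) : nil

  [SKETCH_CODES[code], body]
end

# A frame whose length is 1 holds just the code byte, so there is no body.
name, body = read_frame(StringIO.new([1, 2].pack('NC')))
```

Here `name` is `:PingResp` and `body` is `nil`, which is the case that :empty_body_acceptable in expect is meant to cover.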

#write(code, message = nil) ⇒ Object

Encodes and writes a Riak-formatted message, including protocol buffer payload if given.

Parameters:

  • code (Symbol, Integer)

    the symbolic or numeric code for the message

  • message (Beefcake::Message, nil) (defaults to: nil)

    the protocol buffer message payload, or nil if the message carries no payload



# File 'lib/riak/client/beefcake/protocol.rb', line 44

def write(code, message = nil)
  if code.is_a? Symbol
    code = BeefcakeMessageCodes.index code
  end

  serialized = serialize message

  header = [serialized.length + 1, code].pack 'NC'

  payload = header + serialized

  if write_timeout
    begin
      loop do
        bytes_written = socket.write_nonblock(payload)
        # write_nonblock doesn't guarantee to write all data at once,
        # so check if there are bytes left to be written
        break if bytes_written >= payload.bytesize
        payload.slice!(0, bytes_written)
      end
    rescue IO::WaitWritable, Errno::EINTR
      # wait with the retry until socket is writable again
      unless IO.select(nil, [socket], nil, write_timeout)
        raise Errno::ETIMEDOUT, 'write timeout'
      end
      retry
    end
  else
    socket.write(payload)
  end
  socket.flush
end
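The two halves of write, building the frame and pushing it out with a bounded wait, can be tried without a Riak node. The following is a minimal sketch under stated assumptions: `build_frame` and `write_all` are hypothetical helpers, the payload is an already serialized string, and an IO.pipe stands in for the socket. It uses byte-based slicing (`byteslice`) and a fresh binary copy of the data, which is a design choice of the sketch and not a description of the library.

```ruby
# Builds a frame: the length prefix counts the code byte plus the payload bytes.
def build_frame(code, serialized = '')
  [serialized.bytesize + 1, code].pack('NC') + serialized.b
end

# Writes all of data to io. Each time the io is not writable, waits up to
# `timeout` seconds for it to become writable before retrying.
def write_all(io, data, timeout)
  remaining = data.b # binary copy, so the caller's string is left untouched
  until remaining.empty?
    begin
      written = io.write_nonblock(remaining)
      # write_nonblock may write only part of the data; keep the rest
      remaining = remaining.byteslice(written..-1)
    rescue IO::WaitWritable, Errno::EINTR
      raise Errno::ETIMEDOUT, 'write timeout' unless IO.select(nil, [io], nil, timeout)
      retry
    end
  end
end

reader, writer = IO.pipe
write_all(writer, build_frame(1), 1) # code 1, no payload, 1 second timeout
writer.close
received = reader.read
reader.close
```

A frame with no payload is exactly 5 bytes: four length bytes encoding 1, followed by the code byte.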