Class: Riak::Client::BeefcakeProtobuffsBackend::Protocol
- Includes:
- Util::Translation
- Defined in:
- lib/riak/client/beefcake/protocol.rb
Instance Attribute Summary collapse
-
#read_timeout ⇒ Object
readonly
Returns the value of attribute read_timeout.
-
#socket ⇒ Object
readonly
Returns the value of attribute socket.
-
#write_timeout ⇒ Object
readonly
Returns the value of attribute write_timeout.
Instance Method Summary collapse
-
#expect(code, decoder_class = nil, options = { }) ⇒ Beefcake::Message, :empty
Receives a Riak-formatted message, checks the symbolic name against the given code, decodes it if it matches, and can optionally return success if the payload is empty.
-
#initialize(socket, options = {}) ⇒ Protocol
constructor
A new instance of Protocol.
-
#receive ⇒ Array<Symbol, String>
Receives a Riak-formatted message, and returns the symbolic name of the message along with the string payload from the network.
-
#write(code, message = nil) ⇒ Object
Encodes and writes a Riak-formatted message, including protocol buffer payload if given.
Methods included from Util::Translation
Constructor Details
#initialize(socket, options = {}) ⇒ Protocol
Returns a new instance of Protocol.
17 18 19 20 21 |
# File 'lib/riak/client/beefcake/protocol.rb', line 17 def initialize(socket, = {}) @socket = socket @read_timeout = [:read_timeout] @write_timeout = [:write_timeout] end |
Instance Attribute Details
#read_timeout ⇒ Object (readonly)
Returns the value of attribute read_timeout.
11 12 13 |
# File 'lib/riak/client/beefcake/protocol.rb', line 11 def read_timeout @read_timeout end |
#socket ⇒ Object (readonly)
Returns the value of attribute socket.
11 12 13 |
# File 'lib/riak/client/beefcake/protocol.rb', line 11 def socket @socket end |
#write_timeout ⇒ Object (readonly)
Returns the value of attribute write_timeout.
11 12 13 |
# File 'lib/riak/client/beefcake/protocol.rb', line 11 def write_timeout @write_timeout end |
Instance Method Details
#expect(code, decoder_class = nil, options = { }) ⇒ Beefcake::Message, :empty
Receives a Riak-formatted message, checks the symbolic name against the given code, decodes it if it matches, and can optionally return success if the payload is empty.
101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 |
# File 'lib/riak/client/beefcake/protocol.rb', line 101 def expect(code, decoder_class = nil, = { }) code = BeefcakeMessageCodes[code] unless code.is_a? Symbol name, body = receive if name == :ErrorResp raise ProtobuffsErrorResponse.new RpbErrorResp.decode(body) end if name != code raise ProtobuffsUnexpectedResponse.new name, code end return true if decoder_class.nil? return :empty if body.nil? && [:empty_body_acceptable] return decoder_class.decode body end |
#receive ⇒ Array<Symbol, String>
Receives a Riak-formatted message, and returns the symbolic name of the message along with the string payload from the network.
67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 |
# File 'lib/riak/client/beefcake/protocol.rb', line 67 def receive if read_timeout && !IO.select([socket], nil, nil, read_timeout) raise Errno::ETIMEDOUT, 'read timeout' end header = socket.read 5 raise ProtobuffsFailedHeader.new if header.nil? , code = header.unpack 'NC' body_length = - 1 body = nil body = socket.read body_length if body_length > 0 name = BeefcakeMessageCodes[code] return name, body end |
#write(code, message = nil) ⇒ Object
Encodes and writes a Riak-formatted message, including protocol buffer payload if given.
30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 |
# File 'lib/riak/client/beefcake/protocol.rb', line 30 def write(code, = nil) if code.is_a? Symbol code = BeefcakeMessageCodes.index code end serialized = serialize header = [serialized.length + 1, code].pack 'NC' payload = header + serialized if write_timeout begin loop do bytes_written = socket.write_nonblock(payload) # write_nonblock doesn't guarantee to write all data at once, # so check if there are bytes left to be written break if bytes_written >= payload.bytesize payload.slice!(0, bytes_written) end rescue IO::WaitWritable, Errno::EINTR # wait with the retry until socket is writable again unless IO.select(nil, [socket], nil, write_timeout) raise Errno::ETIMEDOUT, 'write timeout' end retry end else socket.write(payload) end socket.flush end |