Class: Riak::Client::BeefcakeProtobuffsBackend::Protocol
- Includes: Util::Translation
- Defined in: lib/riak/client/beefcake/protocol.rb
Instance Attribute Summary
- #read_timeout ⇒ Object (readonly)
  Returns the value of attribute read_timeout.
- #socket ⇒ Object (readonly)
  Returns the value of attribute socket.
- #write_timeout ⇒ Object (readonly)
  Returns the value of attribute write_timeout.
Instance Method Summary
- #expect(code, decoder_class = nil, options = { }) ⇒ Beefcake::Message, :empty
  Receives a Riak-formatted message, checks the symbolic name against the given code, decodes it if it matches, and can optionally return success if the payload is empty.
- #initialize(socket, options = {}) ⇒ Protocol (constructor)
  A new instance of Protocol.
- #receive ⇒ Array<Symbol, String>
  Receives a Riak-formatted message, and returns the symbolic name of the message along with the string payload from the network.
- #write(code, message = nil) ⇒ Object
  Encodes and writes a Riak-formatted message, including protocol buffer payload if given.
Methods included from Util::Translation
Constructor Details
#initialize(socket, options = {}) ⇒ Protocol
Returns a new instance of Protocol.
# File 'lib/riak/client/beefcake/protocol.rb', line 31
def initialize(socket, options = {})
  @socket = socket
  @read_timeout = options[:read_timeout]
  @write_timeout = options[:write_timeout]
end
Instance Attribute Details
#read_timeout ⇒ Object (readonly)
Returns the value of attribute read_timeout.
# File 'lib/riak/client/beefcake/protocol.rb', line 25
def read_timeout
  @read_timeout
end
#socket ⇒ Object (readonly)
Returns the value of attribute socket.
# File 'lib/riak/client/beefcake/protocol.rb', line 25
def socket
  @socket
end
#write_timeout ⇒ Object (readonly)
Returns the value of attribute write_timeout.
# File 'lib/riak/client/beefcake/protocol.rb', line 25
def write_timeout
  @write_timeout
end
Instance Method Details
#expect(code, decoder_class = nil, options = { }) ⇒ Beefcake::Message, :empty
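Receives a Riak-formatted message and checks its symbolic name against the given code. Raises ProtobuffsErrorResponse if the server replied with :ErrorResp, and ProtobuffsUnexpectedResponse if the name does not match the expected code. On a match, returns true when no decoder_class is given, :empty when the payload is empty and options[:empty_body_acceptable] is set, and the decoded message otherwise.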
# File 'lib/riak/client/beefcake/protocol.rb', line 115
def expect(code, decoder_class = nil, options = { })
  code = BeefcakeMessageCodes[code] unless code.is_a? Symbol
  name, body = receive

  if name == :ErrorResp
    raise ProtobuffsErrorResponse.new RpbErrorResp.decode(body)
  end

  if name != code
    raise ProtobuffsUnexpectedResponse.new name, code
  end

  return true if decoder_class.nil?
  return :empty if body.nil? && options[:empty_body_acceptable]

  return decoder_class.decode body
end
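The order of checks in #expect determines which outcome a caller sees. The following is a minimal sketch of that decision order, applied to an already-received name and body. The method `classify`, the two error classes, and the lambda decoder are hypothetical stand-ins for illustration and are not part of the library; the message names other than :ErrorResp are illustrative.

```ruby
# Hypothetical stand-ins for the library's error classes.
class ErrorResponse < StandardError; end
class UnexpectedResponse < StandardError; end

# Hypothetical helper mirroring the decision order of #expect.
# `decoder` is any object responding to #call; the real method uses
# a decoder class with a .decode method instead.
def classify(name, body, expected, decoder = nil, empty_body_acceptable: false)
  # A server-side error takes precedence over every other check.
  raise ErrorResponse, body.to_s if name == :ErrorResp
  # Any other message than the expected one is a protocol violation.
  raise UnexpectedResponse, "got #{name}, wanted #{expected}" if name != expected

  # Without a decoder, a matching name is all the caller asked for.
  return true if decoder.nil?
  # An empty payload is only a success when the caller opted in.
  return :empty if body.nil? && empty_body_acceptable

  decoder.call(body)
end

upcase = ->(bytes) { bytes.upcase }

results = [
  classify(:PingResp, nil, :PingResp),
  classify(:GetResp, nil, :GetResp, upcase, empty_body_acceptable: true),
  classify(:GetResp, 'value', :GetResp, upcase)
]
```

Note that when a decoder is given but the body is empty and the caller did not opt in, the sketch (like the listing above) passes nil to the decoder.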
#receive ⇒ Array<Symbol, String>
Receives a Riak-formatted message, and returns the symbolic name of the message along with the string payload from the network.
# File 'lib/riak/client/beefcake/protocol.rb', line 81
def receive
  if read_timeout && !IO.select([socket], nil, nil, read_timeout)
    raise Errno::ETIMEDOUT, 'read timeout'
  end

  header = socket.read 5

  raise ProtobuffsFailedHeader.new if header.nil?
  message_length, code = header.unpack 'NC'
  body_length = message_length - 1
  body = nil
  body = socket.read body_length if body_length > 0

  name = BeefcakeMessageCodes[code]

  return name, body
end
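The framing that #receive reads is a 5-byte header (a 32-bit big-endian length followed by a 1-byte message code) and then the payload. Because the length counts the code byte, the payload is one byte shorter than the length field. A minimal sketch of the same steps using only the standard library follows. `read_frame` is a hypothetical helper, not part of the library; it returns the numeric code rather than a symbolic name and raises EOFError where the library raises ProtobuffsFailedHeader. The code value 10 is arbitrary.

```ruby
# Hypothetical helper, not part of riak-client: reads one length-prefixed frame.
def read_frame(io, read_timeout = nil)
  # IO.select returns nil when nothing becomes readable within the timeout.
  if read_timeout && !IO.select([io], nil, nil, read_timeout)
    raise Errno::ETIMEDOUT, 'read timeout'
  end

  header = io.read(5)
  # Assumption: a short header is treated the same as a missing one.
  raise EOFError, 'incomplete header' if header.nil? || header.bytesize < 5

  # 'N' = 32-bit big-endian unsigned length, 'C' = 8-bit unsigned message code.
  message_length, code = header.unpack('NC')

  # The length includes the code byte, so subtract it to get the body size.
  body_length = message_length - 1
  body = body_length > 0 ? io.read(body_length) : nil

  [code, body]
end

# A pipe stands in for the network socket.
reader, writer = IO.pipe
reader.binmode
writer.binmode
writer.write([6, 10].pack('NC') + 'hello') # length 6 = 1 code byte + 5 body bytes
writer.flush

frame = read_frame(reader, 1)
```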
#write(code, message = nil) ⇒ Object
Encodes and writes a Riak-formatted message, including protocol buffer payload if given.
# File 'lib/riak/client/beefcake/protocol.rb', line 44
def write(code, message = nil)
  if code.is_a? Symbol
    code = BeefcakeMessageCodes.index code
  end

  serialized = serialize message

  header = [serialized.length + 1, code].pack 'NC'

  payload = header + serialized

  if write_timeout
    begin
      loop do
        bytes_written = socket.write_nonblock(payload)
        # write_nonblock doesn't guarantee to write all data at once,
        # so check if there are bytes left to be written
        break if bytes_written >= payload.bytesize
        payload.slice!(0, bytes_written)
      end
    rescue IO::WaitWritable, Errno::EINTR
      # wait with the retry until socket is writable again
      unless IO.select(nil, [socket], nil, write_timeout)
        raise Errno::ETIMEDOUT, 'write timeout'
      end
      retry
    end
  else
    socket.write(payload)
  end
  socket.flush
end
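The two halves of #write, building the frame and pushing it out with a bounded wait, can be sketched separately with the standard library. `encode_frame` and `write_with_timeout` are hypothetical helpers for illustration, not library methods. The sketch takes an already-serialized byte string where the library serializes a protocol buffer message, and it uses byte-based slicing as an assumption about how partial writes should be trimmed.

```ruby
# Hypothetical helper: prefixes a serialized body with the 5-byte header.
def encode_frame(code, serialized = '')
  body = serialized.b # binary copy, so it concatenates cleanly with the header
  # The length field counts the code byte plus the body bytes.
  [body.bytesize + 1, code].pack('NC') + body
end

# Hypothetical helper: writes the whole payload, waiting at most
# write_timeout seconds each time the descriptor is not writable.
def write_with_timeout(io, payload, write_timeout)
  remaining = payload.dup
  until remaining.empty?
    begin
      # write_nonblock may accept only part of the data.
      written = io.write_nonblock(remaining)
      remaining = remaining.byteslice(written..-1)
    rescue IO::WaitWritable, Errno::EINTR
      # Block until writable again, or give up after the timeout.
      unless IO.select(nil, [io], nil, write_timeout)
        raise Errno::ETIMEDOUT, 'write timeout'
      end
      retry
    end
  end
end

# A pipe stands in for the network socket.
reader, writer = IO.pipe
reader.binmode
writer.binmode

empty_frame = encode_frame(1)          # header only, no body
data_frame  = encode_frame(10, 'hello')

write_with_timeout(writer, data_frame, 1)
writer.flush
received = reader.read(data_frame.bytesize)
```

A frame with no payload still carries a length of 1, because the code byte is always counted.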