Class: Riakpb::Client::Rpc
- Inherits:
-
Object
- Object
- Riakpb::Client::Rpc
- Includes:
- Util::Decode, Util::Encode, Util::MessageCode, Util::Translation
- Defined in:
- lib/riakpb/client/rpc.rb
Constant Summary collapse
- RECV_LIMIT =
1073741824
Constants included from Util::Decode
Util::Decode::PBMC, Util::Decode::PLEN, Util::Decode::POFF
Constants included from Util::MessageCode
Util::MessageCode::DEL_REQUEST, Util::MessageCode::DEL_RESPONSE, Util::MessageCode::ERROR_RESPONSE, Util::MessageCode::GET_BUCKET_REQUEST, Util::MessageCode::GET_BUCKET_RESPONSE, Util::MessageCode::GET_CLIENT_ID_REQUEST, Util::MessageCode::GET_CLIENT_ID_RESPONSE, Util::MessageCode::GET_REQUEST, Util::MessageCode::GET_RESPONSE, Util::MessageCode::GET_SERVER_INFO_REQUEST, Util::MessageCode::GET_SERVER_INFO_RESPONSE, Util::MessageCode::LIST_BUCKETS_REQUEST, Util::MessageCode::LIST_BUCKETS_RESPONSE, Util::MessageCode::LIST_KEYS_REQUEST, Util::MessageCode::LIST_KEYS_RESPONSE, Util::MessageCode::MAP_REDUCE_REQUEST, Util::MessageCode::MAP_REDUCE_RESPONSE, Util::MessageCode::MC_RESPONSE_FOR, Util::MessageCode::PING_REQUEST, Util::MessageCode::PING_RESPONSE, Util::MessageCode::PUT_REQUEST, Util::MessageCode::PUT_RESPONSE, Util::MessageCode::RESPONSE_CLASS_FOR, Util::MessageCode::SET_BUCKET_REQUEST, Util::MessageCode::SET_BUCKET_RESPONSE, Util::MessageCode::SET_CLIENT_ID_REQUEST, Util::MessageCode::SET_CLIENT_ID_RESPONSE
Instance Attribute Summary collapse
-
#req_message ⇒ Object
readonly
Returns the value of attribute req_message.
-
#resp_message ⇒ Object
readonly
Returns the value of attribute resp_message.
-
#resp_message_codes ⇒ Object
readonly
Returns the value of attribute resp_message_codes.
-
#response ⇒ Object
readonly
Returns the value of attribute response.
-
#socket ⇒ TCPSocket
readonly
The TCPSocket of the remote riak node.
-
#status ⇒ Object
readonly
Returns the value of attribute status.
Instance Method Summary collapse
-
#clear ⇒ Object
Clears the request / response data, in preparation for a new request.
-
#initialize(client, limit = RECV_LIMIT) ⇒ Rpc
constructor
Establishes a Client ID with the Riakpb node, for the life of the RPC connection.
-
#parse_response(value) ⇒ Protobuf::Message
Handles the response from the Riakpb node.
-
#request(mc, pb_msg = nil) ⇒ True/False
Sends the request to the riak node.
-
#set_client_id ⇒ True/False
Sets the Client ID for the TCPSocket session.
-
#with_socket {|TCPSocket| ... } ⇒ TCPSocket
Opens a TCPSocket connection with the riak host/node.
Methods included from Util::Decode
#decode_message, #message_remaining?
Methods included from Util::Encode
#assemble_request, #encode_message
Methods included from Util::Translation
Constructor Details
#initialize(client, limit = RECV_LIMIT) ⇒ Rpc
Establishes a Client ID with the Riakpb node, for the life of the RPC connection.
24 25 26 27 28 29 30 31 32 33 34 35 36 37 |
# File 'lib/riakpb/client/rpc.rb', line 24 def initialize(client, limit=RECV_LIMIT) @status = false @client = client @limit = limit @client_id = request(Util::MessageCode::GET_CLIENT_ID_REQUEST).client_id @set_client_id = Riakpb::RpbSetClientIdReq.new(:client_id => @client_id) # Request / Response Data @resp_message_codes = -1 @resp_message = [] @req_message_code = -1 @req_message = '' @response = '' end |
Instance Attribute Details
#req_message ⇒ Object (readonly)
Returns the value of attribute req_message.
19 20 21 |
# File 'lib/riakpb/client/rpc.rb', line 19 def @req_message end |
#resp_message ⇒ Object (readonly)
Returns the value of attribute resp_message.
19 20 21 |
# File 'lib/riakpb/client/rpc.rb', line 19 def @resp_message end |
#resp_message_codes ⇒ Object (readonly)
Returns the value of attribute resp_message_codes.
19 20 21 |
# File 'lib/riakpb/client/rpc.rb', line 19 def @resp_message_codes end |
#response ⇒ Object (readonly)
Returns the value of attribute response.
19 20 21 |
# File 'lib/riakpb/client/rpc.rb', line 19 def response @response end |
#socket ⇒ TCPSocket (readonly)
Returns The TCPSocket of the remote riak node.
60 61 62 |
# File 'lib/riakpb/client/rpc.rb', line 60 def socket @socket end |
#status ⇒ Object (readonly)
Returns the value of attribute status.
19 20 21 |
# File 'lib/riakpb/client/rpc.rb', line 19 def status @status end |
Instance Method Details
#clear ⇒ Object
Clears the request / response data, in preparation for a new request
40 41 42 43 44 45 46 47 48 |
# File 'lib/riakpb/client/rpc.rb', line 40 def clear @resp_message_codes = -1 @resp_message = [] @req_message_code = -1 @req_message = '' @response = '' @status = false @buffer = '' end |
#parse_response(value) ⇒ Protobuf::Message
Handles the response from the Riakpb node
111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 |
# File 'lib/riakpb/client/rpc.rb', line 111 def parse_response(value) @resp_message << value value = @buffer + value response_chunk, @resp_message_codes, @buffer = (value) @resp_message_codes.each do |resp_mc| if resp_mc.equal?(ERROR_RESPONSE) raise FailedRequest.new(MC_RESPONSE_FOR[@req_message_code], @resp_message_codes, response_chunk) end # The below should never really happen if resp_mc != MC_RESPONSE_FOR[@req_message_code] raise FailedExchange.new(MC_RESPONSE_FOR[@req_message_code], @resp_message_codes, response_chunk, "failed_request") end end if response_chunk.size > 0 @response.parse_from_string response_chunk end @status = true return(@response) end |
#request(mc, pb_msg = nil) ⇒ True/False
Sends the request to the riak node
83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 |
# File 'lib/riakpb/client/rpc.rb', line 83 def request(mc, pb_msg=nil) clear @req_message_code = mc @response = RESPONSE_CLASS_FOR[mc].new unless RESPONSE_CLASS_FOR[mc].nil? with_socket do |socket| begin begin @req_message = assemble_request mc, pb_msg.serialize_to_string rescue NoMethodError @req_message = assemble_request mc end socket.write(@req_message) self.parse_response socket.sysread(@limit) end while(false == (@response[:done] rescue true)) socket.flush end # with_socket return(@response) end |
#set_client_id ⇒ True/False
Sets the Client ID for the TCPSocket session
67 68 69 70 71 72 73 74 75 76 77 |
# File 'lib/riakpb/client/rpc.rb', line 67 def set_client_id @set_c_id_req ||= assemble_request( Util::MessageCode::SET_CLIENT_ID_REQUEST, @set_client_id.serialize_to_string) socket.write(@set_c_id_req) set_c_id_resp = socket.sysread(@limit) resp_code, resp_msg = (set_c_id_resp) return(resp_code == Util::MessageCode::SET_CLIENT_ID_RESPONSE) end |
#with_socket {|TCPSocket| ... } ⇒ TCPSocket
Opens a TCPSocket connection with the riak host/node
53 54 55 56 57 |
# File 'lib/riakpb/client/rpc.rb', line 53 def with_socket set_client_id unless @set_client_id.nil? yield(socket) end |