Class: Riakpb::Client::Rpc

Inherits:
Object
  • Object
show all
Includes:
Util::Decode, Util::Encode, Util::MessageCode, Util::Translation
Defined in:
lib/riakpb/client/rpc.rb

Constant Summary collapse

RECV_LIMIT =
1073741824

Constants included from Util::Decode

Util::Decode::PBMC, Util::Decode::PLEN, Util::Decode::POFF

Constants included from Util::MessageCode

Util::MessageCode::DEL_REQUEST, Util::MessageCode::DEL_RESPONSE, Util::MessageCode::ERROR_RESPONSE, Util::MessageCode::GET_BUCKET_REQUEST, Util::MessageCode::GET_BUCKET_RESPONSE, Util::MessageCode::GET_CLIENT_ID_REQUEST, Util::MessageCode::GET_CLIENT_ID_RESPONSE, Util::MessageCode::GET_REQUEST, Util::MessageCode::GET_RESPONSE, Util::MessageCode::GET_SERVER_INFO_REQUEST, Util::MessageCode::GET_SERVER_INFO_RESPONSE, Util::MessageCode::LIST_BUCKETS_REQUEST, Util::MessageCode::LIST_BUCKETS_RESPONSE, Util::MessageCode::LIST_KEYS_REQUEST, Util::MessageCode::LIST_KEYS_RESPONSE, Util::MessageCode::MAP_REDUCE_REQUEST, Util::MessageCode::MAP_REDUCE_RESPONSE, Util::MessageCode::MC_RESPONSE_FOR, Util::MessageCode::PING_REQUEST, Util::MessageCode::PING_RESPONSE, Util::MessageCode::PUT_REQUEST, Util::MessageCode::PUT_RESPONSE, Util::MessageCode::RESPONSE_CLASS_FOR, Util::MessageCode::SET_BUCKET_REQUEST, Util::MessageCode::SET_BUCKET_RESPONSE, Util::MessageCode::SET_CLIENT_ID_REQUEST, Util::MessageCode::SET_CLIENT_ID_RESPONSE

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods included from Util::Decode

#decode_message, #message_remaining?

Methods included from Util::Encode

#assemble_request, #encode_message

Methods included from Util::Translation

#i18n_scope, #t

Constructor Details

#initialize(client, limit = RECV_LIMIT) ⇒ Rpc

Establishes a Client ID with the Riakpb node, for the life of the RPC connection.

Parameters:

  • client (Client)

    the Riakpb::Client object in which this Rpc instance lives

  • limit (Fixnum) (defaults to: RECV_LIMIT)

    the max size of an individual TCPSocket receive call. Need to fix, later.



24
25
26
27
28
29
30
31
32
33
34
35
36
37
# File 'lib/riakpb/client/rpc.rb', line 24

def initialize(client, limit=RECV_LIMIT)
  @status             = false
  @client             = client
  @limit              = limit
  @client_id          = request(Util::MessageCode::GET_CLIENT_ID_REQUEST).client_id
  @set_client_id      = Riakpb::RpbSetClientIdReq.new(:client_id => @client_id)

  # Request / Response Data
  @resp_message_codes = -1
  @resp_message       = []
  @req_message_code   = -1
  @req_message        = ''
  @response           = ''
end

Instance Attribute Details

#req_messageObject (readonly)

Returns the value of attribute req_message.



19
20
21
# File 'lib/riakpb/client/rpc.rb', line 19

def req_message
  @req_message
end

#resp_messageObject (readonly)

Returns the value of attribute resp_message.



19
20
21
# File 'lib/riakpb/client/rpc.rb', line 19

def resp_message
  @resp_message
end

#resp_message_codesObject (readonly)

Returns the value of attribute resp_message_codes.



19
20
21
# File 'lib/riakpb/client/rpc.rb', line 19

def resp_message_codes
  @resp_message_codes
end

#responseObject (readonly)

Returns the value of attribute response.



19
20
21
# File 'lib/riakpb/client/rpc.rb', line 19

def response
  @response
end

#socketTCPSocket (readonly)

Returns The TCPSocket of the remote riak node.

Returns:

  • (TCPSocket)

    The TCPSocket of the remote riak node



60
61
62
# File 'lib/riakpb/client/rpc.rb', line 60

def socket
  @socket
end

#statusObject (readonly)

Returns the value of attribute status.



19
20
21
# File 'lib/riakpb/client/rpc.rb', line 19

def status
  @status
end

Instance Method Details

#clearObject

Clears the request / response data, in preparation for a new request



40
41
42
43
44
45
46
47
48
# File 'lib/riakpb/client/rpc.rb', line 40

def clear
  @resp_message_codes = -1
  @resp_message       = []
  @req_message_code   = -1
  @req_message        = ''
  @response           = ''
  @status             = false
  @buffer             = ''
end

#parse_response(value) ⇒ Protobuf::Message

Handles the response from the Riakpb node

Parameters:

  • value (String)

    The message returned from the Riakpb node over the TCP Socket

Returns:

  • (Protobuf::Message)

    @response the processed response (if any) from the Riakpb node



111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
# File 'lib/riakpb/client/rpc.rb', line 111

def parse_response(value)
  @resp_message << value

  value = @buffer + value

  response_chunk, @resp_message_codes, @buffer = decode_message(value)

  @resp_message_codes.each do |resp_mc|
    if resp_mc.equal?(ERROR_RESPONSE)
      raise FailedRequest.new(MC_RESPONSE_FOR[@req_message_code], @resp_message_codes, response_chunk)
    end

    # The below should never really happen
    if resp_mc != MC_RESPONSE_FOR[@req_message_code]
      raise FailedExchange.new(MC_RESPONSE_FOR[@req_message_code], @resp_message_codes, response_chunk, "failed_request")
    end
  end

  if response_chunk.size > 0
    @response.parse_from_string response_chunk
  end

  @status = true
  return(@response)
end

#request(mc, pb_msg = nil) ⇒ True/False

Sends the request to the riak node

Parameters:

  • mc (Fixnum)

    The message code that identifies the request

  • pb_msg (Protobuf::Message) (defaults to: nil)

    The protobuf message, if applicable, for the message code

Returns:

  • (True/False)

    whether or not the set client id request succeeded



83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
# File 'lib/riakpb/client/rpc.rb', line 83

def request(mc, pb_msg=nil)
  clear

  @req_message_code = mc
  @response         = RESPONSE_CLASS_FOR[mc].new unless RESPONSE_CLASS_FOR[mc].nil?

  with_socket do |socket|
    begin
      begin
        @req_message  = assemble_request mc, pb_msg.serialize_to_string
      rescue NoMethodError
        @req_message  = assemble_request mc
      end

      socket.write(@req_message)
      self.parse_response socket.sysread(@limit)

    end while(false == (@response[:done] rescue true))

    socket.flush
  end # with_socket

  return(@response)
end

#set_client_idTrue/False

Sets the Client ID for the TCPSocket session

Parameters:

  • socket (TCPSocket)

    connection for which the Client ID will be set

Returns:

  • (True/False)

    whether or not the set client id request succeeded



67
68
69
70
71
72
73
74
75
76
77
# File 'lib/riakpb/client/rpc.rb', line 67

def set_client_id
  @set_c_id_req     ||= assemble_request( Util::MessageCode::SET_CLIENT_ID_REQUEST,
                                          @set_client_id.serialize_to_string)

  socket.write(@set_c_id_req)
  set_c_id_resp = socket.sysread(@limit)

  resp_code, resp_msg = decode_message(set_c_id_resp)

  return(resp_code == Util::MessageCode::SET_CLIENT_ID_RESPONSE)
end

#with_socket {|TCPSocket| ... } ⇒ TCPSocket

Opens a TCPSocket connection with the riak host/node

Yields:

  • (TCPSocket)

    hands off the socket connection

Returns:

  • (TCPSocket)

    data that was exchanged with the host/node



53
54
55
56
57
# File 'lib/riakpb/client/rpc.rb', line 53

def with_socket
  set_client_id unless @set_client_id.nil?

  yield(socket)
end