Class: Riakpb::Client

Inherits:
Object
Includes:
Util::MessageCode, Util::Translation
Defined in:
lib/riakpb/client.rb,
lib/riakpb/client/rpc.rb

Overview

A client connection to Riakpb.

Defined Under Namespace

Classes: Rpc

Constant Summary

MAX_CLIENT_ID =

When using integer client IDs, the exclusive upper-bound of valid values.

4294967296
HOST_REGEX =

Regexp for validating hostnames

http://rubular.com/r/N2HOgxFkN3
/^([[:alnum:]]+(-*[[:alnum:]]+)*(\.{0,1}(([[:alnum:]]-*)*[[:alnum:]]+)+)*)+$/
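
The two constants above lend themselves to simple validation checks. The following is a minimal sketch, not the library's own validation code: `valid_client_id?` and `valid_host?` are hypothetical helper names, and the lower bound of 0 for client IDs is an assumption (the source only states the exclusive upper bound).

```ruby
# Values copied from the constant summary above.
MAX_CLIENT_ID = 4294967296
HOST_REGEX    = /^([[:alnum:]]+(-*[[:alnum:]]+)*(\.{0,1}(([[:alnum:]]-*)*[[:alnum:]]+)+)*)+$/

# Hypothetical helper: an integer client ID is accepted when it is
# below the exclusive upper bound (the lower bound of 0 is assumed).
def valid_client_id?(id)
  id.is_a?(Integer) && id >= 0 && id < MAX_CLIENT_ID
end

# Hypothetical helper: a host is accepted when it matches HOST_REGEX.
def valid_host?(host)
  host.is_a?(String) && HOST_REGEX.match?(host)
end
```

Because the bound is exclusive, `MAX_CLIENT_ID - 1` is the largest accepted ID and `MAX_CLIENT_ID` itself is rejected.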

Constants included from Util::MessageCode

Util::MessageCode::DEL_REQUEST, Util::MessageCode::DEL_RESPONSE, Util::MessageCode::ERROR_RESPONSE, Util::MessageCode::GET_BUCKET_REQUEST, Util::MessageCode::GET_BUCKET_RESPONSE, Util::MessageCode::GET_CLIENT_ID_REQUEST, Util::MessageCode::GET_CLIENT_ID_RESPONSE, Util::MessageCode::GET_REQUEST, Util::MessageCode::GET_RESPONSE, Util::MessageCode::GET_SERVER_INFO_REQUEST, Util::MessageCode::GET_SERVER_INFO_RESPONSE, Util::MessageCode::LIST_BUCKETS_REQUEST, Util::MessageCode::LIST_BUCKETS_RESPONSE, Util::MessageCode::LIST_KEYS_REQUEST, Util::MessageCode::LIST_KEYS_RESPONSE, Util::MessageCode::MAP_REDUCE_REQUEST, Util::MessageCode::MAP_REDUCE_RESPONSE, Util::MessageCode::MC_RESPONSE_FOR, Util::MessageCode::PING_REQUEST, Util::MessageCode::PING_RESPONSE, Util::MessageCode::PUT_REQUEST, Util::MessageCode::PUT_RESPONSE, Util::MessageCode::RESPONSE_CLASS_FOR, Util::MessageCode::SET_BUCKET_REQUEST, Util::MessageCode::SET_BUCKET_RESPONSE, Util::MessageCode::SET_CLIENT_ID_REQUEST, Util::MessageCode::SET_CLIENT_ID_RESPONSE

Instance Attribute Summary

Instance Method Summary

Methods included from Util::Translation

#i18n_scope, #t

Constructor Details

#initialize(options = {}) ⇒ Client

Creates a client connection to Riakpb’s Protobuf listener.



# File 'lib/riakpb/client.rb', line 29

def initialize(options={})
  self.host         = options[:host]      ||      "127.0.0.1"
  self.port         = options[:port]      ||      8087
  self.client_id    = options[:client_id] unless  options[:client_id].blank?

  read_quorum       = options[:r]         || options[:read_quorum]
  write_quorum      = options[:w]         || options[:write_quorum]
  replica_commit    = options[:dw]        || options[:replica_commit]
  # Note: because of the trailing `|| true`, a false :rb / :return_body falls through to true.
  return_body       = options[:rb]        || options[:return_body]    || true

  @options                = options.slice!(:host, :port, :client_id, :r, :read_quorum, :w, :write_quorum, :dw, :replica_commit, :rb, :return_body)
  @options[:r]            = read_quorum     unless read_quorum.blank?
  @options[:w]            = write_quorum    unless write_quorum.blank?
  @options[:dw]           = replica_commit  unless replica_commit.blank?
  @options[:return_body]  = return_body     unless return_body.blank?

  @buckets          = []
  # The default block receives the hash itself and the missing bucket name.
  @bucket_cache     = Hash.new{|cache, name| cache[name] = Riakpb::Bucket.new(self, name, @options)}
end
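
The constructor accepts both short and long names for the quorum settings (`:r` / `:read_quorum`, `:w` / `:write_quorum`, `:dw` / `:replica_commit`) and stores them under the short key. A minimal, self-contained sketch of that alias resolution follows; `QUORUM_ALIASES` and `normalize_quorum_options` are hypothetical names, and a plain `nil` check stands in for ActiveSupport's `blank?`.

```ruby
# Hypothetical mapping of short option keys to their long aliases,
# mirroring the pairs read by #initialize.
QUORUM_ALIASES = {
  :r  => :read_quorum,
  :w  => :write_quorum,
  :dw => :replica_commit
}.freeze

# Hypothetical helper: returns a hash keyed by the short names only.
# The short key takes precedence when both forms are supplied.
def normalize_quorum_options(options)
  QUORUM_ALIASES.each_with_object({}) do |(short, long), out|
    value = options[short] || options[long]
    out[short] = value unless value.nil?
  end
end
```

For example, `normalize_quorum_options(:read_quorum => 2, :w => 3)` yields a hash containing only `:r` and `:w`.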

Instance Attribute Details

#bucket_cache ⇒ Object (readonly)

Returns the value of attribute bucket_cache.



# File 'lib/riakpb/client.rb', line 21

def bucket_cache
  @bucket_cache
end

#buckets ⇒ Array (readonly)

Lists the buckets found in the Riakpb database.

Returns:

  • (Array)

    list of buckets (String)

Raises:

  • (ReturnRespError)

    if the message response does not correlate with the message requested



# File 'lib/riakpb/client.rb', line 242

def buckets
  @buckets
end

#client_id ⇒ Object

Returns the value of attribute client_id.



# File 'lib/riakpb/client.rb', line 24

def client_id
  @client_id
end

#host ⇒ Object

Returns the value of attribute host.



# File 'lib/riakpb/client.rb', line 18

def host
  @host
end

#node ⇒ Object (readonly)

Returns the value of attribute node.



# File 'lib/riakpb/client.rb', line 22

def node
  @node
end

#options ⇒ Object (readonly)

Returns the value of attribute options.



# File 'lib/riakpb/client.rb', line 25

def options
  @options
end

#port ⇒ Object

Returns the value of attribute port.



# File 'lib/riakpb/client.rb', line 19

def port
  @port
end

#server_version ⇒ Object (readonly)

Returns the value of attribute server_version.



# File 'lib/riakpb/client.rb', line 23

def server_version
  @server_version
end

Instance Method Details

#bucket(bucket) ⇒ Bucket Also known as: [], bring_me_bucket

I need bucket! Bring me bucket! (Retrieves a bucket from Riakpb. Eating disorder not included.)

Parameters:

  • bucket (String)

    the bucket to retrieve

Returns:

  • (Bucket)

    the requested bucket



# File 'lib/riakpb/client.rb', line 110

def bucket(bucket)
  return(@bucket_cache[bucket]) if @bucket_cache.has_key?(bucket)
  self.bucket!(bucket)
end

#bucket!(bucket) ⇒ Bucket Also known as: bring_me_bucket!

I need bucket! Bring me bucket! (Retrieves a bucket from Riakpb, even if it’s already been retrieved.)

Parameters:

  • bucket (String)

    the bucket to retrieve

Returns:

  • (Bucket)

    the requested bucket



# File 'lib/riakpb/client.rb', line 120

def bucket!(bucket)
  request       = Riakpb::RpbGetBucketReq.new(:bucket => bucket)
  response      = rpc.request(
                    Util::MessageCode::GET_BUCKET_REQUEST,
                    request
                  )
  @bucket_cache[bucket].load(response)
end
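
The difference between `#bucket` and `#bucket!` is that the former returns a cached entry when one exists, while the latter always goes back to the server. The sketch below illustrates that pattern in isolation. `BucketCacheSketch` is a hypothetical stand-in, not part of Riakpb: a counter replaces the real RPC round trip and a plain Hash replaces `Riakpb::Bucket`.

```ruby
# Hypothetical stand-in for the client's bucket caching behaviour.
class BucketCacheSketch
  attr_reader :fetch_count

  def initialize
    @fetch_count = 0
    # Missing names are created on first access, as in the real cache.
    @cache = Hash.new { |cache, name| cache[name] = { :name => name } }
  end

  # Returns the cached entry if present, otherwise fetches it.
  def bucket(name)
    return @cache[name] if @cache.has_key?(name)
    bucket!(name)
  end

  # Always performs a (simulated) fetch and refreshes the cached entry.
  def bucket!(name)
    @fetch_count += 1
    @cache[name][:loaded_at_fetch] = @fetch_count
    @cache[name]
  end
end
```

Calling `bucket("a")` twice performs one simulated fetch; a subsequent `bucket!("a")` performs a second.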

#del_request(bucket, key, rw = nil) ⇒ RpbGetResp

Deletes a key from a given bucket in Riakpb, using RpbDelReq.

Parameters:

  • bucket (String)

    the bucket from which to delete the key

  • key (String)

    the name of the key to be deleted

  • rw (Fixnum) (defaults to: nil)

    how many replicas to delete before returning a successful response

Returns:

  • (RpbGetResp)

    the response confirming deletion; true is returned instead when the response body is empty



# File 'lib/riakpb/client.rb', line 205

def del_request(bucket, key, rw=nil)
  request         = Riakpb::RpbDelReq.new
  request.bucket  = bucket
  request.key     = key
  request.rw    ||= rw

  response        = rpc.request(
                      Util::MessageCode::DEL_REQUEST,
                      request
                    )

  return(true)      if response == ""
  return(response)
end

#get_request(bucket, key, quorum = nil) ⇒ RpbGetResp Also known as: req, get

Retrieves a key from a given bucket in Riakpb, using RpbGetReq.

Parameters:

  • bucket (String)

    the bucket from which to retrieve the key

  • key (String)

    the name of the key to be retrieved

  • quorum (Fixnum) (defaults to: nil)

    read quorum: the number of replicas that must agree when retrieving the object

Returns:

  • (RpbGetResp)

    the response in which the given Key is stored



# File 'lib/riakpb/client.rb', line 158

def get_request(bucket, key, quorum=nil)
  request   = Riakpb::RpbGetReq.new({:bucket => bucket, :key => key})

  quorum  ||= @read_quorum
  unless quorum.blank?
    quorum    = quorum.to_i
    request.r = quorum
  end


  response  = rpc.request(
                Util::MessageCode::GET_REQUEST,
                request
              )

  return(response)
end
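
The quorum handling in `#get_request` falls back to a client-level default when no quorum is passed, skips blank values, and coerces anything else with `to_i`. A minimal sketch of that resolution, assuming a hypothetical helper name `effective_read_quorum` and a nil/empty-string check in place of ActiveSupport's `blank?`:

```ruby
# Hypothetical helper, not part of Riakpb: resolves the read quorum
# that would be set on the request, or nil if none should be set.
def effective_read_quorum(quorum, default = nil)
  quorum = default if quorum.nil?
  return nil if quorum.nil? || quorum.to_s.strip.empty?
  quorum.to_i
end
```

A `nil` result corresponds to leaving the `r` field of the request unset.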

#info ⇒ Hash

Retrieves basic information from the riak node.

Returns:

  • (Hash)

    Returns the name of the node and its software release number



# File 'lib/riakpb/client.rb', line 98

def info
  response        = rpc.request Riakpb::Util::MessageCode::GET_SERVER_INFO_REQUEST

  @node           = response.node
  @server_version = response.server_version

  {:node => @node, :server_version => @server_version}
end

#keys_in(bucket) ⇒ Hash

Lists the keys found in the given bucket of the Riakpb database.

Parameters:

  • bucket (String)

    the bucket from which to retrieve the list of keys

Returns:

  • (Hash)

    Mapping of the buckets (String) to their keys (Array of Strings)

Raises:

  • (ReturnRespError)

    if the message response does not correlate with the message requested



# File 'lib/riakpb/client.rb', line 253

def keys_in(bucket)
  list_keys_request = RpbListKeysReq.new(:bucket => bucket)

  response = rpc.request Util::MessageCode::LIST_KEYS_REQUEST, list_keys_request

  return(response.keys.each{|k| k})
end

#map_reduce_request(mr_request, content_type) ⇒ RpbMapRedResp Also known as: mapred, mr

Sends a MapReduce operation to riak, using RpbMapRedReq, and returns the Response/phases.

Parameters:

  • mr_request (String)

    map/reduce job, encoded/stringified

  • content_type (String)

    encoding for map/reduce job

Returns:

  • (RpbMapRedResp)

    the response, encoded in the same format that was sent



# File 'lib/riakpb/client.rb', line 224

def map_reduce_request(mr_request, content_type)
  request               = Riakpb::RpbMapRedReq.new
  request.request       = mr_request
  request.content_type  = content_type

  response              = rpc.request(
                            Util::MessageCode::MAP_REDUCE_REQUEST,
                            request
                          )

  return(response)
end
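
Since `mr_request` is an already-encoded string, the caller has to serialize the job before passing it in. The sketch below builds a JSON-encoded job using Riak's JSON MapReduce layout (`inputs` plus a `query` list of phases). `build_mapred_job` is a hypothetical helper, and the choice of the `Riak.mapValuesJson` built-in and the `application/json` content type are assumptions for illustration.

```ruby
require "json"

# Hypothetical helper: returns [mr_request, content_type] suitable for
# passing to #map_reduce_request. The job maps over every object in
# the given bucket and keeps the results of the map phase.
def build_mapred_job(bucket)
  job = {
    "inputs" => bucket,
    "query"  => [
      { "map" => { "language" => "javascript",
                   "name"     => "Riak.mapValuesJson",
                   "keep"     => true } }
    ]
  }
  [JSON.generate(job), "application/json"]
end

# Usage against a running node (not executed here):
#   mr_request, content_type = build_mapred_job("goog")
#   client.map_reduce_request(mr_request, content_type)
```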

#ping? ⇒ Boolean

Tests connectivity with the Riakpb host.

Returns:

  • (Boolean)

    true if the ping succeeded, false if the connection failed



# File 'lib/riakpb/client.rb', line 90

def ping?
  rpc.request Util::MessageCode::PING_REQUEST

  return rpc.status
end

#put_request(options) ⇒ RpbPutResp

Inserts a key into riak, using RpbPutReq.

Parameters:

  • options (Hash)

    a customizable set of options

Options Hash (options):

  • :w (Fixnum) — default: write quorum

    how many replicas to write to before returning a successful response.

  • :dw (Fixnum)

    how many replicas to commit to durable storage before returning a successful response.

  • :return_body (Boolean)

    whether to return the contents of the stored object.

Returns:

  • (RpbPutResp)

    the response confirming Key storage and (optionally) the Key’s updated/new data; true is returned instead when the response body is empty.

Raises:

  • (ArgumentError)

    if options[:bucket] is empty or options[:content] is nil


# File 'lib/riakpb/client.rb', line 183

def put_request(options)
  raise ArgumentError, t('invalid_bucket')  if options[:bucket].empty?
  raise ArgumentError, t('empty_content')   if options[:content].nil?
  options[:w]           ||= @write_quorum   unless @write_quorum.nil?
  options[:dw]          ||= @replica_commit unless @replica_commit.nil?
  options[:return_body]   = @return_body    unless options.has_key?(:return_body)

  request   = Riakpb::RpbPutReq.new(options.slice :bucket, :key, :vclock, :content, :w, :dw, :return_body)
  response  = rpc.request(
                Util::MessageCode::PUT_REQUEST,
                request
              )

  return(true)      if response == ""
  return(response)
end
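
Before building the request, `#put_request` validates the bucket and content, fills in client-level defaults, and keeps only the keys the request message understands. The following sketch reproduces that flow without the gem. `PUT_KEYS` and `build_put_options` are hypothetical names, the error messages are placeholders for the translated ones, and unlike the original it treats a nil bucket as empty rather than raising NoMethodError.

```ruby
# Keys forwarded to the request message, as listed in #put_request.
PUT_KEYS = [:bucket, :key, :vclock, :content, :w, :dw, :return_body].freeze

# Hypothetical helper: validates options, applies defaults, and drops
# any keys that are not part of PUT_KEYS. The input hash is not mutated.
def build_put_options(options, defaults = {})
  raise ArgumentError, "bucket must not be empty" if options[:bucket].to_s.empty?
  raise ArgumentError, "content must not be nil"  if options[:content].nil?

  merged = options.dup
  [:w, :dw].each do |key|
    merged[key] = defaults[key] if merged[key].nil? && !defaults[key].nil?
  end
  merged[:return_body] = defaults[:return_body] unless merged.key?(:return_body)

  merged.select { |key, _| PUT_KEYS.include?(key) }
end
```

An explicit `:return_body => false` in `options` is preserved, because the default is applied only when the key is absent.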

#rpc(options = {}) ⇒ Riakpb::Client::Rpc

Establishes a connection to the riak node and stores the Rpc instance.

Returns:

  • (Riakpb::Client::Rpc)

    the Rpc instance, created on the first call and reused afterwards



# File 'lib/riakpb/client.rb', line 83

def rpc(options={})
  options[:client_id] ||= @client_id if @client_id
  @rpc                ||= Rpc.new(self)
end

#set_bucket(bucket, props) ⇒ TrueClass, FalseClass

Set the properties for a given bucket, and then reload it.

Parameters:

  • bucket (String)

    the bucket name in which props will be set

  • props (RpbBucketProps, Hash)

    the properties to be set within the given bucket

Returns:

  • (TrueClass, FalseClass)

    whether or not the operation was successful



# File 'lib/riakpb/client.rb', line 134

def set_bucket(bucket, props)
  props = Riakpb::RpbBucketProps.new(props) if props.is_a?(Hash)

  raise TypeError.new t('invalid_props') unless props.is_a?(Riakpb::RpbBucketProps)

  begin
    request       = Riakpb::RpbSetBucketReq.new(:bucket => bucket, :props => props)
    response      = rpc.request(
                      Util::MessageCode::SET_BUCKET_REQUEST,
                      request
                    )
    self.bucket!(bucket)

    return(true)
  rescue FailedRequest
    return(false)
  end
end
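
The control flow of `#set_bucket` has three outcomes: a Hash is coerced into a props object, any other non-props argument raises TypeError before a request is made, and a failed request is converted into a `false` return value. The sketch below isolates that pattern. `BucketPropsSketch`, `FailedRequestSketch`, and `set_bucket_sketch` are hypothetical stand-ins for the gem's classes, and the block stands in for the SET_BUCKET_REQUEST round trip.

```ruby
# Hypothetical stand-ins; the real classes live in the riakpb gem.
BucketPropsSketch = Struct.new(:n_val, :allow_mult)
class FailedRequestSketch < StandardError; end

# Hypothetical helper mirroring the shape of #set_bucket.
def set_bucket_sketch(props)
  if props.is_a?(Hash)
    props = BucketPropsSketch.new(props[:n_val], props[:allow_mult])
  end

  # Raised outside the begin/rescue, so it propagates to the caller.
  unless props.is_a?(BucketPropsSketch)
    raise TypeError, "props must be a Hash or BucketPropsSketch"
  end

  begin
    yield props # simulated request
    true
  rescue FailedRequestSketch
    false
  end
end
```

Only the request failure is swallowed; a TypeError still reaches the caller, which matches the placement of the `raise` before the `begin` block in the listing above.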