Class: Riak::Client
- Includes:
- Instrumentable, Util::Escape, Util::Translation
- Defined in:
- lib/riak/client.rb,
lib/riak/client/node.rb,
lib/riak/client/search.rb,
lib/riak/client/decaying.rb,
lib/riak/client/yokozuna.rb,
lib/riak/client/beefcake/socket.rb,
lib/riak/client/instrumentation.rb,
lib/riak/client/beefcake/messages.rb,
lib/riak/client/beefcake/protocol.rb,
lib/riak/client/feature_detection.rb,
lib/riak/client/protobuffs_backend.rb,
lib/riak/client/beefcake/crdt_loader.rb,
lib/riak/client/beefcake/crdt_operator.rb,
lib/riak/client/beefcake/message_codes.rb,
lib/riak/client/beefcake/object_methods.rb,
lib/riak/client/beefcake/message_overlay.rb,
lib/riak/client/beefcake_protobuffs_backend.rb
Overview
A client connection to Riak.
Defined Under Namespace
Modules: BeefcakeMessageCodes, FeatureDetection Classes: BeefcakeProtobuffsBackend, Decaying, Node, ProtobuffsBackend
Constant Summary collapse
- MAX_CLIENT_ID =
When using integer client IDs, the exclusive upper-bound of valid values.
4294967296
- HOST_REGEX =
Regexp for validating hostnames, lifted from uri.rb in Ruby 1.8.6
/^(?:(?:(?:[a-zA-Z\d](?:[-a-zA-Z\d]*[a-zA-Z\d])?)\.)*(?:[a-zA-Z](?:[-a-zA-Z\d]*[a-zA-Z\d])?)\.?|\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}|\[(?:(?:[a-fA-F\d]{1,4}:)*(?:[a-fA-F\d]{1,4}|\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})|(?:(?:[a-fA-F\d]{1,4}:)*[a-fA-F\d]{1,4})?::(?:(?:[a-fA-F\d]{1,4}:)*(?:[a-fA-F\d]{1,4}|\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}))?)\])$/n
- VALID_OPTIONS =
Valid constructor options.
[:nodes, :client_id, :protobuffs_backend, :authentication] | Node::VALID_OPTIONS
- NETWORK_ERRORS =
Network errors.
[ EOFError, Errno::ECONNABORTED, Errno::ECONNREFUSED, Errno::ECONNRESET, Errno::ENETDOWN, Errno::ENETRESET, Errno::ENETUNREACH, SocketError, SystemCallError, Riak::ProtobuffsFailedHeader, ]
- Pool =
::Innertube::Pool
Instance Attribute Summary collapse
-
#authentication ⇒ Hash
readonly
The authentication information this client will use.
-
#client_id ⇒ String
The internal client ID used by Riak to route responses.
-
#multiget_threads ⇒ Integer
The number of threads for multiget requests.
-
#nodes ⇒ Array
The set of Nodes this client can communicate with.
-
#protobuffs_backend ⇒ Symbol
The Protocol Buffers backend/client to use.
-
#protobuffs_pool ⇒ Client::Pool
readonly
A pool of protobuffs connections.
Instance Method Summary collapse
-
#bucket(name, options = {}) ⇒ Bucket
(also: #[])
Retrieves a bucket from Riak.
- #bucket_type(name) ⇒ Object
-
#buckets(options = {}, &block) ⇒ Array<Bucket>
(also: #list_buckets)
Lists buckets which have keys stored in them.
-
#choose_node(nodes = self.nodes) ⇒ Object
Choose a node from a set.
-
#clear_bucket_props(bucket, options = { }) ⇒ Object
Clears the properties on a bucket.
- #create_search_index(name, schema = nil, n_val = nil, timeout = nil) ⇒ Object
- #create_search_schema(name, content) ⇒ Object
-
#delete_object(bucket, key, options = {}) ⇒ Object
Delete an object.
- #delete_search_index(name) ⇒ Object
-
#get_bucket_props(bucket, options = { }) ⇒ Object
Bucket properties.
-
#get_index(bucket, index, query, options = {}) ⇒ Object
Queries a secondary index on a bucket.
-
#get_many(pairs) ⇒ Object
Get multiple objects in parallel.
-
#get_object(bucket, key, options = {}) ⇒ Object
Get an object.
-
#get_preflist(bucket, key, type = nil, options = { }) ⇒ Array<PreflistItem>
Retrieves a preflist for the given bucket, key, and type; useful for figuring out where in the cluster an object is stored.
- #get_search_index(name) ⇒ Object
- #get_search_schema(name) ⇒ Object
-
#initialize(options = {}) ⇒ Client
constructor
Creates a client connection to Riak.
-
#inspect ⇒ String
A representation suitable for IRB and debugging output.
-
#list_keys(bucket, options = {}, &block) ⇒ Object
Retrieves a list of keys in the given bucket.
- #list_search_indexes ⇒ Object
-
#mapred(mr, &block) ⇒ Object
Executes a mapreduce request.
-
#new_protobuffs_backend ⇒ ProtobuffsBackend
Creates a new protocol buffers backend.
-
#node ⇒ Node
An arbitrary Node.
-
#ping ⇒ true, false
Pings the Riak cluster to check for liveness.
-
#protobuffs(&block) ⇒ Object
(also: #backend)
Yields a protocol buffers backend.
-
#recover_from(pool) ⇒ Object
Takes a pool.
-
#reload_object(object, options = {}) ⇒ Object
Reloads the object from Riak.
-
#search(*args) ⇒ Hash
(also: #select)
(Riak Search) Performs a search via the Solr interface.
-
#security? ⇒ Boolean
Is security enabled?.
-
#set_bucket_props(bucket, properties, type = nil) ⇒ Object
Sets the properties on a bucket.
-
#stamp ⇒ Stamp
Exposes a Stamp object for use in generating unique identifiers.
-
#store_object(object, options = {}) ⇒ Object
Stores an object in Riak.
Methods included from Util::Escape
#escape, #maybe_escape, #maybe_unescape, #unescape
Methods included from Util::Translation
Constructor Details
#initialize(options = {}) ⇒ Client
Creates a client connection to Riak
87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 |
# File 'lib/riak/client.rb', line 87 def initialize( = {}) if .include? :port warn(t('deprecated.port', :backtrace => caller[0..2].join("\n "))) end unless (evil = .keys - VALID_OPTIONS).empty? raise ArgumentError, "#{evil.inspect} are not valid options for Client.new" end @nodes = ([:nodes] || []).map do |n| Client::Node.new self, n end if @nodes.empty? or [:host] or [:pb_port] @nodes |= [Client::Node.new(self, )] end @protobuffs_pool = Pool.new( method(:new_protobuffs_backend), lambda { |b| b.teardown } ) self.protobuffs_backend = [:protobuffs_backend] || :Beefcake self.client_id = [:client_id] if [:client_id] self.multiget_threads = [:multiget_threads] @authentication = [:authentication] && [:authentication].symbolize_keys end |
Instance Attribute Details
#authentication ⇒ Hash (readonly)
Returns The authentication information this client will use.
73 74 75 |
# File 'lib/riak/client.rb', line 73 def authentication @authentication end |
#client_id ⇒ String
Returns The internal client ID used by Riak to route responses.
61 62 63 |
# File 'lib/riak/client.rb', line 61 def client_id @client_id end |
#multiget_threads ⇒ Integer
Returns The number of threads for multiget requests.
70 71 72 |
# File 'lib/riak/client.rb', line 70 def multiget_threads @multiget_threads end |
#nodes ⇒ Array
Returns The set of Nodes this client can communicate with.
58 59 60 |
# File 'lib/riak/client.rb', line 58 def nodes @nodes end |
#protobuffs_backend ⇒ Symbol
Returns The Protocol Buffers backend/client to use.
64 65 66 |
# File 'lib/riak/client.rb', line 64 def protobuffs_backend @protobuffs_backend end |
#protobuffs_pool ⇒ Client::Pool (readonly)
Returns A pool of protobuffs connections.
67 68 69 |
# File 'lib/riak/client.rb', line 67 def protobuffs_pool @protobuffs_pool end |
Instance Method Details
#bucket(name, options = {}) ⇒ Bucket Also known as: []
Retrieves a bucket from Riak.
126 127 128 129 130 131 132 133 134 135 |
# File 'lib/riak/client.rb', line 126 def bucket(name, = {}) raise ArgumentError, t('zero_length_bucket') if name == '' unless (.keys - [:props]).empty? raise ArgumentError, "invalid options" end @bucket_cache ||= {} (@bucket_cache[name] ||= Bucket.new(self, name)).tap do |b| b.props if [:props] end end |
#bucket_type(name) ⇒ Object
138 139 140 |
# File 'lib/riak/client.rb', line 138 def bucket_type(name) BucketType.new self, name end |
#buckets(options = {}, &block) ⇒ Array<Bucket> Also known as: list_buckets
This is an expensive operation and should be used only in development.
Lists buckets which have keys stored in them.
146 147 148 149 150 151 152 153 154 |
# File 'lib/riak/client.rb', line 146 def buckets( = {}, &block) warn(t('list_buckets', :backtrace => caller.join("\n "))) unless Riak.disable_list_keys_warnings return ListBuckets.new self, , block if block_given? backend do |b| b.list_buckets().map {|name| Bucket.new(self, name) } end end |
#choose_node(nodes = self.nodes) ⇒ Object
Choose a node from a set.
158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 |
# File 'lib/riak/client.rb', line 158 def choose_node(nodes = self.nodes) # Prefer nodes which have gone a reasonable time without errors. s = nodes.select do |node| node.error_rate.value < 0.1 end if s.empty? # Fall back to minimally broken node. nodes.min_by do |node| node.error_rate.value end else s[rand(s.size)] end end |
#clear_bucket_props(bucket, options = { }) ⇒ Object
Clears the properties on a bucket. See Bucket#clear_props
393 394 395 396 397 |
# File 'lib/riak/client.rb', line 393 def clear_bucket_props(bucket, = { }) backend do |b| b.reset_bucket_props(bucket, ) end end |
#create_search_index(name, schema = nil, n_val = nil, timeout = nil) ⇒ Object
3 4 5 6 7 8 9 |
# File 'lib/riak/client/yokozuna.rb', line 3 def create_search_index(name, schema = nil, n_val = nil, timeout = nil) raise ArgumentError, t("zero_length_index") if name.nil? || name.empty? backend do |b| b.create_search_index(name, schema, n_val, timeout) end true end |
#create_search_schema(name, content) ⇒ Object
36 37 38 39 40 41 42 43 |
# File 'lib/riak/client/yokozuna.rb', line 36 def create_search_schema(name, content) raise ArgumentError, t("zero_length_schema") if name.nil? || name.empty? raise ArgumentError, t("zero_length_content") if content.nil? || content.empty? backend do |b| b.create_search_schema(name, content) end true end |
#delete_object(bucket, key, options = {}) ⇒ Object
Delete an object. See Bucket#delete
223 224 225 226 227 |
# File 'lib/riak/client.rb', line 223 def delete_object(bucket, key, = {}) backend do |b| b.delete_object(bucket, key, ) end end |
#delete_search_index(name) ⇒ Object
28 29 30 31 32 33 34 |
# File 'lib/riak/client/yokozuna.rb', line 28 def delete_search_index(name) raise ArgumentError, t("zero_length_index") if name.nil? || name.empty? backend do |b| b.delete_search_index(name) end true end |
#get_bucket_props(bucket, options = { }) ⇒ Object
Bucket properties. See Bucket#props
230 231 232 233 234 |
# File 'lib/riak/client.rb', line 230 def get_bucket_props(bucket, = { }) backend do |b| b.get_bucket_props bucket, end end |
#get_index(bucket, index, query, options = {}) ⇒ Object
Queries a secondary index on a bucket. See Bucket#get_index
237 238 239 240 241 |
# File 'lib/riak/client.rb', line 237 def get_index(bucket, index, query, = {}) backend do |b| b.get_index bucket, index, query, end end |
#get_many(pairs) ⇒ Object
Get multiple objects in parallel.
257 258 259 |
# File 'lib/riak/client.rb', line 257 def get_many(pairs) Multiget.get_all self, pairs end |
#get_object(bucket, key, options = {}) ⇒ Object
Get an object. See Bucket#get
262 263 264 265 266 267 |
# File 'lib/riak/client.rb', line 262 def get_object(bucket, key, = {}) raise ArgumentError, t("zero_length_key") if key == '' backend do |b| b.fetch_object(bucket, key, ) end end |
#get_preflist(bucket, key, type = nil, options = { }) ⇒ Array<PreflistItem>
Retrieves a preflist for the given bucket, key, and type; useful for figuring out where in the cluster an object is stored.
250 251 252 253 254 |
# File 'lib/riak/client.rb', line 250 def get_preflist(bucket, key, type = nil, = { }) backend do |b| b.get_preflist bucket, key, type, end end |
#get_search_index(name) ⇒ Object
11 12 13 14 15 16 17 18 |
# File 'lib/riak/client/yokozuna.rb', line 11 def get_search_index(name) raise ArgumentError, t("zero_length_index") if name.nil? || name.empty? resp = [] backend do |b| resp = b.get_search_index(name) end resp.index && Array === resp.index ? resp.index.first : resp end |
#get_search_schema(name) ⇒ Object
45 46 47 48 49 50 |
# File 'lib/riak/client/yokozuna.rb', line 45 def get_search_schema(name) raise ArgumentError, t("zero_length_schema") if name.nil? || name.empty? backend do |b| return b.get_search_schema(name) end end |
#inspect ⇒ String
Returns A representation suitable for IRB and debugging output.
270 271 272 |
# File 'lib/riak/client.rb', line 270 def inspect "#<Riak::Client #{nodes.inspect}>" end |
#list_keys(bucket, options = {}, &block) ⇒ Object
Retrieves a list of keys in the given bucket. See Bucket#keys
275 276 277 278 279 280 281 282 283 284 285 |
# File 'lib/riak/client.rb', line 275 def list_keys(bucket, = {}, &block) if block_given? backend do |b| b.list_keys bucket, , &block end else backend do |b| b.list_keys bucket, end end end |
#list_search_indexes ⇒ Object
20 21 22 23 24 25 26 |
# File 'lib/riak/client/yokozuna.rb', line 20 def list_search_indexes() resp = [] backend do |b| resp = b.get_search_index(nil) end resp.index ? resp.index : resp end |
#mapred(mr, &block) ⇒ Object
Executes a mapreduce request. See MapReduce#run
288 289 290 291 292 |
# File 'lib/riak/client.rb', line 288 def mapred(mr, &block) backend do |b| b.mapred(mr, &block) end end |
#new_protobuffs_backend ⇒ ProtobuffsBackend
Creates a new protocol buffers backend.
297 298 299 300 301 302 303 304 305 306 307 308 309 310 |
# File 'lib/riak/client.rb', line 297 def new_protobuffs_backend klass = self.class.const_get("#{@protobuffs_backend}ProtobuffsBackend") if klass.configured? node = choose_node( @nodes.select do |n| n.protobuffs? end ) klass.new(self, node) else raise BackendCreationError.new @protobuffs_backend end end |
#node ⇒ Node
Returns An arbitrary Node.
313 314 315 |
# File 'lib/riak/client.rb', line 313 def node nodes[rand nodes.size] end |
#ping ⇒ true, false
Pings the Riak cluster to check for liveness.
319 320 321 322 323 |
# File 'lib/riak/client.rb', line 319 def ping backend do |b| b.ping end end |
#protobuffs(&block) ⇒ Object Also known as: backend
Yields a protocol buffers backend.
326 327 328 |
# File 'lib/riak/client.rb', line 326 def protobuffs(&block) recover_from @protobuffs_pool, &block end |
#recover_from(pool) ⇒ Object
Takes a pool. Acquires a backend from the pool and yields it with node-specific error recovery.
341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 |
# File 'lib/riak/client.rb', line 341 def recover_from(pool) skip_nodes = [] take_opts = {} tries = 3 begin # Only select nodes which we haven't used before. unless skip_nodes.empty? take_opts[:filter] = lambda do |backend| not skip_nodes.include? backend.node end end # Acquire a backend pool.take(take_opts) do |backend| begin yield backend rescue *NETWORK_ERRORS => e # Network error. tries -= 1 # Notify the node that a request against it failed. backend.node.error_rate << 1 # Skip this node next time. skip_nodes << backend.node # And delete this connection. raise Pool::BadResource, e end end rescue Pool::BadResource => e retry if tries > 0 raise e. end end |
#reload_object(object, options = {}) ⇒ Object
Reloads the object from Riak.
379 380 381 382 383 |
# File 'lib/riak/client.rb', line 379 def reload_object(object, = {}) backend do |b| b.reload_object(object, ) end end |
#search(index, query, options = {}) ⇒ Hash #search(query, options = {}) ⇒ Hash Also known as: select
(Riak Search) Performs a search via the Solr interface.
18 19 20 21 22 23 24 |
# File 'lib/riak/client/search.rb', line 18 def search(*args) = args. index, query = args[-2], args[-1] # Allows nil index, while keeping it as firstargument backend do |b| b.search(index, query, ) end end |
#security? ⇒ Boolean
Is security enabled?
117 118 119 |
# File 'lib/riak/client.rb', line 117 def security? !!authentication end |
#set_bucket_props(bucket, properties, type = nil) ⇒ Object
Sets the properties on a bucket. See Bucket#props=
386 387 388 389 390 |
# File 'lib/riak/client.rb', line 386 def set_bucket_props(bucket, properties, type = nil) backend do |b| b.set_bucket_props(bucket, properties, type) end end |
#stamp ⇒ Stamp
Exposes a Stamp object for use in generating unique identifiers.
403 404 405 |
# File 'lib/riak/client.rb', line 403 def stamp @stamp ||= Riak::Stamp.new(self) end |
#store_object(object, options = {}) ⇒ Object
Stores an object in Riak.
409 410 411 412 413 414 |
# File 'lib/riak/client.rb', line 409 def store_object(object, = {}) params = {:returnbody => true}.merge() backend do |b| b.store_object(object, params) end end |