Class: Riak::Client
- Includes:
- Instrumentable, Util::Escape, Util::Translation
- Defined in:
- lib/riak/client.rb,
lib/riak/client/node.rb,
lib/riak/client/search.rb,
lib/riak/client/decaying.rb,
lib/riak/client/yokozuna.rb,
lib/riak/client/beefcake/socket.rb,
lib/riak/client/instrumentation.rb,
lib/riak/client/beefcake/messages.rb,
lib/riak/client/beefcake/protocol.rb,
lib/riak/client/feature_detection.rb,
lib/riak/client/protobuffs_backend.rb,
lib/riak/client/beefcake/crdt_loader.rb,
lib/riak/client/beefcake/crdt_operator.rb,
lib/riak/client/beefcake/message_codes.rb,
lib/riak/client/beefcake/object_methods.rb,
lib/riak/client/beefcake/message_overlay.rb,
lib/riak/client/beefcake_protobuffs_backend.rb
Overview
Copyright 2010-present Basho Technologies, Inc.
Licensed under the Apache License, Version 2.0 (the “License”); you may not use this file except in compliance with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an “AS IS” BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.
Defined Under Namespace
Modules: BeefcakeMessageCodes, FeatureDetection Classes: BeefcakeProtobuffsBackend, Decaying, Node, ProtobuffsBackend
Constant Summary collapse
- MAX_CLIENT_ID =
When using integer client IDs, the exclusive upper-bound of valid values.
4294967296
- HOST_REGEX =
Regexp for validating hostnames, lifted from uri.rb in Ruby 1.8.6
/^(?:(?:(?:[a-zA-Z\d](?:[-a-zA-Z\d]*[a-zA-Z\d])?)\.)*(?:[a-zA-Z](?:[-a-zA-Z\d]*[a-zA-Z\d])?)\.?|\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}|\[(?:(?:[a-fA-F\d]{1,4}:)*(?:[a-fA-F\d]{1,4}|\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})|(?:(?:[a-fA-F\d]{1,4}:)*[a-fA-F\d]{1,4})?::(?:(?:[a-fA-F\d]{1,4}:)*(?:[a-fA-F\d]{1,4}|\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}))?)\])$/n
- VALID_OPTIONS =
Valid constructor options.
[:nodes, :client_id, :protobuffs_backend, :authentication, :max_retries, :connect_timeout, :read_timeout, :write_timeout, :convert_timestamp] | Node::VALID_OPTIONS
- NETWORK_ERRORS =
Network errors.
[ EOFError, Errno::ECONNABORTED, Errno::ECONNREFUSED, Errno::ECONNRESET, Errno::ENETDOWN, Errno::ENETRESET, Errno::ENETUNREACH, Errno::ETIMEDOUT, SocketError, SystemCallError, Riak::ProtobuffsFailedHeader, ]
- Pool =
::Innertube::Pool
Instance Attribute Summary collapse
-
#authentication ⇒ Hash
readonly
The authentication information this client will use.
-
#client_id ⇒ String
The internal client ID used by Riak to route responses.
-
#connect_timeout ⇒ Numeric
readonly
The connect timeout, in seconds.
-
#convert_timestamp ⇒ Boolean
readonly
Whether to convert timestamps from Riak TS to Time objects.
-
#max_retries ⇒ Integer
The maximum number of retries in case of NETWORK_ERRORS.
-
#multi_threads ⇒ Integer
(also: #multiget_threads)
The number of threads for multiget requests.
-
#nodes ⇒ Array
The set of Nodes this client can communicate with.
-
#protobuffs_backend ⇒ Symbol
The Protocol Buffers backend/client to use.
-
#protobuffs_pool ⇒ Client::Pool
readonly
A pool of protobuffs connections.
-
#read_timeout ⇒ Numeric
readonly
The read timeout, in seconds.
-
#write_timeout ⇒ Numeric
readonly
The write timeout, in seconds.
Instance Method Summary collapse
-
#bucket(name, options = {}) ⇒ Bucket
(also: #[])
Retrieves a bucket from Riak.
- #bucket_type(name) ⇒ Object
-
#buckets(options = {}, &block) ⇒ Array<Bucket>
(also: #list_buckets)
Lists buckets which have keys stored in them.
-
#choose_node(nodes = self.nodes) ⇒ Object
Choose a node from a set.
-
#clear_bucket_props(bucket, options = { }) ⇒ Object
Clears the properties on a bucket.
- #create_search_index(name, schema = nil, n_val = nil, timeout = nil) ⇒ Object
- #create_search_schema(name, content) ⇒ Object
-
#delete_object(bucket, key, options = {}) ⇒ Object
Delete an object.
- #delete_search_index(name) ⇒ Object
-
#get_bucket_props(bucket, options = { }) ⇒ Object
Bucket properties.
-
#get_index(bucket, index, query, options = {}) ⇒ Object
Queries a secondary index on a bucket.
-
#get_many(pairs) ⇒ Object
Get multiple objects in parallel.
-
#get_object(bucket, key, options = {}) ⇒ Object
Get an object.
-
#get_preflist(bucket, key, type = nil, options = { }) ⇒ Array<PreflistItem>
Retrieves a preflist for the given bucket, key, and type; useful for figuring out where in the cluster an object is stored.
- #get_search_index(name) ⇒ Object
- #get_search_schema(name) ⇒ Object
-
#initialize(options = {}) ⇒ Client
constructor
Creates a client connection to Riak.
-
#inspect ⇒ String
A representation suitable for IRB and debugging output.
-
#list_keys(bucket, options = {}, &block) ⇒ Object
Retrieves a list of keys in the given bucket.
- #list_search_indexes ⇒ Object
-
#mapred(mr, &block) ⇒ Object
Executes a mapreduce request.
-
#new_protobuffs_backend ⇒ ProtobuffsBackend
Creates a new protocol buffers backend.
-
#node ⇒ Node
An arbitrary Node.
-
#ping ⇒ true, false
Pings the Riak cluster to check for liveness.
-
#protobuffs(&block) ⇒ Object
(also: #backend)
Yields a protocol buffers backend.
-
#recover_from(pool) ⇒ Object
Takes a pool, acquires a backend from it, and yields that backend with node-specific error recovery.
-
#reload_object(object, options = {}) ⇒ Object
Reloads the object from Riak.
-
#search(*args) ⇒ Hash
(also: #select)
(Riak Search) Performs a search via the Solr interface.
-
#security? ⇒ Boolean
Is security enabled?
-
#set_bucket_props(bucket, properties, type = nil) ⇒ Object
Sets the properties on a bucket.
-
#stamp ⇒ Stamp
Exposes a Stamp object for use in generating unique identifiers.
-
#store_object(object, options = {}) ⇒ Object
Stores an object in Riak.
Methods included from Util::Escape
#escape, #maybe_escape, #maybe_unescape, #unescape
Methods included from Util::Translation
Constructor Details
#initialize(options = {}) ⇒ Client
Creates a client connection to Riak
127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 |
# File 'lib/riak/client.rb', line 127 def initialize(options = {}) if options.include? :port warn(t('deprecated.port', :backtrace => caller[0..2].join("\n "))) end unless (evil = options.keys - VALID_OPTIONS).empty? raise ArgumentError, "#{evil.inspect} are not valid options for Client.new" end @nodes = build_nodes(options) @protobuffs_pool = Pool.new( method(:new_protobuffs_backend), lambda { |b| b.teardown } ) self.protobuffs_backend = options[:protobuffs_backend] || :Beefcake self.client_id = options[:client_id] if options[:client_id] self.multi_threads = options[:multi_threads] || options[:multiget_threads] @authentication = options[:authentication] && options[:authentication].symbolize_keys self.max_retries = options[:max_retries] || 2 @connect_timeout = options[:connect_timeout] @read_timeout = options[:read_timeout] @write_timeout = options[:write_timeout] @convert_timestamp = options[:convert_timestamp] || false end |
Instance Attribute Details
#authentication ⇒ Hash (readonly)
Returns The authentication information this client will use.
94 95 96 |
# File 'lib/riak/client.rb', line 94 def authentication @authentication end |
#client_id ⇒ String
Returns The internal client ID used by Riak to route responses.
79 80 81 |
# File 'lib/riak/client.rb', line 79 def client_id @client_id end |
#connect_timeout ⇒ Numeric (readonly)
Returns The connect timeout, in seconds.
100 101 102 |
# File 'lib/riak/client.rb', line 100 def connect_timeout @connect_timeout end |
#convert_timestamp ⇒ Boolean (readonly)
Returns whether timestamps from Riak TS are converted to Time objects.
109 110 111 |
# File 'lib/riak/client.rb', line 109 def convert_timestamp @convert_timestamp end |
#max_retries ⇒ Integer
Returns The maximum number of retries in case of NETWORK_ERRORS.
97 98 99 |
# File 'lib/riak/client.rb', line 97 def max_retries @max_retries end |
#multi_threads ⇒ Integer Also known as: multiget_threads
Returns The number of threads for multiget requests.
88 89 90 |
# File 'lib/riak/client.rb', line 88 def multi_threads @multi_threads end |
#nodes ⇒ Array
Returns The set of Nodes this client can communicate with.
76 77 78 |
# File 'lib/riak/client.rb', line 76 def nodes @nodes end |
#protobuffs_backend ⇒ Symbol
Returns The Protocol Buffers backend/client to use.
82 83 84 |
# File 'lib/riak/client.rb', line 82 def protobuffs_backend @protobuffs_backend end |
#protobuffs_pool ⇒ Client::Pool (readonly)
Returns A pool of protobuffs connections.
85 86 87 |
# File 'lib/riak/client.rb', line 85 def protobuffs_pool @protobuffs_pool end |
#read_timeout ⇒ Numeric (readonly)
Returns The read timeout, in seconds.
103 104 105 |
# File 'lib/riak/client.rb', line 103 def read_timeout @read_timeout end |
#write_timeout ⇒ Numeric (readonly)
Returns The write timeout, in seconds.
106 107 108 |
# File 'lib/riak/client.rb', line 106 def write_timeout @write_timeout end |
Instance Method Details
#bucket(name, options = {}) ⇒ Bucket Also known as: []
Retrieves a bucket from Riak.
165 166 167 168 169 170 171 172 173 174 |
# File 'lib/riak/client.rb', line 165 def bucket(name, options = {}) raise ArgumentError, t('zero_length_bucket') if name == '' unless (options.keys - [:props]).empty? raise ArgumentError, "invalid options" end @bucket_cache ||= {} (@bucket_cache[name] ||= Bucket.new(self, name)).tap do |b| b.props if options[:props] end end |
#bucket_type(name) ⇒ Object
177 178 179 |
# File 'lib/riak/client.rb', line 177 def bucket_type(name) BucketType.new self, name end |
#buckets(options = {}, &block) ⇒ Array<Bucket> Also known as: list_buckets
This is an expensive operation and should be used only in development.
Lists buckets which have keys stored in them.
185 186 187 188 189 190 191 192 193 194 195 196 |
# File 'lib/riak/client.rb', line 185 def buckets(options = {}, &block) unless Riak.disable_list_exceptions msg = warn(t('list_buckets', :backtrace => caller.join("\n "))) raise Riak::ListError.new(msg) end return ListBuckets.new self, options, block if block_given? backend do |b| b.list_buckets(options).map {|name| Bucket.new(self, name) } end end |
#choose_node(nodes = self.nodes) ⇒ Object
Choose a node from a set.
200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 |
# File 'lib/riak/client.rb', line 200 def choose_node(nodes = self.nodes) # Prefer nodes which have gone a reasonable time without errors. s = nodes.select do |node| node.error_rate.value < 0.1 end if s.empty? # Fall back to minimally broken node. nodes.min_by do |node| node.error_rate.value end else s[rand(s.size)] end end |
#clear_bucket_props(bucket, options = { }) ⇒ Object
Clears the properties on a bucket. See Bucket#clear_props
440 441 442 443 444 |
# File 'lib/riak/client.rb', line 440 def clear_bucket_props(bucket, options = { }) backend do |b| b.reset_bucket_props(bucket, options) end end |
#create_search_index(name, schema = nil, n_val = nil, timeout = nil) ⇒ Object
17 18 19 20 21 22 23 |
# File 'lib/riak/client/yokozuna.rb', line 17 def create_search_index(name, schema = nil, n_val = nil, timeout = nil) raise ArgumentError, t("zero_length_index") if name.nil? || name.empty? backend do |b| b.create_search_index(name, schema, n_val, timeout) end true end |
#create_search_schema(name, content) ⇒ Object
50 51 52 53 54 55 56 57 |
# File 'lib/riak/client/yokozuna.rb', line 50 def create_search_schema(name, content) raise ArgumentError, t("zero_length_schema") if name.nil? || name.empty? raise ArgumentError, t("zero_length_content") if content.nil? || content.empty? backend do |b| b.create_search_schema(name, content) end true end |
#delete_object(bucket, key, options = {}) ⇒ Object
Delete an object. See Bucket#delete
268 269 270 271 272 |
# File 'lib/riak/client.rb', line 268 def delete_object(bucket, key, options = {}) backend do |b| b.delete_object(bucket, key, options) end end |
#delete_search_index(name) ⇒ Object
42 43 44 45 46 47 48 |
# File 'lib/riak/client/yokozuna.rb', line 42 def delete_search_index(name) raise ArgumentError, t("zero_length_index") if name.nil? || name.empty? backend do |b| b.delete_search_index(name) end true end |
#get_bucket_props(bucket, options = { }) ⇒ Object
Bucket properties. See Bucket#props
275 276 277 278 279 |
# File 'lib/riak/client.rb', line 275 def get_bucket_props(bucket, options = { }) backend do |b| b.get_bucket_props bucket, options end end |
#get_index(bucket, index, query, options = {}) ⇒ Object
Queries a secondary index on a bucket. See Bucket#get_index
282 283 284 285 286 |
# File 'lib/riak/client.rb', line 282 def get_index(bucket, index, query, options = {}) backend do |b| b.get_index bucket, index, query, options end end |
#get_many(pairs) ⇒ Object
Get multiple objects in parallel.
302 303 304 |
# File 'lib/riak/client.rb', line 302 def get_many(pairs) Multiget.perform self, pairs end |
#get_object(bucket, key, options = {}) ⇒ Object
Get an object. See Bucket#get
307 308 309 310 311 312 313 |
# File 'lib/riak/client.rb', line 307 def get_object(bucket, key, options = {}) raise ArgumentError, t('zero_length_key') if key == '' raise ArgumentError, t('string_type', :string => key) unless key.is_a? String backend do |b| b.fetch_object(bucket, key, options) end end |
#get_preflist(bucket, key, type = nil, options = { }) ⇒ Array<PreflistItem>
Retrieves a preflist for the given bucket, key, and type; useful for figuring out where in the cluster an object is stored.
295 296 297 298 299 |
# File 'lib/riak/client.rb', line 295 def get_preflist(bucket, key, type = nil, options = { }) backend do |b| b.get_preflist bucket, key, type, options end end |
#get_search_index(name) ⇒ Object
25 26 27 28 29 30 31 32 |
# File 'lib/riak/client/yokozuna.rb', line 25 def get_search_index(name) raise ArgumentError, t("zero_length_index") if name.nil? || name.empty? resp = [] backend do |b| resp = b.get_search_index(name) end resp.index && Array === resp.index ? resp.index.first : resp end |
#get_search_schema(name) ⇒ Object
59 60 61 62 63 64 |
# File 'lib/riak/client/yokozuna.rb', line 59 def get_search_schema(name) raise ArgumentError, t("zero_length_schema") if name.nil? || name.empty? backend do |b| return b.get_search_schema(name) end end |
#inspect ⇒ String
Returns A representation suitable for IRB and debugging output.
316 317 318 |
# File 'lib/riak/client.rb', line 316 def inspect "#<Riak::Client #{nodes.inspect}>" end |
#list_keys(bucket, options = {}, &block) ⇒ Object
Retrieves a list of keys in the given bucket. See Bucket#keys
321 322 323 324 325 326 327 328 329 330 331 |
# File 'lib/riak/client.rb', line 321 def list_keys(bucket, options = {}, &block) if block_given? backend do |b| b.list_keys bucket, options, &block end else backend do |b| b.list_keys bucket, options end end end |
#list_search_indexes ⇒ Object
34 35 36 37 38 39 40 |
# File 'lib/riak/client/yokozuna.rb', line 34 def list_search_indexes() resp = [] backend do |b| resp = b.get_search_index(nil) end resp.index ? resp.index : resp end |
#mapred(mr, &block) ⇒ Object
Executes a mapreduce request. See MapReduce#run
334 335 336 337 338 |
# File 'lib/riak/client.rb', line 334 def mapred(mr, &block) backend do |b| b.mapred(mr, &block) end end |
#new_protobuffs_backend ⇒ ProtobuffsBackend
Creates a new protocol buffers backend.
343 344 345 346 347 348 349 350 351 352 353 354 355 |
# File 'lib/riak/client.rb', line 343 def new_protobuffs_backend klass = self.class.const_get("#{@protobuffs_backend}ProtobuffsBackend") unless klass.configured? raise BackendCreationError.new @protobuffs_backend end node = choose_node( @nodes.select do |n| n.protobuffs? end ) klass.new(self, node) end |
#node ⇒ Node
Returns An arbitrary Node.
358 359 360 |
# File 'lib/riak/client.rb', line 358 def node nodes[rand nodes.size] end |
#ping ⇒ true, false
Pings the Riak cluster to check for liveness.
364 365 366 367 368 |
# File 'lib/riak/client.rb', line 364 def ping backend do |b| b.ping end end |
#protobuffs(&block) ⇒ Object Also known as: backend
Yields a protocol buffers backend.
371 372 373 |
# File 'lib/riak/client.rb', line 371 def protobuffs(&block) recover_from @protobuffs_pool, &block end |
#recover_from(pool) ⇒ Object
Takes a pool. Acquires a backend from the pool and yields it with node-specific error recovery.
386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 |
# File 'lib/riak/client.rb', line 386 def recover_from(pool) skip_nodes = [] take_opts = {} tries = 1 + max_retries begin # Only select nodes which we haven't used before. unless skip_nodes.empty? take_opts[:filter] = lambda do |backend| not skip_nodes.include? backend.node end end # Acquire a backend pool.take(take_opts) do |backend| begin yield backend rescue *NETWORK_ERRORS => e Riak.logger.warn("Riak client error: #{e.inspect} for #{backend.inspect}") # Network error. tries -= 1 # Notify the node that a request against it failed. backend.node.error_rate << 1 # Skip this node next time. skip_nodes << backend.node # And delete this connection. raise Pool::BadResource, e end end rescue Pool::BadResource => e retry if tries > 0 raise e.message end end |
#reload_object(object, options = {}) ⇒ Object
Reloads the object from Riak.
426 427 428 429 430 |
# File 'lib/riak/client.rb', line 426 def reload_object(object, options = {}) backend do |b| b.reload_object(object, options) end end |
#search(index, query, options = {}) ⇒ Hash #search(query, options = {}) ⇒ Hash Also known as: select
(Riak Search) Performs a search via the Solr interface.
32 33 34 35 36 37 38 |
# File 'lib/riak/client/search.rb', line 32 def search(*args) options = args.extract_options! index, query = args[-2], args[-1] # Allows nil index, while keeping it as first argument backend do |b| b.search(index, query, options) end end |
#security? ⇒ Boolean
Is security enabled?
156 157 158 |
# File 'lib/riak/client.rb', line 156 def security? !!authentication end |
#set_bucket_props(bucket, properties, type = nil) ⇒ Object
Sets the properties on a bucket. See Bucket#props=
433 434 435 436 437 |
# File 'lib/riak/client.rb', line 433 def set_bucket_props(bucket, properties, type = nil) backend do |b| b.set_bucket_props(bucket, properties, type) end end |
#stamp ⇒ Stamp
Exposes a Stamp object for use in generating unique identifiers.
450 451 452 |
# File 'lib/riak/client.rb', line 450 def stamp @stamp ||= Riak::Stamp.new(self) end |
#store_object(object, options = {}) ⇒ Object
Stores an object in Riak.
456 457 458 459 460 461 |
# File 'lib/riak/client.rb', line 456 def store_object(object, options = {}) params = {:returnbody => true}.merge(options) backend do |b| b.store_object(object, params) end end |