Class: Riak::Client
- Includes:
- Instrumentable, Util::Escape, Util::Translation
- Defined in:
- lib/riak/client.rb,
lib/riak/client/node.rb,
lib/riak/client/search.rb,
lib/riak/client/decaying.rb,
lib/riak/client/yokozuna.rb,
lib/riak/client/beefcake/socket.rb,
lib/riak/client/instrumentation.rb,
lib/riak/client/beefcake/messages.rb,
lib/riak/client/beefcake/protocol.rb,
lib/riak/client/feature_detection.rb,
lib/riak/client/protobuffs_backend.rb,
lib/riak/client/beefcake/crdt_loader.rb,
lib/riak/client/beefcake/crdt_operator.rb,
lib/riak/client/beefcake/message_codes.rb,
lib/riak/client/beefcake/object_methods.rb,
lib/riak/client/beefcake/message_overlay.rb,
lib/riak/client/beefcake_protobuffs_backend.rb
Overview
A client connection to Riak.
Defined Under Namespace
Modules: BeefcakeMessageCodes, FeatureDetection Classes: BeefcakeProtobuffsBackend, Decaying, Node, ProtobuffsBackend
Constant Summary collapse
- MAX_CLIENT_ID =
When using integer client IDs, the exclusive upper-bound of valid values.
4294967296
- HOST_REGEX =
Regexp for validating hostnames, lifted from uri.rb in Ruby 1.8.6
/^(?:(?:(?:[a-zA-Z\d](?:[-a-zA-Z\d]*[a-zA-Z\d])?)\.)*(?:[a-zA-Z](?:[-a-zA-Z\d]*[a-zA-Z\d])?)\.?|\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}|\[(?:(?:[a-fA-F\d]{1,4}:)*(?:[a-fA-F\d]{1,4}|\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})|(?:(?:[a-fA-F\d]{1,4}:)*[a-fA-F\d]{1,4})?::(?:(?:[a-fA-F\d]{1,4}:)*(?:[a-fA-F\d]{1,4}|\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}))?)\])$/n
- VALID_OPTIONS =
Valid constructor options.
[:nodes, :client_id, :protobuffs_backend, :authentication, :max_retries, :connect_timeout, :read_timeout, :write_timeout, :convert_timestamp] | Node::VALID_OPTIONS
- NETWORK_ERRORS =
Network errors.
[ EOFError, Errno::ECONNABORTED, Errno::ECONNREFUSED, Errno::ECONNRESET, Errno::ENETDOWN, Errno::ENETRESET, Errno::ENETUNREACH, Errno::ETIMEDOUT, SocketError, SystemCallError, Riak::ProtobuffsFailedHeader, ]
- Pool =
::Innertube::Pool
Instance Attribute Summary collapse
-
#authentication ⇒ Hash
readonly
The authentication information this client will use.
-
#client_id ⇒ String
The internal client ID used by Riak to route responses.
-
#connect_timeout ⇒ Numeric
readonly
The connect timeout, in seconds.
-
#convert_timestamp ⇒ Boolean
readonly
Convert timestamps from Riak TS to Time objects.
-
#max_retries ⇒ Integer
The maximum number of retries in case of NETWORK_ERRORS.
-
#multi_threads ⇒ Integer
(also: #multiget_threads)
The number of threads for multiget requests.
-
#nodes ⇒ Array
The set of Nodes this client can communicate with.
-
#protobuffs_backend ⇒ Symbol
The Protocol Buffers backend/client to use.
-
#protobuffs_pool ⇒ Client::Pool
readonly
A pool of protobuffs connections.
-
#read_timeout ⇒ Numeric
readonly
The read timeout, in seconds.
-
#write_timeout ⇒ Numeric
readonly
The write timeout, in seconds.
Instance Method Summary collapse
-
#bucket(name, options = {}) ⇒ Bucket
(also: #[])
Retrieves a bucket from Riak.
- #bucket_type(name) ⇒ Object
-
#buckets(options = {}, &block) ⇒ Array<Bucket>
(also: #list_buckets)
Lists buckets which have keys stored in them.
-
#choose_node(nodes = self.nodes) ⇒ Object
Choose a node from a set.
-
#clear_bucket_props(bucket, options = { }) ⇒ Object
Clears the properties on a bucket.
- #create_search_index(name, schema = nil, n_val = nil, timeout = nil) ⇒ Object
- #create_search_schema(name, content) ⇒ Object
-
#delete_object(bucket, key, options = {}) ⇒ Object
Delete an object.
- #delete_search_index(name) ⇒ Object
-
#get_bucket_props(bucket, options = { }) ⇒ Object
Bucket properties.
-
#get_index(bucket, index, query, options = {}) ⇒ Object
Queries a secondary index on a bucket.
-
#get_many(pairs) ⇒ Object
Get multiple objects in parallel.
-
#get_object(bucket, key, options = {}) ⇒ Object
Get an object.
-
#get_preflist(bucket, key, type = nil, options = { }) ⇒ Array<PreflistItem>
Retrieves a preflist for the given bucket, key, and type; useful for figuring out where in the cluster an object is stored.
- #get_search_index(name) ⇒ Object
- #get_search_schema(name) ⇒ Object
-
#initialize(options = {}) ⇒ Client
constructor
Creates a client connection to Riak.
-
#inspect ⇒ String
A representation suitable for IRB and debugging output.
-
#list_keys(bucket, options = {}, &block) ⇒ Object
Retrieves a list of keys in the given bucket.
- #list_search_indexes ⇒ Object
-
#mapred(mr, &block) ⇒ Object
Executes a mapreduce request.
-
#new_protobuffs_backend ⇒ ProtobuffsBackend
Creates a new protocol buffers backend.
-
#node ⇒ Node
An arbitrary Node.
-
#ping ⇒ true, false
Pings the Riak cluster to check for liveness.
-
#protobuffs(&block) ⇒ Object
(also: #backend)
Yields a protocol buffers backend.
-
#recover_from(pool) ⇒ Object
Takes a pool.
-
#reload_object(object, options = {}) ⇒ Object
Reloads the object from Riak.
-
#search(*args) ⇒ Hash
(also: #select)
(Riak Search) Performs a search via the Solr interface.
-
#security? ⇒ Boolean
Is security enabled?.
-
#set_bucket_props(bucket, properties, type = nil) ⇒ Object
Sets the properties on a bucket.
-
#stamp ⇒ Stamp
Exposes a Stamp object for use in generating unique identifiers.
-
#store_object(object, options = {}) ⇒ Object
Stores an object in Riak.
Methods included from Util::Escape
#escape, #maybe_escape, #maybe_unescape, #unescape
Methods included from Util::Translation
Constructor Details
#initialize(options = {}) ⇒ Client
Creates a client connection to Riak
112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 |
# File 'lib/riak/client.rb', line 112 def initialize( = {}) if .include? :port warn(t('deprecated.port', :backtrace => caller[0..2].join("\n "))) end unless (evil = .keys - VALID_OPTIONS).empty? raise ArgumentError, "#{evil.inspect} are not valid options for Client.new" end @nodes = build_nodes() @protobuffs_pool = Pool.new( method(:new_protobuffs_backend), lambda { |b| b.teardown } ) self.protobuffs_backend = [:protobuffs_backend] || :Beefcake self.client_id = [:client_id] if [:client_id] self.multi_threads = [:multi_threads] || [:multiget_threads] @authentication = [:authentication] && [:authentication].symbolize_keys self.max_retries = [:max_retries] || 2 @connect_timeout = [:connect_timeout] @read_timeout = [:read_timeout] @write_timeout = [:write_timeout] @convert_timestamp = [:convert_timestamp] || false end |
Instance Attribute Details
#authentication ⇒ Hash (readonly)
Returns The authentication information this client will use.
79 80 81 |
# File 'lib/riak/client.rb', line 79 def authentication @authentication end |
#client_id ⇒ String
Returns The internal client ID used by Riak to route responses.
64 65 66 |
# File 'lib/riak/client.rb', line 64 def client_id @client_id end |
#connect_timeout ⇒ Numeric (readonly)
Returns The connect timeout, in seconds.
85 86 87 |
# File 'lib/riak/client.rb', line 85 def connect_timeout @connect_timeout end |
#convert_timestamp ⇒ Boolean (readonly)
Returns Convert timestamps from Riak TS to Time objects.
94 95 96 |
# File 'lib/riak/client.rb', line 94 def @convert_timestamp end |
#max_retries ⇒ Integer
Returns The maximum number of retries in case of NETWORK_ERRORS.
82 83 84 |
# File 'lib/riak/client.rb', line 82 def max_retries @max_retries end |
#multi_threads ⇒ Integer Also known as: multiget_threads
Returns The number of threads for multiget requests.
73 74 75 |
# File 'lib/riak/client.rb', line 73 def multi_threads @multi_threads end |
#nodes ⇒ Array
Returns The set of Nodes this client can communicate with.
61 62 63 |
# File 'lib/riak/client.rb', line 61 def nodes @nodes end |
#protobuffs_backend ⇒ Symbol
Returns The Protocol Buffers backend/client to use.
67 68 69 |
# File 'lib/riak/client.rb', line 67 def protobuffs_backend @protobuffs_backend end |
#protobuffs_pool ⇒ Client::Pool (readonly)
Returns A pool of protobuffs connections.
70 71 72 |
# File 'lib/riak/client.rb', line 70 def protobuffs_pool @protobuffs_pool end |
#read_timeout ⇒ Numeric (readonly)
Returns The read timeout, in seconds.
88 89 90 |
# File 'lib/riak/client.rb', line 88 def read_timeout @read_timeout end |
#write_timeout ⇒ Numeric (readonly)
Returns The write timeout, in seconds.
91 92 93 |
# File 'lib/riak/client.rb', line 91 def write_timeout @write_timeout end |
Instance Method Details
#bucket(name, options = {}) ⇒ Bucket Also known as: []
Retrieves a bucket from Riak.
150 151 152 153 154 155 156 157 158 159 |
# File 'lib/riak/client.rb', line 150 def bucket(name, = {}) raise ArgumentError, t('zero_length_bucket') if name == '' unless (.keys - [:props]).empty? raise ArgumentError, "invalid options" end @bucket_cache ||= {} (@bucket_cache[name] ||= Bucket.new(self, name)).tap do |b| b.props if [:props] end end |
#bucket_type(name) ⇒ Object
162 163 164 |
# File 'lib/riak/client.rb', line 162 def bucket_type(name) BucketType.new self, name end |
#buckets(options = {}, &block) ⇒ Array<Bucket> Also known as: list_buckets
This is an expensive operation and should be used only in development.
Lists buckets which have keys stored in them.
170 171 172 173 174 175 176 177 178 |
# File 'lib/riak/client.rb', line 170 def buckets( = {}, &block) warn(t('list_buckets', :backtrace => caller.join("\n "))) unless Riak.disable_list_keys_warnings return ListBuckets.new self, , block if block_given? backend do |b| b.list_buckets().map {|name| Bucket.new(self, name) } end end |
#choose_node(nodes = self.nodes) ⇒ Object
Choose a node from a set.
182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 |
# File 'lib/riak/client.rb', line 182 def choose_node(nodes = self.nodes) # Prefer nodes which have gone a reasonable time without errors. s = nodes.select do |node| node.error_rate.value < 0.1 end if s.empty? # Fall back to minimally broken node. nodes.min_by do |node| node.error_rate.value end else s[rand(s.size)] end end |
#clear_bucket_props(bucket, options = { }) ⇒ Object
Clears the properties on a bucket. See Bucket#clear_props
422 423 424 425 426 |
# File 'lib/riak/client.rb', line 422 def clear_bucket_props(bucket, = { }) backend do |b| b.reset_bucket_props(bucket, ) end end |
#create_search_index(name, schema = nil, n_val = nil, timeout = nil) ⇒ Object
3 4 5 6 7 8 9 |
# File 'lib/riak/client/yokozuna.rb', line 3 def create_search_index(name, schema = nil, n_val = nil, timeout = nil) raise ArgumentError, t("zero_length_index") if name.nil? || name.empty? backend do |b| b.create_search_index(name, schema, n_val, timeout) end true end |
#create_search_schema(name, content) ⇒ Object
36 37 38 39 40 41 42 43 |
# File 'lib/riak/client/yokozuna.rb', line 36 def create_search_schema(name, content) raise ArgumentError, t("zero_length_schema") if name.nil? || name.empty? raise ArgumentError, t("zero_length_content") if content.nil? || content.empty? backend do |b| b.create_search_schema(name, content) end true end |
#delete_object(bucket, key, options = {}) ⇒ Object
Delete an object. See Bucket#delete
250 251 252 253 254 |
# File 'lib/riak/client.rb', line 250 def delete_object(bucket, key, = {}) backend do |b| b.delete_object(bucket, key, ) end end |
#delete_search_index(name) ⇒ Object
28 29 30 31 32 33 34 |
# File 'lib/riak/client/yokozuna.rb', line 28 def delete_search_index(name) raise ArgumentError, t("zero_length_index") if name.nil? || name.empty? backend do |b| b.delete_search_index(name) end true end |
#get_bucket_props(bucket, options = { }) ⇒ Object
Bucket properties. See Bucket#props
257 258 259 260 261 |
# File 'lib/riak/client.rb', line 257 def get_bucket_props(bucket, = { }) backend do |b| b.get_bucket_props bucket, end end |
#get_index(bucket, index, query, options = {}) ⇒ Object
Queries a secondary index on a bucket. See Bucket#get_index
264 265 266 267 268 |
# File 'lib/riak/client.rb', line 264 def get_index(bucket, index, query, = {}) backend do |b| b.get_index bucket, index, query, end end |
#get_many(pairs) ⇒ Object
Get multiple objects in parallel.
284 285 286 |
# File 'lib/riak/client.rb', line 284 def get_many(pairs) Multiget.perform self, pairs end |
#get_object(bucket, key, options = {}) ⇒ Object
Get an object. See Bucket#get
289 290 291 292 293 294 295 |
# File 'lib/riak/client.rb', line 289 def get_object(bucket, key, = {}) raise ArgumentError, t('zero_length_key') if key == '' raise ArgumentError, t('string_type', :string => key) unless key.is_a? String backend do |b| b.fetch_object(bucket, key, ) end end |
#get_preflist(bucket, key, type = nil, options = { }) ⇒ Array<PreflistItem>
Retrieves a preflist for the given bucket, key, and type; useful for figuring out where in the cluster an object is stored.
277 278 279 280 281 |
# File 'lib/riak/client.rb', line 277 def get_preflist(bucket, key, type = nil, = { }) backend do |b| b.get_preflist bucket, key, type, end end |
#get_search_index(name) ⇒ Object
11 12 13 14 15 16 17 18 |
# File 'lib/riak/client/yokozuna.rb', line 11 def get_search_index(name) raise ArgumentError, t("zero_length_index") if name.nil? || name.empty? resp = [] backend do |b| resp = b.get_search_index(name) end resp.index && Array === resp.index ? resp.index.first : resp end |
#get_search_schema(name) ⇒ Object
45 46 47 48 49 50 |
# File 'lib/riak/client/yokozuna.rb', line 45 def get_search_schema(name) raise ArgumentError, t("zero_length_schema") if name.nil? || name.empty? backend do |b| return b.get_search_schema(name) end end |
#inspect ⇒ String
Returns A representation suitable for IRB and debugging output.
298 299 300 |
# File 'lib/riak/client.rb', line 298 def inspect "#<Riak::Client #{nodes.inspect}>" end |
#list_keys(bucket, options = {}, &block) ⇒ Object
Retrieves a list of keys in the given bucket. See Bucket#keys
303 304 305 306 307 308 309 310 311 312 313 |
# File 'lib/riak/client.rb', line 303 def list_keys(bucket, = {}, &block) if block_given? backend do |b| b.list_keys bucket, , &block end else backend do |b| b.list_keys bucket, end end end |
#list_search_indexes ⇒ Object
20 21 22 23 24 25 26 |
# File 'lib/riak/client/yokozuna.rb', line 20 def list_search_indexes() resp = [] backend do |b| resp = b.get_search_index(nil) end resp.index ? resp.index : resp end |
#mapred(mr, &block) ⇒ Object
Executes a mapreduce request. See MapReduce#run
316 317 318 319 320 |
# File 'lib/riak/client.rb', line 316 def mapred(mr, &block) backend do |b| b.mapred(mr, &block) end end |
#new_protobuffs_backend ⇒ ProtobuffsBackend
Creates a new protocol buffers backend.
325 326 327 328 329 330 331 332 333 334 335 336 337 |
# File 'lib/riak/client.rb', line 325 def new_protobuffs_backend klass = self.class.const_get("#{@protobuffs_backend}ProtobuffsBackend") unless klass.configured? raise BackendCreationError.new @protobuffs_backend end node = choose_node( @nodes.select do |n| n.protobuffs? end ) klass.new(self, node) end |
#node ⇒ Node
Returns An arbitrary Node.
340 341 342 |
# File 'lib/riak/client.rb', line 340 def node nodes[rand nodes.size] end |
#ping ⇒ true, false
Pings the Riak cluster to check for liveness.
346 347 348 349 350 |
# File 'lib/riak/client.rb', line 346 def ping backend do |b| b.ping end end |
#protobuffs(&block) ⇒ Object Also known as: backend
Yields a protocol buffers backend.
353 354 355 |
# File 'lib/riak/client.rb', line 353 def protobuffs(&block) recover_from @protobuffs_pool, &block end |
#recover_from(pool) ⇒ Object
Takes a pool. Acquires a backend from the pool and yields it with node-specific error recovery.
368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 |
# File 'lib/riak/client.rb', line 368 def recover_from(pool) skip_nodes = [] take_opts = {} tries = 1 + max_retries begin # Only select nodes which we haven't used before. unless skip_nodes.empty? take_opts[:filter] = lambda do |backend| not skip_nodes.include? backend.node end end # Acquire a backend pool.take(take_opts) do |backend| begin yield backend rescue *NETWORK_ERRORS => e Riak.logger.warn("Riak client error: #{e.inspect} for #{backend.inspect}") # Network error. tries -= 1 # Notify the node that a request against it failed. backend.node.error_rate << 1 # Skip this node next time. skip_nodes << backend.node # And delete this connection. raise Pool::BadResource, e end end rescue Pool::BadResource => e retry if tries > 0 raise e. end end |
#reload_object(object, options = {}) ⇒ Object
Reloads the object from Riak.
408 409 410 411 412 |
# File 'lib/riak/client.rb', line 408 def reload_object(object, = {}) backend do |b| b.reload_object(object, ) end end |
#search(index, query, options = {}) ⇒ Hash #search(query, options = {}) ⇒ Hash Also known as: select
(Riak Search) Performs a search via the Solr interface.
18 19 20 21 22 23 24 |
# File 'lib/riak/client/search.rb', line 18 def search(*args) = args. index, query = args[-2], args[-1] # Allows nil index, while keeping it as firstargument backend do |b| b.search(index, query, ) end end |
#security? ⇒ Boolean
Is security enabled?
141 142 143 |
# File 'lib/riak/client.rb', line 141 def security? !!authentication end |
#set_bucket_props(bucket, properties, type = nil) ⇒ Object
Sets the properties on a bucket. See Bucket#props=
415 416 417 418 419 |
# File 'lib/riak/client.rb', line 415 def set_bucket_props(bucket, properties, type = nil) backend do |b| b.set_bucket_props(bucket, properties, type) end end |
#stamp ⇒ Stamp
Exposes a Stamp object for use in generating unique identifiers.
432 433 434 |
# File 'lib/riak/client.rb', line 432 def stamp @stamp ||= Riak::Stamp.new(self) end |
#store_object(object, options = {}) ⇒ Object
Stores an object in Riak.
438 439 440 441 442 443 |
# File 'lib/riak/client.rb', line 438 def store_object(object, = {}) params = {:returnbody => true}.merge() backend do |b| b.store_object(object, params) end end |