Class: Riak::Client
- Includes:
- Util::Escape, Util::Translation
- Defined in:
- lib/riak/client.rb,
lib/riak/client/node.rb,
lib/riak/client/search.rb,
lib/riak/client/decaying.rb,
lib/riak/client/http_backend.rb,
lib/riak/client/excon_backend.rb,
lib/riak/client/net_http_backend.rb,
lib/riak/client/beefcake/messages.rb,
lib/riak/client/feature_detection.rb,
lib/riak/client/protobuffs_backend.rb,
lib/riak/client/beefcake/object_methods.rb,
lib/riak/client/http_backend/key_streamer.rb,
lib/riak/client/http_backend/configuration.rb,
lib/riak/client/beefcake_protobuffs_backend.rb,
lib/riak/client/http_backend/object_methods.rb,
lib/riak/client/http_backend/request_headers.rb,
lib/riak/client/http_backend/transport_methods.rb
Overview
A client connection to Riak.
Defined Under Namespace
Modules: FeatureDetection Classes: BeefcakeProtobuffsBackend, Decaying, ExconBackend, HTTPBackend, LuwakFile, NetHTTPBackend, Node, ProtobuffsBackend
Constant Summary collapse
- MAX_CLIENT_ID =
When using integer client IDs, the exclusive upper-bound of valid values.
4294967296
- PROTOCOLS =
Array of valid protocols
%w[http https pbc]
- HOST_REGEX =
Regexp for validating hostnames, lifted from uri.rb in Ruby 1.8.6
/^(?:(?:(?:[a-zA-Z\d](?:[-a-zA-Z\d]*[a-zA-Z\d])?)\.)*(?:[a-zA-Z](?:[-a-zA-Z\d]*[a-zA-Z\d])?)\.?|\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}|\[(?:(?:[a-fA-F\d]{1,4}:)*(?:[a-fA-F\d]{1,4}|\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})|(?:(?:[a-fA-F\d]{1,4}:)*[a-fA-F\d]{1,4})?::(?:(?:[a-fA-F\d]{1,4}:)*(?:[a-fA-F\d]{1,4}|\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}))?)\])$/n
- VALID_OPTIONS =
Valid constructor options.
[:protocol, :nodes, :client_id, :http_backend, :protobuffs_backend] | Node::VALID_OPTIONS
- NETWORK_ERRORS =
Network errors.
[ EOFError, Errno::ECONNABORTED, Errno::ECONNREFUSED, Errno::ECONNRESET, Errno::ENETDOWN, Errno::ENETRESET, Errno::ENETUNREACH, SocketError, SystemCallError, ]
- Pool =
::Innertube::Pool
Instance Attribute Summary collapse
-
#client_id ⇒ String
The internal client ID used by Riak to route responses.
-
#http_backend ⇒ Symbol
The HTTP backend/client to use.
-
#http_pool ⇒ Client::Pool
readonly
A pool of HTTP connections.
-
#nodes ⇒ Array
The set of Nodes this client can communicate with.
-
#protobuffs_backend ⇒ Symbol
The Protocol Buffers backend/client to use.
-
#protobuffs_pool ⇒ Client::Pool
readonly
A pool of protobuffs connections.
-
#protocol ⇒ String
The protocol to use for the Riak endpoint.
Instance Method Summary collapse
-
#backend {|HTTPBackend, ProtobuffsBackend| ... } ⇒ Object
Yields a backend for operations that are protocol-independent.
-
#basic_auth=(auth) ⇒ Object
Sets basic HTTP auth on all nodes.
-
#bucket(name, options = {}) ⇒ Bucket
(also: #[])
Retrieves a bucket from Riak.
-
#buckets ⇒ Array<Bucket>
(also: #list_buckets)
Lists buckets which have keys stored in them.
-
#choose_node(nodes = self.nodes) ⇒ Object
Choose a node from a set.
-
#delete_file(filename) ⇒ Object
Deletes a file stored via the “Luwak” interface.
-
#delete_object(bucket, key, options = {}) ⇒ Object
Delete an object.
-
#file_exists?(key) ⇒ true, false
(also: #file_exist?)
Checks whether a file exists in “Luwak”.
-
#get_bucket_props(bucket) ⇒ Object
Bucket properties.
-
#get_file(filename) {|chunk| ... } ⇒ IO?
Retrieves a large file/IO object from Riak via the “Luwak” interface.
-
#get_index(bucket, index, query) ⇒ Object
Queries a secondary index on a bucket.
-
#get_object(bucket, key, options = {}) ⇒ Object
Get an object.
-
#http(&block) ⇒ Object
Yields an HTTPBackend.
-
#index(*args) ⇒ Object
(also: #add_doc)
(Riak Search) Adds documents to a search index via the Solr interface.
-
#initialize(options = {}) ⇒ Client
constructor
Creates a client connection to Riak.
-
#inspect ⇒ String
A representation suitable for IRB and debugging output.
-
#link_walk(object, specs) ⇒ Object
Link-walk.
-
#list_keys(bucket, &block) ⇒ Object
Retrieves a list of keys in the given bucket.
-
#mapred(mr, &block) ⇒ Object
Executes a mapreduce request.
-
#new_http_backend ⇒ HTTPBackend
Creates a new HTTP backend.
-
#new_protobuffs_backend ⇒ ProtobuffsBackend
Creates a new protocol buffers backend.
-
#node ⇒ Node
An arbitrary Node.
-
#ping ⇒ true, false
Pings the Riak cluster to check for liveness.
-
#protobuffs(&block) ⇒ Object
Yields a protocol buffers backend.
-
#recover_from(pool) ⇒ Object
Takes a pool.
-
#reload_object(object, options = {}) ⇒ Object
Reloads the object from Riak.
-
#remove(*args) ⇒ Object
(also: #delete_doc, #deindex)
(Riak Search) Removes documents from a search index via the Solr interface.
-
#search(*args) ⇒ Hash
(also: #select)
(Riak Search) Performs a search via the Solr interface.
-
#set_bucket_props(bucket, properties) ⇒ Object
Sets the properties on a bucket.
-
#ssl=(value) ⇒ Object
Enables or disables SSL on all nodes, for HTTP backends.
-
#stamp ⇒ Stamp
Exposes a Stamp object for use in generating unique identifiers.
-
#store_file(*args) ⇒ String
Stores a large file/IO-like object in Riak via the “Luwak” interface.
-
#store_object(object, options = {}) ⇒ Object
Stores an object in Riak.
Methods included from Util::Escape
#escape, #maybe_escape, #maybe_unescape, #unescape
Methods included from Util::Translation
Constructor Details
#initialize(options = {}) ⇒ Client
Creates a client connection to Riak
91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 |
# File 'lib/riak/client.rb', line 91 def initialize(={}) if .include? :port warn(t('deprecated.port', :backtrace => caller[0..2].join("\n "))) end unless (evil = .keys - VALID_OPTIONS).empty? raise ArgumentError, "#{evil.inspect} are not valid options for Client.new" end @nodes = ([:nodes] || []).map do |n| Client::Node.new self, n end if @nodes.empty? or [:host] or [:http_port] or [:pb_port] @nodes |= [Client::Node.new(self, )] end @protobuffs_pool = Pool.new( method(:new_protobuffs_backend), lambda { |b| b.teardown } ) @http_pool = Pool.new( method(:new_http_backend), lambda { |b| b.teardown } ) self.protocol = [:protocol] || "http" self.http_backend = [:http_backend] || :NetHTTP self.protobuffs_backend = [:protobuffs_backend] || :Beefcake self.client_id = [:client_id] if [:client_id] self.ssl = [:ssl] if [:ssl] end |
Instance Attribute Details
#client_id ⇒ String
Returns The internal client ID used by Riak to route responses.
59 60 61 |
# File 'lib/riak/client.rb', line 59 def client_id @client_id end |
#http_backend ⇒ Symbol
Returns The HTTP backend/client to use.
62 63 64 |
# File 'lib/riak/client.rb', line 62 def http_backend @http_backend end |
#http_pool ⇒ Client::Pool (readonly)
Returns A pool of HTTP connections.
65 66 67 |
# File 'lib/riak/client.rb', line 65 def http_pool @http_pool end |
#nodes ⇒ Array
Returns The set of Nodes this client can communicate with.
56 57 58 |
# File 'lib/riak/client.rb', line 56 def nodes @nodes end |
#protobuffs_backend ⇒ Symbol
Returns The Protocol Buffers backend/client to use.
68 69 70 |
# File 'lib/riak/client.rb', line 68 def protobuffs_backend @protobuffs_backend end |
#protobuffs_pool ⇒ Client::Pool (readonly)
Returns A pool of protobuffs connections.
71 72 73 |
# File 'lib/riak/client.rb', line 71 def protobuffs_pool @protobuffs_pool end |
#protocol ⇒ String
Returns The protocol to use for the Riak endpoint.
53 54 55 |
# File 'lib/riak/client.rb', line 53 def protocol @protocol end |
Instance Method Details
#backend {|HTTPBackend, ProtobuffsBackend| ... } ⇒ Object
Yields a backend for operations that are protocol-independent. You can change which type of backend is used by setting the #protocol.
128 129 130 131 132 133 134 135 |
# File 'lib/riak/client.rb', line 128 def backend(&block) case @protocol.to_s when /https?/i http &block when /pbc/i protobuffs &block end end |
#basic_auth=(auth) ⇒ Object
Sets basic HTTP auth on all nodes.
138 139 140 141 142 143 |
# File 'lib/riak/client.rb', line 138 def basic_auth=(auth) @nodes.each do |node| node.basic_auth = auth end auth end |
#bucket(name, options = {}) ⇒ Bucket Also known as: []
Retrieves a bucket from Riak.
150 151 152 153 154 155 156 157 158 |
# File 'lib/riak/client.rb', line 150 def bucket(name, ={}) unless (.keys - [:props]).empty? raise ArgumentError, "invalid options" end @bucket_cache ||= {} (@bucket_cache[name] ||= Bucket.new(self, name)).tap do |b| b.props if [:props] end end |
#buckets ⇒ Array<Bucket> Also known as: list_buckets
This is an expensive operation and should be used only in development.
Lists buckets which have keys stored in them.
165 166 167 168 169 170 |
# File 'lib/riak/client.rb', line 165 def buckets warn(t('list_buckets', :backtrace => caller.join("\n "))) unless Riak.disable_list_keys_warnings backend do |b| b.list_buckets.map {|name| Bucket.new(self, name) } end end |
#choose_node(nodes = self.nodes) ⇒ Object
Choose a node from a set.
174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 |
# File 'lib/riak/client.rb', line 174 def choose_node(nodes = self.nodes) # Prefer nodes which have gone a reasonable time without errors. s = nodes.select do |node| node.error_rate.value < 0.1 end if s.empty? # Fall back to minimally broken node. nodes.min_by do |node| node.error_rate.value end else s[rand(s.size)] end end |
#delete_file(filename) ⇒ Object
Deletes a file stored via the “Luwak” interface
222 223 224 225 226 227 |
# File 'lib/riak/client.rb', line 222 def delete_file(filename) http do |h| h.delete_file(filename) end true end |
#delete_object(bucket, key, options = {}) ⇒ Object
Delete an object. See Bucket#delete
230 231 232 233 234 |
# File 'lib/riak/client.rb', line 230 def delete_object(bucket, key, = {}) backend do |b| b.delete_object(bucket, key, ) end end |
#file_exists?(key) ⇒ true, false Also known as: file_exist?
Checks whether a file exists in “Luwak”.
239 240 241 242 243 |
# File 'lib/riak/client.rb', line 239 def file_exists?(key) http do |h| h.file_exists?(key) end end |
#get_bucket_props(bucket) ⇒ Object
Bucket properties. See Bucket#props
247 248 249 250 251 |
# File 'lib/riak/client.rb', line 247 def get_bucket_props(bucket) backend do |b| b.get_bucket_props bucket end end |
#get_file(filename) {|chunk| ... } ⇒ IO?
Retrieves a large file/IO object from Riak via the “Luwak” interface. Streams the data to a temporary file unless a block is given.
264 265 266 267 268 |
# File 'lib/riak/client.rb', line 264 def get_file(filename, &block) http do |h| h.get_file(filename, &block) end end |
#get_index(bucket, index, query) ⇒ Object
Queries a secondary index on a bucket. See Bucket#get_index
271 272 273 274 275 |
# File 'lib/riak/client.rb', line 271 def get_index(bucket, index, query) backend do |b| b.get_index bucket, index, query end end |
#get_object(bucket, key, options = {}) ⇒ Object
Get an object. See Bucket#get
278 279 280 281 282 |
# File 'lib/riak/client.rb', line 278 def get_object(bucket, key, = {}) backend do |b| b.fetch_object(bucket, key, ) end end |
#http(&block) ⇒ Object
Yields an HTTPBackend.
285 286 287 |
# File 'lib/riak/client.rb', line 285 def http(&block) recover_from @http_pool, &block end |
#index(index, *docs) ⇒ Object #index(*docs) ⇒ Object Also known as: add_doc
(Riak Search) Adds documents to a search index via the Solr interface.
38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 |
# File 'lib/riak/client/search.rb', line 38 def index(*args) index = args.shift if String === args.first # Documents must be hashes of fields raise ArgumentError.new(t("search_docs_require_id")) unless args.all? {|d| d.key?("id") || d.key?(:id) } xml = Builder::XmlMarkup.new xml.add do args.each do |doc| xml.doc do doc.each do |k,v| xml.field('name' => k.to_s) { xml.text!(v.to_s) } end end end end http do |h| h.update_search_index(index, xml.target!) end true end |
#inspect ⇒ String
Returns A representation suitable for IRB and debugging output.
298 299 300 |
# File 'lib/riak/client.rb', line 298 def inspect "#<Riak::Client #{nodes.inspect}>" end |
#link_walk(object, specs) ⇒ Object
Link-walk.
303 304 305 306 307 |
# File 'lib/riak/client.rb', line 303 def link_walk(object, specs) http do |h| h.link_walk object, specs end end |
#list_keys(bucket, &block) ⇒ Object
Retrieves a list of keys in the given bucket. See Bucket#keys
310 311 312 313 314 315 316 317 318 319 320 |
# File 'lib/riak/client.rb', line 310 def list_keys(bucket, &block) if block_given? backend do |b| b.list_keys bucket, &block end else backend do |b| b.list_keys bucket end end end |
#mapred(mr, &block) ⇒ Object
Executes a mapreduce request. See MapReduce#run
323 324 325 326 327 |
# File 'lib/riak/client.rb', line 323 def mapred(mr, &block) backend do |b| b.mapred(mr, &block) end end |
#new_http_backend ⇒ HTTPBackend
Creates a new HTTP backend.
331 332 333 334 335 336 337 338 339 340 341 342 343 344 |
# File 'lib/riak/client.rb', line 331 def new_http_backend klass = self.class.const_get("#{@http_backend}Backend") if klass.configured? node = choose_node( @nodes.select do |n| n.http? end ) klass.new(self, node) else raise t('http_configuration', :backend => @http_backend) end end |
#new_protobuffs_backend ⇒ ProtobuffsBackend
Creates a new protocol buffers backend.
349 350 351 352 353 354 355 356 357 358 359 360 361 362 |
# File 'lib/riak/client.rb', line 349 def new_protobuffs_backend klass = self.class.const_get("#{@protobuffs_backend}ProtobuffsBackend") if klass.configured? node = choose_node( @nodes.select do |n| n.protobuffs? end ) klass.new(self, node) else raise t('protobuffs_configuration', :backend => @protobuffs_backend) end end |
#node ⇒ Node
Returns An arbitrary Node.
365 366 367 |
# File 'lib/riak/client.rb', line 365 def node nodes[rand nodes.size] end |
#ping ⇒ true, false
Pings the Riak cluster to check for liveness.
371 372 373 374 375 |
# File 'lib/riak/client.rb', line 371 def ping backend do |b| b.ping end end |
#protobuffs(&block) ⇒ Object
Yields a protocol buffers backend.
378 379 380 |
# File 'lib/riak/client.rb', line 378 def protobuffs(&block) recover_from @protobuffs_pool, &block end |
#recover_from(pool) ⇒ Object
Takes a pool. Acquires a backend from the pool and yields it with node-specific error recovery.
419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 |
# File 'lib/riak/client.rb', line 419 def recover_from(pool) skip_nodes = [] take_opts = {} tries = 3 begin # Only select nodes which we haven't used before. unless skip_nodes.empty? take_opts[:filter] = lambda do |backend| not skip_nodes.include? backend.node end end # Acquire a backend pool.take(take_opts) do |backend| begin yield backend rescue *NETWORK_ERRORS => e # Network error. tries -= 1 # Notify the node that a request against it failed. backend.node.error_rate << 1 # Skip this node next time. skip_nodes << backend.node # And delete this connection. raise Pool::BadResource, e end end rescue Pool::BadResource => e retry if tries > 0 raise e. end end |
#reload_object(object, options = {}) ⇒ Object
Reloads the object from Riak.
457 458 459 460 461 |
# File 'lib/riak/client.rb', line 457 def reload_object(object, = {}) backend do |b| b.reload_object(object, ) end end |
#remove(index, specs) ⇒ Object #remove(specs) ⇒ Object Also known as: delete_doc, deindex
(Riak Search) Removes documents from a search index via the Solr interface.
67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 |
# File 'lib/riak/client/search.rb', line 67 def remove(*args) index = args.shift if String === args.first raise ArgumentError.new(t("search_remove_requires_id_or_query")) unless args.all? { |s| s.include? :id or s.include? 'id' or s.include? :query or s.include? 'query' } xml = Builder::XmlMarkup.new xml.delete do args.each do |spec| spec.each do |k,v| xml.tag!(k.to_sym, v) end end end http do |h| h.update_search_index(index, xml.target!) end true end |
#search(index, query, options = {}) ⇒ Hash #search(query, options = {}) ⇒ Hash Also known as: select
(Riak Search) Performs a search via the Solr interface.
20 21 22 23 24 25 26 |
# File 'lib/riak/client/search.rb', line 20 def search(*args) = args. index, query = args[-2], args[-1] # Allows nil index, while keeping it as firstargument backend do |b| b.search(index, query, ) end end |
#set_bucket_props(bucket, properties) ⇒ Object
Sets the properties on a bucket. See Bucket#props=
464 465 466 467 468 469 470 |
# File 'lib/riak/client.rb', line 464 def set_bucket_props(bucket, properties) # A bug in Beefcake is still giving us trouble with default booleans. # Until it is resolved, we'll use the HTTP backend. http do |b| b.set_bucket_props(bucket, properties) end end |
#ssl=(value) ⇒ Object
Enables or disables SSL on all nodes, for HTTP backends.
473 474 475 476 477 478 |
# File 'lib/riak/client.rb', line 473 def ssl=(value) @nodes.each do |node| node.ssl = value end value end |
#stamp ⇒ Stamp
Exposes a Stamp object for use in generating unique identifiers.
484 485 486 |
# File 'lib/riak/client.rb', line 484 def stamp @stamp ||= Riak::Stamp.new(self) end |
#store_file(filename, content_type, data) ⇒ String #store_file(content_type, data) ⇒ String
Stores a large file/IO-like object in Riak via the “Luwak” interface.
499 500 501 502 503 |
# File 'lib/riak/client.rb', line 499 def store_file(*args) http do |h| h.store_file(*args) end end |
#store_object(object, options = {}) ⇒ Object
Stores an object in Riak.
506 507 508 509 510 511 |
# File 'lib/riak/client.rb', line 506 def store_object(object, = {}) params = {:returnbody => true}.merge() backend do |b| b.store_object(object, params) end end |