Class: Riak::Client::HTTPBackend

Inherits:
Object
  • Object
Includes:
FeatureDetection, Configuration, ObjectMethods, TransportMethods, Util::Escape, Util::Translation
Defined in:
lib/riak/client/http_backend.rb,
lib/riak/client/http_backend/key_streamer.rb,
lib/riak/client/http_backend/configuration.rb,
lib/riak/client/http_backend/object_methods.rb,
lib/riak/client/http_backend/request_headers.rb,
lib/riak/client/http_backend/transport_methods.rb

Overview

The parent class for all backends that connect to Riak via HTTP. This class implements all of the universal backend API methods on behalf of subclasses, which need only implement the TransportMethods#perform method for library-specific semantics.
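The division of labor described above can be sketched in isolation. The classes below are hypothetical stand-ins, not the riak-client classes, and the argument order passed to `perform` is an assumption made for illustration. The shared verb helpers funnel into a single `perform` method, and a subclass supplies only that method.

```ruby
# Hypothetical stand-ins; not the riak-client classes themselves.
class SketchBackend
  # Verb helpers are shared; each one funnels into #perform.
  def get(expect, path, headers = {})
    perform(:get, path, headers, expect)
  end

  def put(expect, path, body, headers = {})
    perform(:put, path, headers, expect, body)
  end

  # Subclasses supply the library-specific request logic.
  def perform(method, path, headers, expect, body = nil)
    raise NotImplementedError, "#{self.class} must implement #perform"
  end
end

# A fake transport that records requests instead of touching the network.
class RecordingBackend < SketchBackend
  attr_reader :requests

  def initialize
    @requests = []
  end

  def perform(method, path, headers, expect, body = nil)
    @requests << [method, path]
    # Pretend the server answered with the first acceptable status code.
    { :code => Array(expect).first, :headers => {}, :body => body.to_s }
  end
end

backend = RecordingBackend.new
response = backend.get(200, "/ping")
```

A recording subclass like this one is also a convenient way to exercise the shared methods in tests without a running server.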

Direct Known Subclasses

ExconBackend, NetHTTPBackend

Defined Under Namespace

Modules: Configuration, ObjectMethods, TransportMethods

Classes: KeyStreamer, RequestHeaders

Constant Summary

Constants included from FeatureDetection

FeatureDetection::VERSION

Instance Attribute Summary

Instance Method Summary

Methods included from Configuration

#bucket_list_path, #bucket_properties_path, #index_eq_path, #index_range_path, #key_list_path, #link_walk_path, #luwak_path, #mapred_path, #object_path, #ping_path, #solr_select_path, #solr_update_path, #stats_path

Methods included from ObjectMethods

#load_object, #reload_headers, #store_headers

Methods included from TransportMethods

#basic_auth_header, #client_id, #default_headers, #delete, #get, #head, #path, #perform, #post, #put, #return_body?, #root_uri, #valid_response?, #verify_body!

Methods included from FeatureDetection

#get_server_version, #mapred_phaseless?, #pb_conditionals?, #pb_head?, #pb_indexes?, #pb_search?, #quorum_controls?, #server_version, #tombstone_vclocks?

Methods included from Util::Translation

#i18n_scope, #t

Methods included from Util::Escape

#escape, #maybe_escape, #maybe_unescape, #unescape

Constructor Details

#initialize(client, node) ⇒ HTTPBackend

Create an HTTPBackend for the Riak::Client.

Parameters:

  • client (Client)

    the client

  • node (Node)

    the node we’re connecting to

Raises:

  • (ArgumentError)

    if client is not a Client or node is not a Node


# File 'lib/riak/client/http_backend.rb', line 40

def initialize(client, node)
  raise ArgumentError, t("client_type", :client => client) unless Client === client
  raise ArgumentError, t("node_type", :node => node) unless Node === node
  @client = client
  @node = node
end
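The two guards rely on `Module#===`, which is true when the right-hand object is an instance of the class (or a subclass). A minimal sketch of the same pattern, using hypothetical `SketchClient` and `SketchNode` classes and plain error messages in place of the translated ones:

```ruby
# Hypothetical placeholder classes for illustration only.
class SketchClient; end
class SketchNode; end

class GuardedBackend
  attr_reader :client, :node

  def initialize(client, node)
    # Class === obj behaves like obj.is_a?(Class).
    raise ArgumentError, "invalid client: #{client.inspect}" unless SketchClient === client
    raise ArgumentError, "invalid node: #{node.inspect}" unless SketchNode === node
    @client = client
    @node = node
  end
end

backend = GuardedBackend.new(SketchClient.new, SketchNode.new)

rejected = begin
  GuardedBackend.new("not a client", SketchNode.new)
  false
rescue ArgumentError
  true
end
```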

Instance Attribute Details

#client ⇒ Object (readonly)

The Riak::Client that uses this backend



# File 'lib/riak/client/http_backend.rb', line 32

def client
  @client
end

#node ⇒ Object (readonly)

The Riak::Client::Node that this backend connects to



# File 'lib/riak/client/http_backend.rb', line 35

def node
  @node
end

Instance Method Details

#delete_file(filename) ⇒ Object

(Luwak) Deletes a file from the Luwak large-file interface.

Parameters:

  • filename (String)

    the name of the file



# File 'lib/riak/client/http_backend.rb', line 296

def delete_file(filename)
  delete([204,404], luwak_path(filename))
end

#delete_object(bucket, key, options = {}) ⇒ Object

Deletes an object

Parameters:

  • bucket (Bucket, String)

    the bucket where the object lives

  • key (String)

    the key where the object lives

  • options (Hash) (defaults to: {})

    quorum and delete options



# File 'lib/riak/client/http_backend.rb', line 111

def delete_object(bucket, key, options={})
  bucket = bucket.name if Bucket === bucket
  vclock = options.delete(:vclock)
  headers = vclock ? {"X-Riak-VClock" => vclock} : {}
  delete([204, 404], object_path(bucket, key, options), headers)
end
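One detail of the listing above is easy to miss: `options.delete(:vclock)` removes the key from the hash the caller passed in. The sketch below isolates the header-building lines in a hypothetical helper named `delete_headers`; the vclock value is a placeholder, not a real vector clock.

```ruby
# Mirrors the header-building lines of delete_object in isolation.
def delete_headers(options)
  vclock = options.delete(:vclock) # removes :vclock from the caller's hash
  vclock ? { "X-Riak-VClock" => vclock } : {}
end

options = { :rw => 2, :vclock => "placeholder-vclock" }
headers = delete_headers(options)
no_headers = delete_headers({})
```

A caller that wants to reuse its options hash afterwards can pass `options.dup` instead.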

#fetch_object(bucket, key, options = {}) ⇒ RObject

Fetches an object by bucket/key

Parameters:

  • bucket (Bucket, String)

    the bucket where the object is stored

  • key (String)

    the key of the object

  • options (Hash) (defaults to: {})

    request quorums

Options Hash (options):

  • :r (Fixnum, String, Symbol)

    the read quorum for the request - how many nodes should concur on the read

  • :pr (Fixnum, String, Symbol)

    the “primary” read quorum for the request - how many primary partitions must be available

Returns:

  • (RObject)

    the object loaded from the response



# File 'lib/riak/client/http_backend.rb', line 67

def fetch_object(bucket, key, options={})
  bucket = Bucket.new(client, bucket) if String === bucket
  response = get([200,300], object_path(bucket.name, key, options))
  load_object(RObject.new(bucket, key), response)
end

#file_exists?(filename) ⇒ true, false

(Luwak) Detects whether a file exists in the Luwak large-file interface.

Parameters:

  • filename (String)

    the name of the file

Returns:

  • (true, false)

    whether the file exists



# File 'lib/riak/client/http_backend.rb', line 289

def file_exists?(filename)
  result = head([200,404], luwak_path(filename))
  result[:code] == 200
end

#get_bucket_props(bucket) ⇒ Hash

Fetches bucket properties

Parameters:

  • bucket (Bucket, String)

    the bucket whose properties to fetch

Returns:

  • (Hash)

    bucket properties



# File 'lib/riak/client/http_backend.rb', line 121

def get_bucket_props(bucket)
  bucket = bucket.name if Bucket === bucket
  response = get(200, bucket_properties_path(bucket))
  JSON.parse(response[:body])['props']
end

#get_file(filename) {|chunk| ... } ⇒ IO?

(Luwak) Fetches a file from the Luwak large-file interface.

Parameters:

  • filename (String)

    the name of the file

Yields:

  • (chunk)

    A block which will receive individual chunks of the file as they are streamed

Yield Parameters:

  • chunk (String)

    a block of the file

Returns:

  • (IO, nil)

    the file (also having content_type and original_filename accessors), or nil if a block was given. The file is returned closed and must be reopened before it can be read.



# File 'lib/riak/client/http_backend.rb', line 267

def get_file(filename, &block)
  if block_given?
    get(200, luwak_path(filename), &block)
    nil
  else
    tmpfile = LuwakFile.new(escape(filename))
    begin
      response = get(200, luwak_path(filename)) do |chunk|
        tmpfile.write chunk
      end
      tmpfile.content_type = response[:headers]['content-type'].first
      tmpfile
    ensure
      tmpfile.close
    end
  end
end
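The non-block branch writes each chunk to a temporary file and closes it in `ensure`, which is why the caller receives a closed file. A minimal sketch of that flow, assuming the standard library's `Tempfile` in place of `LuwakFile` and a hypothetical `fake_streaming_get` in place of the HTTP call:

```ruby
require 'tempfile'

# Hypothetical stand-in for the streaming GET: yields the body in chunks.
def fake_streaming_get(chunks)
  chunks.each { |chunk| yield chunk }
  { :headers => { 'content-type' => ['text/plain'] } }
end

def fetch_to_tempfile(chunks)
  tmpfile = Tempfile.new('luwak-sketch')
  begin
    fake_streaming_get(chunks) { |chunk| tmpfile.write(chunk) }
    tmpfile
  ensure
    tmpfile.close # runs on success too, so the caller gets a closed file
  end
end

file = fetch_to_tempfile(["hello ", "world"])
closed_on_return = file.closed?
file.open            # reopen before reading
contents = file.read
file.close!          # close and delete the temporary file
```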

#get_index(bucket, index, query) ⇒ Array<String>

Performs a secondary-index query.

Parameters:

  • bucket (String, Bucket)

    the bucket to query

  • index (String)

    the index to query

  • query (String, Integer, Range)

    the equality query or range query to perform

Returns:

  • (Array<String>)

    a list of keys matching the query



# File 'lib/riak/client/http_backend.rb', line 220

def get_index(bucket, index, query)
  bucket = bucket.name if Bucket === bucket
  path = case query
         when Range
           raise ArgumentError, t('invalid_index_query', :value => query.inspect) unless String === query.begin || Integer === query.end
           index_range_path(bucket, index, query.begin, query.end)
         when String, Integer
           index_eq_path(bucket, index, query)
         else
           raise ArgumentError, t('invalid_index_query', :value => query.inspect)
         end
  response = get(200, path)
  JSON.parse(response[:body])['keys']
end
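The dispatch on the query type can be sketched without a server. The two path builders below are hypothetical simplifications (the real ones come from the Configuration module and may escape or prefix differently), and the sketch leaves out the endpoint type check performed in the Range branch.

```ruby
# Hypothetical path builders, simplified for illustration.
def index_eq_path(bucket, index, value)
  "/buckets/#{bucket}/index/#{index}/#{value}"
end

def index_range_path(bucket, index, first, last)
  "/buckets/#{bucket}/index/#{index}/#{first}/#{last}"
end

# A Range becomes a range query; a String or Integer becomes an equality query.
def index_path(bucket, index, query)
  case query
  when Range
    index_range_path(bucket, index, query.begin, query.end)
  when String, Integer
    index_eq_path(bucket, index, query)
  else
    raise ArgumentError, "invalid index query: #{query.inspect}"
  end
end

range_path = index_path("users", "age_int", 20..30)
eq_path    = index_path("users", "email_bin", "sam@example.com")
```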

#link_walk(robject, walk_specs) ⇒ Array<Array<RObject>>

Performs a link-walking query

Parameters:

  • robject (RObject)

    the object to start at

  • walk_specs (Array<WalkSpec>)

    a list of walk specifications to process

Returns:

  • (Array<Array<RObject>>)

    the loaded objects, one inner array per multipart group in the response



# File 'lib/riak/client/http_backend.rb', line 198

def link_walk(robject, walk_specs)
  response = get(200, link_walk_path(robject.bucket.name, robject.key, walk_specs))
  if boundary = Util::Multipart.extract_boundary(response[:headers]['content-type'].first)
    Util::Multipart.parse(response[:body], boundary).map do |group|
      group.map do |obj|
        if obj[:headers] && !obj[:headers]['x-riak-deleted'] && !obj[:body].blank? && obj[:headers]['location']
          link = Riak::Link.new(obj[:headers]['location'].first, "")
          load_object(RObject.new(client.bucket(link.bucket), link.key), obj)
        end
      end.compact
    end
  else
    []
  end
end

#list_buckets ⇒ Array<String>

Lists known buckets

Returns:

  • (Array<String>)

    the names of the known buckets



# File 'lib/riak/client/http_backend.rb', line 155

def list_buckets
  response = get(200, bucket_list_path)
  JSON.parse(response[:body])['buckets']
end

#list_keys(bucket) {|Array<String>| ... } ⇒ Array<String>

List keys in a bucket

Parameters:

  • bucket (Bucket, String)

    the bucket to fetch the keys for

Yields:

  • (Array<String>)

    a list of keys from the current streamed chunk

Returns:

  • (Array<String>)

    the list of keys, if no block was given



# File 'lib/riak/client/http_backend.rb', line 142

def list_keys(bucket, &block)
  bucket = bucket.name if Bucket === bucket
  if block_given?
    get(200, key_list_path(bucket, :keys => 'stream'), {}, &KeyStreamer.new(block))
  else
    response = get(200, key_list_path(bucket))
    obj = JSON.parse(response[:body])
    obj && obj['keys'].map {|k| unescape(k) }
  end
end
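The non-streaming branch parses the JSON body and unescapes every key. A minimal sketch of that step follows. The sample body is made up, and `URI.decode_www_form_component` stands in for the library's `unescape` helper, whose exact behavior is not shown on this page.

```ruby
require 'json'
require 'uri'

# A made-up response body with URL-escaped keys.
body = '{"keys":["plain","with%20space","a%2Fb"]}'

obj = JSON.parse(body)
# Guard against a nil parse result, then unescape each key.
keys = obj && obj['keys'].map { |k| URI.decode_www_form_component(k) }
```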

#mapred(mr) {|Fixnum, Object| ... } ⇒ Array<Object>

Performs a MapReduce query.

Parameters:

  • mr (MapReduce)

    the query to perform

Yields:

  • (Fixnum, Object)

    the phase number and single result from the phase

Returns:

  • (Array<Object>)

    the list of results, if no block was given

Raises:

  • (MapReduceError)

    if the query has no phases and the server does not support phaseless MapReduce



# File 'lib/riak/client/http_backend.rb', line 166

def mapred(mr)
  raise MapReduceError.new(t("empty_map_reduce_query")) if mr.query.empty? && !mapred_phaseless?
  if block_given?
    parser = Riak::Util::Multipart::StreamParser.new do |response|
      result = JSON.parse(response[:body])
      yield result['phase'], result['data']
    end
    post(200, mapred_path({:chunked => true}), mr.to_json, {"Content-Type" => "application/json", "Accept" => "application/json"}, &parser)
    nil
  else
    response = post(200, mapred_path, mr.to_json, {"Content-Type" => "application/json", "Accept" => "application/json"})
    begin
      JSON.parse(response[:body])
    rescue
      response
    end
  end
end
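In the non-block branch, a body that fails to parse as JSON causes the raw response hash to be returned instead of an array of results. The sketch below reproduces that fallback in a hypothetical helper, `parse_or_raw`. It rescues only `JSON::ParserError`, which is narrower than the bare `rescue` in the listing.

```ruby
require 'json'

# Returns the parsed body, or the untouched response when parsing fails.
def parse_or_raw(response)
  JSON.parse(response[:body])
rescue JSON::ParserError
  response
end

parsed   = parse_or_raw({ :code => 200, :body => '[1, 2, 3]' })
fallback = parse_or_raw({ :code => 200, :body => 'not json' })
```

Callers therefore may want to check whether the return value is an Array or a Hash before iterating over it.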

#ping ⇒ true, false

Pings the server

Returns:

  • (true, false)

    whether the server is available



# File 'lib/riak/client/http_backend.rb', line 49

def ping
  get(200, ping_path)
  true
rescue
  false
end
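Because the method body ends in a bare `rescue`, any StandardError raised during the request is reported as `false`, not only connection failures. A minimal sketch of the same shape, using a hypothetical `reachable?` helper that takes the request as a block:

```ruby
# Returns true when the block completes, false when it raises a StandardError.
def reachable?
  yield
  true
rescue StandardError
  false
end

up   = reachable? { 200 }
down = reachable? { raise IOError, "connection refused" }
```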

#reload_object(robject, options = {}) ⇒ Object

Reloads the data for a given RObject, a special case of #fetch_object.



# File 'lib/riak/client/http_backend.rb', line 74

def reload_object(robject, options={})
  response = get([200,300,304], object_path(robject.bucket.name, robject.key, options), reload_headers(robject))
  if response[:code].to_i == 304
    robject
  else
    load_object(robject, response)
  end
end
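The 304 branch returns the object untouched, and the `to_i` call means the comparison works whether the status code is an Integer or a String. The sketch below shows the same decision on plain hashes. `reload_result` is a hypothetical helper, and `merge` stands in for `load_object`, which in the real method updates the RObject itself.

```ruby
# Returns the cached value on 304 Not Modified, otherwise a refreshed copy.
def reload_result(cached, response)
  return cached if response[:code].to_i == 304
  cached.merge(:data => response[:body])
end

cached    = { :key => "k", :data => "old" }
unchanged = reload_result(cached, { :code => "304", :body => "" })
refreshed = reload_result(cached, { :code => 200, :body => "new" })
```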

#search(index, query, options = {}) ⇒ Object

(Riak Search) Performs a search query

Parameters:

  • index (String, nil)

    the index to query, or nil for the default

  • query (String)

    the Lucene query to perform

  • options (Hash) (defaults to: {})

    query options




# File 'lib/riak/client/http_backend.rb', line 241

def search(index, query, options={})
  response = get(200, solr_select_path(index, query, options.stringify_keys))
  if response[:headers]['content-type'].include?("application/json")
    normalize_search_response JSON.parse(response[:body])
  else
    response[:body]
  end
end

#set_bucket_props(bucket, props) ⇒ Object

Sets bucket properties

Parameters:

  • bucket (Bucket, String)

    the bucket to set properties on

  • props (Hash)

    the properties to set



# File 'lib/riak/client/http_backend.rb', line 130

def set_bucket_props(bucket, props)
  bucket = bucket.name if Bucket === bucket
  body = {'props' => props}.to_json
  put(204, bucket_properties_path(bucket), body, {"Content-Type" => "application/json"})
end
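The request body wraps the properties in a top-level `props` key, which is the same key that #get_bucket_props reads back. A short sketch of the round trip, using made-up property values:

```ruby
require 'json'

# Made-up properties for illustration.
props = { 'n_val' => 3, 'allow_mult' => true }

# The body sent by set_bucket_props.
body = { 'props' => props }.to_json

# The extraction performed by get_bucket_props on a body of the same shape.
round_tripped = JSON.parse(body)['props']
```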

#stats ⇒ Hash

Gets health statistics

Returns:

  • (Hash)

    information about the server, including stats



# File 'lib/riak/client/http_backend.rb', line 187

def stats
  response = get(200, stats_path)
  JSON.parse(response[:body])
end

#store_file(filename, content_type, data) ⇒ String
#store_file(content_type, data) ⇒ String

(Luwak) Uploads a file to the Luwak large-file interface.

Overloads:

  • #store_file(filename, content_type, data) ⇒ String

    Stores the file at the given key/filename

    Parameters:

    • filename (String)

      the key/filename for the object

    • content_type (String)

      the MIME Content-Type for the data

    • data (IO, String)

      the contents of the file

  • #store_file(content_type, data) ⇒ String

    Stores the file with a server-determined key/filename

    Parameters:

    • content_type (String)

      the MIME Content-Type for the data

    • data (String, #read)

      the contents of the file

Returns:

  • (String)

    the key/filename where the object was stored



# File 'lib/riak/client/http_backend.rb', line 311

def store_file(*args)
  data, content_type, filename = args.reverse
  if filename
    put(204, luwak_path(filename), data, {"Content-Type" => content_type})
    filename
  else
    response = post(201, luwak_path(nil), data, {"Content-Type" => content_type})
    response[:headers]["location"].first.split("/").last
  end
end
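Both overloads are served by reversing the argument list before destructuring, so the optional `filename` ends up last and is `nil` when omitted. The sketch below isolates that step in a hypothetical `split_store_args` helper and also shows how the key is taken from a Location header. The header value is a made-up example.

```ruby
# Reverse the arguments so the optional leading filename becomes optional trailing.
def split_store_args(*args)
  data, content_type, filename = args.reverse
  { :filename => filename, :content_type => content_type, :data => data }
end

named   = split_store_args("notes.txt", "text/plain", "hello")
unnamed = split_store_args("text/plain", "hello")

# With no filename, the key is the last path segment of the Location header.
location = ["/luwak/generated-key"]
key = location.first.split("/").last
```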

#store_object(robject, options = {}) ⇒ Object

Stores an object

Parameters:

  • robject (RObject)

    the object to store

  • options (Hash) (defaults to: {})

    quorum and storage options

Options Hash (options):

  • :returnbody (true, false) — default: false

    whether to update the object after write with the new value

  • :w (Fixnum, String, Symbol)

    the write quorum

  • :pw (Fixnum, String, Symbol)

    the “primary” write quorum - how many primary partitions must be available

  • :dw (Fixnum, String, Symbol)

    the durable write quorum



# File 'lib/riak/client/http_backend.rb', line 92

def store_object(robject, options={})
  method, codes = if robject.key.present?
                    [:put, [200,204,300]]
                  else
                    [:post, 201]
                  end
  response = send(method, codes, object_path(robject.bucket.name, robject.key, options), robject.raw_data, store_headers(robject))
  load_object(robject, response) if options[:returnbody]
end
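The request verb and the accepted status codes depend on whether the object already has a key. The listing uses `present?`, which is not part of core Ruby. The sketch below approximates it with a nil-or-blank check in a hypothetical `store_request_for` helper.

```ruby
# Keyed objects are PUT to a known path; keyless ones are POSTed so the
# server can assign a key.
def store_request_for(key)
  if key.nil? || key.to_s.strip.empty?
    [:post, 201]
  else
    [:put, [200, 204, 300]]
  end
end

keyed_method, keyed_codes = store_request_for("user-1")
new_method, new_codes     = store_request_for(nil)
```

Note that the method in the listing returns the loaded object only when `:returnbody` is set; otherwise its return value is `nil`.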

#update_search_index(index, updates) ⇒ Object

(Riak Search) Updates a search index (includes deletes).

Parameters:

  • index (String, nil)

    the index to update, or nil for the default index.

  • updates (String)

    an XML update string in Solr’s required format




# File 'lib/riak/client/http_backend.rb', line 255

def update_search_index(index, updates)
  post(200, solr_update_path(index), updates, {'Content-Type' => 'text/xml'})
end