Class: Riak::Client::HTTPBackend

Inherits:
Object
  • Object
Includes:
Configuration, ObjectMethods, TransportMethods, Util::Escape, Util::Translation
Defined in:
lib/riak/client/http_backend.rb,
lib/riak/client/http_backend/key_streamer.rb,
lib/riak/client/http_backend/configuration.rb,
lib/riak/client/http_backend/object_methods.rb,
lib/riak/client/http_backend/request_headers.rb,
lib/riak/client/http_backend/transport_methods.rb

Overview

The parent class for all backends that connect to Riak via HTTP. This class implements all of the universal backend API methods on behalf of subclasses, which need only implement the TransportMethods#perform method for library-specific semantics.
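As a rough illustration of that division of labour, the sketch below uses stand-in classes rather than the real library. `SketchBackend` and `CannedBackend` are hypothetical names, and the argument order of `perform` here is an assumption, not the documented signature of TransportMethods#perform.

```ruby
# Stand-in for the shared parent: the verb helpers all funnel into #perform.
class SketchBackend
  def get(expect, path, headers = {})
    perform(:get, path, headers, expect)
  end

  def put(expect, path, body, headers = {})
    perform(:put, path, headers, expect, body)
  end

  # Subclasses supply the transport-specific request logic.
  def perform(method, path, headers, expect, body = nil)
    raise NotImplementedError, "#{self.class} must implement #perform"
  end
end

# Hypothetical subclass that answers from memory instead of the network.
class CannedBackend < SketchBackend
  def perform(method, path, headers, expect, body = nil)
    code = method == :put ? 204 : 200
    raise "unexpected status #{code}" unless Array(expect).include?(code)
    { :code => code, :headers => headers, :body => "#{method.to_s.upcase} #{path}" }
  end
end

response = CannedBackend.new.get(200, "/ping")
puts response[:body]
```

Only `perform` differs between the two classes; everything built on `get` and `put` is inherited unchanged.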

Direct Known Subclasses

ExconBackend, NetHTTPBackend

Defined Under Namespace

Modules: Configuration, ObjectMethods, TransportMethods

Classes: KeyStreamer, RequestHeaders

Instance Method Summary

Methods included from Configuration

#bucket_list_path, #bucket_properties_path, #index_eq_path, #index_range_path, #key_list_path, #link_walk_path, #luwak_path, #mapred_path, #object_path, #ping_path, #solr_select_path, #solr_update_path, #stats_path

Methods included from ObjectMethods

#load_object, #reload_headers, #store_headers

Methods included from TransportMethods

#basic_auth_header, #client_id, #default_headers, #delete, #get, #head, #path, #perform, #post, #put, #return_body?, #root_uri, #valid_response?, #verify_body!

Methods included from Util::Translation

#i18n_scope, #t

Methods included from Util::Escape

#escape, #maybe_escape, #maybe_unescape, #unescape

Constructor Details

- (HTTPBackend) initialize(client, node)

Create an HTTPBackend for the Riak::Client.

Parameters:

  • client (Client)

    the client that uses this backend

  • node (Node)

    the node we're connecting to

Raises:

  • (ArgumentError)

    if client is not a Client or node is not a Node


# File 'lib/riak/client/http_backend.rb', line 38

def initialize(client, node)
  raise ArgumentError, t("client_type", :client => client) unless Client === client
  raise ArgumentError, t("node_type", :node => node) unless Node === node
  @client = client
  @node = node
end
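
The constructor's guards rely on `Class#===`, which is true when the argument is an instance of the class or of a subclass. A minimal sketch of the same idiom, using hypothetical stand-in classes (`SketchClient`, `SketchNode`, `GuardedBackend`) and plain message strings in place of the translated ones:

```ruby
# Stand-in classes; the real ones are Riak::Client and Riak::Client::Node.
class SketchClient; end
class SketchNode; end

class GuardedBackend
  attr_reader :client, :node

  def initialize(client, node)
    # Reject anything that is not an instance of the expected class.
    raise ArgumentError, "expected a SketchClient, got #{client.inspect}" unless SketchClient === client
    raise ArgumentError, "expected a SketchNode, got #{node.inspect}" unless SketchNode === node
    @client = client
    @node = node
  end
end

begin
  GuardedBackend.new("not a client", SketchNode.new)
rescue ArgumentError => e
  puts e.message
end
```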

Instance Attribute Details

- (Object) client (readonly)

The Riak::Client that uses this backend



# File 'lib/riak/client/http_backend.rb', line 30

def client
  @client
end

- (Object) node (readonly)

The Riak::Client::Node that this backend connects to



# File 'lib/riak/client/http_backend.rb', line 33

def node
  @node
end

Instance Method Details

- (Object) delete_file(filename)

(Luwak) Deletes a file from the Luwak large-file interface.

Parameters:

  • filename (String)

    the name of the file



# File 'lib/riak/client/http_backend.rb', line 293

def delete_file(filename)
  delete([204,404], luwak_path(filename))
end

- (Object) delete_object(bucket, key, options = {})

Deletes an object

Parameters:

  • bucket (Bucket, String)

    the bucket where the object lives

  • key (String)

    the key where the object lives

  • options (Hash) (defaults to: {})

    quorum and delete options



# File 'lib/riak/client/http_backend.rb', line 109

def delete_object(bucket, key, options={})
  bucket = bucket.name if Bucket === bucket
  vclock = options.delete(:vclock)
  headers = vclock ? {"X-Riak-VClock" => vclock} : {}
  delete([204, 404], object_path(bucket, key, options), headers)
end
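
The vector clock is taken out of the options hash and sent as a request header, while the remaining options go to object_path. The sketch below isolates that step; `split_delete_options` is a hypothetical helper, and the vclock string is a made-up placeholder. Unlike delete_object, which calls `options.delete` on the hash it was given, the sketch copies the hash first so the caller's copy is left intact.

```ruby
# Hypothetical helper mirroring the option handling in delete_object.
def split_delete_options(options)
  options = options.dup               # leave the caller's hash untouched
  vclock  = options.delete(:vclock)   # removed so it is not passed on with the other options
  headers = vclock ? { "X-Riak-VClock" => vclock } : {}
  [options, headers]
end

params, headers = split_delete_options({ :rw => 2, :vclock => "placeholder-vclock" })
p params
p headers
```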

- (RObject) fetch_object(bucket, key, options = {})

Fetches an object by bucket/key

Parameters:

  • bucket (Bucket, String)

    the bucket where the object is stored

  • key (String)

    the key of the object

  • options (Hash) (defaults to: {})

    request quorums

Options Hash (options):

  • :r (Fixnum, String, Symbol)

    the read quorum for the request - how many nodes should concur on the read

  • :pr (Fixnum, String, Symbol)

    the "primary" read quorum for the request - how many primary partitions must be available

Returns:

  • (RObject)

    the fetched object



# File 'lib/riak/client/http_backend.rb', line 65

def fetch_object(bucket, key, options={})
  bucket = Bucket.new(client, bucket) if String === bucket
  response = get([200,300], object_path(bucket.name, key, options))
  load_object(RObject.new(bucket, key), response)
end

- (true, false) file_exists?(filename)

(Luwak) Detects whether a file exists in the Luwak large-file interface.

Parameters:

  • filename (String)

    the name of the file

Returns:

  • (true, false)

    whether the file exists



# File 'lib/riak/client/http_backend.rb', line 286

def file_exists?(filename)
  result = head([200,404], luwak_path(filename))
  result[:code] == 200
end

- (Hash) get_bucket_props(bucket)

Fetches bucket properties

Parameters:

  • bucket (Bucket, String)

    the bucket whose properties to fetch

Returns:

  • (Hash)

    bucket properties



# File 'lib/riak/client/http_backend.rb', line 119

def get_bucket_props(bucket)
  bucket = bucket.name if Bucket === bucket
  response = get(200, bucket_properties_path(bucket))
  JSON.parse(response[:body])['props']
end

- (IO?) get_file(filename) {|chunk| ... }

(Luwak) Fetches a file from the Luwak large-file interface.

Parameters:

  • filename (String)

    the name of the file

Yields:

  • (chunk)

    A block which will receive individual chunks of the file as they are streamed

Yield Parameters:

  • chunk (String)

    a block of the file

Returns:

  • (IO, nil)

    the file (also having content_type and original_filename accessors), or nil if a block was given. The file is returned closed and must be reopened before it can be read



# File 'lib/riak/client/http_backend.rb', line 264

def get_file(filename, &block)
  if block_given?
    get(200, luwak_path(filename), &block)
    nil
  else
    tmpfile = LuwakFile.new(escape(filename))
    begin
      response = get(200, luwak_path(filename)) do |chunk|
        tmpfile.write chunk
      end
      tmpfile.content_type = response[:headers]['content-type'].first
      tmpfile
    ensure
      tmpfile.close
    end
  end
end
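
The two calling styles of get_file can be sketched without a Riak server. In the sketch below, `each_chunk` and `fetch_file` are hypothetical stand-ins, an array of strings plays the role of the streamed HTTP body, and a plain `Tempfile` is assumed in place of LuwakFile.

```ruby
require 'tempfile'

# Hypothetical chunk source standing in for the streamed HTTP GET.
def each_chunk(chunks, &block)
  chunks.each(&block)
end

# Mirrors get_file: stream to the block if given, otherwise spool to a temp file.
def fetch_file(chunks)
  if block_given?
    each_chunk(chunks) { |chunk| yield chunk }
    nil
  else
    tmpfile = Tempfile.new("luwak-sketch")
    begin
      each_chunk(chunks) { |chunk| tmpfile.write(chunk) }
      tmpfile
    ensure
      tmpfile.close   # closed before returning, so the caller must reopen it
    end
  end
end

file = fetch_file(%w[hello world])
file.open            # Tempfile#open reopens the closed file
puts file.read
file.close!          # close and delete the temp file
```

The block form never touches the disk, which is why it returns nil rather than a file.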

- (Array<String>) get_index(bucket, index, query)

Performs a secondary-index query.

Parameters:

  • bucket (String, Bucket)

    the bucket to query

  • index (String)

    the index to query

  • query (String, Integer, Range)

    the equality query or range query to perform

Returns:

  • (Array<String>)

    a list of keys matching the query



# File 'lib/riak/client/http_backend.rb', line 217

def get_index(bucket, index, query)
  bucket = bucket.name if Bucket === bucket
  path = case query
         when Range
           raise ArgumentError, t('invalid_index_query', :value => query.inspect) unless String === query.begin || Integer === query.end
           index_range_path(bucket, index, query.begin, query.end)
         when String, Integer
           index_eq_path(bucket, index, query)
         else
           raise ArgumentError, t('invalid_index_query', :value => query.inspect)
         end
  response = get(200, path)
  JSON.parse(response[:body])['keys']
end
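
The query argument selects between an equality path and a range path. A minimal sketch of that dispatch follows; `index_path_for` is a hypothetical helper, and the `/buckets/.../index/...` layout is an assumption for illustration (the real paths come from index_eq_path and index_range_path, and no escaping is done here).

```ruby
# Hypothetical path builder showing the Range vs. scalar dispatch.
def index_path_for(bucket, index, query)
  case query
  when Range
    # Range queries carry both endpoints.
    "/buckets/#{bucket}/index/#{index}/#{query.begin}/#{query.end}"
  when String, Integer
    # Equality queries carry a single value.
    "/buckets/#{bucket}/index/#{index}/#{query}"
  else
    raise ArgumentError, "invalid index query: #{query.inspect}"
  end
end

puts index_path_for("users", "age_int", 18..25)
puts index_path_for("users", "email_bin", "sean@example.com")
```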

- (Array<Array<RObject>>) link_walk(robject, walk_specs)

Performs a link-walking query

Parameters:

  • robject (RObject)

    the object to start at

  • walk_specs (Array<WalkSpec>)

    a list of walk specifications to process

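Returns:

  • (Array<Array<RObject>>)

    the matched objects, grouped as in the multipart response; an empty list if the response has no multipart boundary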



# File 'lib/riak/client/http_backend.rb', line 195

def link_walk(robject, walk_specs)
  response = get(200, link_walk_path(robject.bucket.name, robject.key, walk_specs))
  if boundary = Util::Multipart.extract_boundary(response[:headers]['content-type'].first)
    Util::Multipart.parse(response[:body], boundary).map do |group|
      group.map do |obj|
        if obj[:headers] && !obj[:headers]['x-riak-deleted'] && !obj[:body].blank? && obj[:headers]['location']
          link = Riak::Link.new(obj[:headers]['location'].first, "")
          load_object(RObject.new(client.bucket(link.bucket), link.key), obj)
        end
      end.compact
    end
  else
    []
  end
end

- (Array<String>) list_buckets

Lists known buckets

Returns:

  • (Array<String>)

    the names of the known buckets



# File 'lib/riak/client/http_backend.rb', line 153

def list_buckets
  response = get(200, bucket_list_path)
  JSON.parse(response[:body])['buckets']
end

- (Array<String>) list_keys(bucket) {|Array<String>| ... }

List keys in a bucket

Parameters:

  • bucket (Bucket, String)

    the bucket to fetch the keys for

Yields:

  • (Array<String>)

    a list of keys from the current streamed chunk

Returns:

  • (Array<String>)

    the list of keys, if no block was given



# File 'lib/riak/client/http_backend.rb', line 140

def list_keys(bucket, &block)
  bucket = bucket.name if Bucket === bucket
  if block_given?
    get(200, key_list_path(bucket, :keys => 'stream'), {}, &KeyStreamer.new(block))
  else
    response = get(200, key_list_path(bucket))
    obj = JSON.parse(response[:body])
    obj && obj['keys'].map {|k| unescape(k) }
  end
end
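
The calling convention of list_keys (one array when no block is given, one yield per chunk otherwise) can be sketched as follows. `keys_from` is a hypothetical stand-in that takes ready-made JSON bodies instead of issuing requests, and `CGI.unescape` is assumed here in place of the library's own unescape.

```ruby
require 'json'
require 'cgi'

# Hypothetical stand-in: `bodies` plays the role of the JSON documents sent by the server.
def keys_from(bodies)
  decode = lambda { |body| JSON.parse(body)['keys'].map { |k| CGI.unescape(k) } }
  if block_given?
    bodies.each { |body| yield decode.call(body) }   # one yield per streamed chunk
    nil
  else
    bodies.flat_map(&decode)                         # everything at once
  end
end

p keys_from(['{"keys":["a%20b","c"]}'])
keys_from(['{"keys":["k1"]}', '{"keys":["k2","k3"]}']) { |chunk| p chunk }
```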

- (Array<Object>) mapred(mr) {|Fixnum, Object| ... }

Performs a MapReduce query.

Parameters:

  • mr (MapReduce)

    the query to perform

Yields:

  • (Fixnum, Object)

    the phase number and single result from the phase

Returns:

  • (Array<Object>)

    the list of results, if no block was given



# File 'lib/riak/client/http_backend.rb', line 164

def mapred(mr)
  if block_given?
    parser = Riak::Util::Multipart::StreamParser.new do |response|
      result = JSON.parse(response[:body])
      yield result['phase'], result['data']
    end
    post(200, mapred_path({:chunked => true}), mr.to_json, {"Content-Type" => "application/json", "Accept" => "application/json"}, &parser)
    nil
  else
    response = post(200, mapred_path, mr.to_json, {"Content-Type" => "application/json", "Accept" => "application/json"})
    begin
      JSON.parse(response[:body])
    rescue
      response
    end
  end
end
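
In the non-streaming branch, the body is parsed as JSON and the raw response is returned if parsing fails. A minimal sketch of that fallback, with `parse_or_passthrough` as a hypothetical helper; note that it rescues only `JSON::ParserError`, whereas the bare `rescue` in mapred catches any StandardError.

```ruby
require 'json'

# Hypothetical helper: parse the body as JSON, else hand back the whole response.
def parse_or_passthrough(response)
  JSON.parse(response[:body])
rescue JSON::ParserError
  response
end

p parse_or_passthrough({ :code => 200, :body => '[1, 2, 3]' })
p parse_or_passthrough({ :code => 200, :body => 'not json' })
```

A caller therefore has to be prepared for either parsed results or a response hash.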

- (true, false) ping

Pings the server

Returns:

  • (true, false)

    whether the server is available



# File 'lib/riak/client/http_backend.rb', line 47

def ping
  get(200, ping_path)
  true
rescue
  false
end

- (Object) reload_object(robject, options = {})

Reloads the data for a given RObject, a special case of #fetch_object.



# File 'lib/riak/client/http_backend.rb', line 72

def reload_object(robject, options={})
  response = get([200,300,304], object_path(robject.bucket.name, robject.key, options), reload_headers(robject))
  if response[:code].to_i == 304
    robject
  else
    load_object(robject, response)
  end
end
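
The 304 (Not Modified) branch means a reload can be answered without a body, in which case the local object is returned as it is. The sketch below shows that decision in isolation; `CachedObject` and `refresh` are hypothetical stand-ins for RObject and load_object, and the header layout (arrays of values under lowercase names) is assumed from the other methods on this page.

```ruby
# Hypothetical stand-in for an RObject: just a value and its entity tag.
CachedObject = Struct.new(:data, :etag)

# Mirrors reload_object: keep the local copy on 304, otherwise load the response.
def refresh(object, response)
  return object if response[:code].to_i == 304

  object.data = response[:body]
  object.etag = response[:headers]['etag'].first
  object
end

obj = CachedObject.new("old", "v1")
refresh(obj, { :code => "304", :headers => {}, :body => nil })
puts obj.data
refresh(obj, { :code => 200, :headers => { 'etag' => ['v2'] }, :body => "new" })
puts obj.data
```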

- (Object) search(index, query, options = {})

(Riak Search) Performs a search query

Parameters:

  • index (String, nil)

    the index to query, or nil for the default

  • query (String)

    the Lucene query to perform

  • options (Hash) (defaults to: {})

    query options



# File 'lib/riak/client/http_backend.rb', line 238

def search(index, query, options={})
  response = get(200, solr_select_path(index, query, options.stringify_keys))
  if response[:headers]['content-type'].include?("application/json")
    JSON.parse(response[:body])
  else
    response[:body]
  end
end

- (Object) set_bucket_props(bucket, props)

Sets bucket properties

Parameters:

  • bucket (Bucket, String)

    the bucket to set properties on

  • props (Hash)

    the properties to set



# File 'lib/riak/client/http_backend.rb', line 128

def set_bucket_props(bucket, props)
  bucket = bucket.name if Bucket === bucket
  body = {'props' => props}.to_json
  put(204, bucket_properties_path(bucket), body, {"Content-Type" => "application/json"})
end

- (Hash) stats

Gets health statistics

Returns:

  • (Hash)

    information about the server, including stats



184
185
186
187
# File 'lib/riak/client/http_backend.rb', line 184

def stats
  response = get(200, stats_path)
  JSON.parse(response[:body])
end

- (String) store_file(filename, content_type, data)
- (String) store_file(content_type, data)

(Luwak) Uploads a file to the Luwak large-file interface.

Overloads:

  • - (String) store_file(filename, content_type, data)

    Stores the file at the given key/filename

    Parameters:

    • filename (String)

      the key/filename for the object

    • content_type (String)

      the MIME Content-Type for the data

    • data (IO, String)

      the contents of the file

  • - (String) store_file(content_type, data)

    Stores the file with a server-determined key/filename

    Parameters:

    • content_type (String)

      the MIME Content-Type for the data

    • data (String, #read)

      the contents of the file

Returns:

  • (String)

    the key/filename where the object was stored



# File 'lib/riak/client/http_backend.rb', line 308

def store_file(*args)
  data, content_type, filename = args.reverse
  if filename
    put(204, luwak_path(filename), data, {"Content-Type" => content_type})
    filename
  else
    response = post(201, luwak_path(nil), data, {"Content-Type" => content_type})
    response[:headers]["location"].first.split("/").last
  end
end
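
Both overloads are handled by destructuring the reversed argument list: the two arguments that are always present come first, and filename is nil when it was omitted. A minimal sketch of just that step, with `split_store_args` as a hypothetical helper:

```ruby
# Hypothetical helper isolating the argument handling of store_file.
def split_store_args(*args)
  # Reversing puts the always-present arguments first; filename is nil when omitted.
  data, content_type, filename = args.reverse
  { :filename => filename, :content_type => content_type, :data => data }
end

p split_store_args("notes.txt", "text/plain", "hello")
p split_store_args("text/plain", "hello")
```

A nil filename is what routes store_file to the POST branch, where the server chooses the key.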

- (Object) store_object(robject, options = {})

Stores an object

Parameters:

  • robject (RObject)

    the object to store

  • options (Hash) (defaults to: {})

    quorum and storage options

Options Hash (options):

  • :returnbody (true, false) — default: false

    whether to update the object after write with the new value

  • :w (Fixnum, String, Symbol)

    the write quorum

  • :pw (Fixnum, String, Symbol)

    the "primary" write quorum - how many primary partitions must be available

  • :dw (Fixnum, String, Symbol)

    the durable write quorum



# File 'lib/riak/client/http_backend.rb', line 90

def store_object(robject, options={})
  method, codes = if robject.key.present?
                    [:put, [200,204,300]]
                  else
                    [:post, 201]
                  end
  response = send(method, codes, object_path(robject.bucket.name, robject.key, options), robject.raw_data, store_headers(robject))
  load_object(robject, response) if options[:returnbody]
end
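
The HTTP verb and the accepted status codes depend on whether the object already has a key: PUT to a known key, POST to let the server assign one. A minimal sketch of that choice follows; `verb_and_codes` is a hypothetical helper, and `to_s.strip.empty?` is a plain-Ruby stand-in for the `present?` check used above.

```ruby
# Hypothetical helper: choose the HTTP verb and acceptable status codes from the key.
def verb_and_codes(key)
  if key.to_s.strip.empty?       # nil or blank key: the server picks one
    [:post, 201]
  else
    [:put, [200, 204, 300]]
  end
end

method, codes = verb_and_codes("user-42")
puts "#{method} expecting #{Array(codes).join(', ')}"
method, codes = verb_and_codes(nil)
puts "#{method} expecting #{Array(codes).join(', ')}"
```

Note also that store_object returns the reloaded object only when :returnbody is set; otherwise the result of the trailing `if` is nil.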

- (Object) update_search_index(index, updates)

(Riak Search) Updates a search index (includes deletes).

Parameters:

  • index (String, nil)

    the index to update, or nil for the default index.

  • updates (String)

    an XML update string in Solr's required format



# File 'lib/riak/client/http_backend.rb', line 252

def update_search_index(index, updates)
  post(200, solr_update_path(index), updates, {'Content-Type' => 'text/xml'})
end