Class: Riak::MapReduce
Includes: Util::Escape, Util::Translation
Defined in: lib/riak/map_reduce.rb, lib/riak/map_reduce/phase.rb, lib/riak/map_reduce/filter_builder.rb
Overview
Class for invoking map-reduce jobs using the HTTP interface.
Defined Under Namespace
Classes: FilterBuilder, Phase
Instance Attribute Summary
- #inputs ⇒ Array<[bucket,key]>, ...
  The bucket/keys for input to the job, or the bucket (all keys), or a hash containing the bucket and key-filters.
- #query ⇒ Array<Phase>
  The map and reduce phases that will be executed.
Instance Method Summary
- #add(*params) ⇒ MapReduce (also: #<<, #include)
  Add or replace inputs for the job.
- #filter(bucket) { ... } ⇒ MapReduce
  Adds a bucket and key-filters built by the given block.
- #index(bucket, index, query) ⇒ MapReduce
  (Secondary Indexes) Use a secondary index query to start a map/reduce job.
- #initialize(client) {|self| ... } ⇒ MapReduce (constructor)
  Creates a new map-reduce job.
- #link(*params) ⇒ MapReduce
  Add a link phase to the job.
- #map(*params) ⇒ MapReduce
  Add a map phase to the job.
- #reduce(*params) ⇒ MapReduce
  Add a reduce phase to the job.
- #run(&block) ⇒ Object
  Executes this map-reduce job.
- #search(bucket, query) ⇒ MapReduce
  (Riak Search) Use a search query to start a map/reduce job.
- #timeout(value) ⇒ Object (also: #timeout=)
  Sets the timeout for the map-reduce job.
- #to_json(*a) ⇒ String
  Convert the job to JSON for submission over the HTTP interface.
Methods included from Util::Escape
#escape, #maybe_escape, #maybe_unescape, #unescape
Methods included from Util::Translation
Constructor Details
#initialize(client) {|self| ... } ⇒ MapReduce
Creates a new map-reduce job.
    # File 'lib/riak/map_reduce.rb', line 34
    def initialize(client)
      @client, @inputs, @query = client, [], []
      yield self if block_given?
    end
Instance Attribute Details
#inputs ⇒ Array<[bucket,key]>, ...
Returns the bucket/keys for input to the job, or the bucket (all keys), or a hash containing the bucket and key-filters.
    # File 'lib/riak/map_reduce.rb', line 23
    def inputs
      @inputs
    end
Instance Method Details
#add(bucket) ⇒ MapReduce
#add(bucket, key) ⇒ MapReduce
#add(object) ⇒ MapReduce
#add(bucket, key, keydata) ⇒ MapReduce
#add(bucket, filters) ⇒ MapReduce
Also known as: <<, include
Add or replace inputs for the job.
    # File 'lib/riak/map_reduce.rb', line 62
    def add(*params)
      params = params.dup
      params = params.first if Array === params.first
      case params.size
      when 1
        p = params.first
        case p
        when Bucket
          warn(t('full_bucket_mapred', :backtrace => caller.join("\n "))) unless Riak.disable_list_keys_warnings
          @inputs = maybe_escape(p.name)
        when RObject
          @inputs << [maybe_escape(p.bucket.name), maybe_escape(p.key)]
        when String
          warn(t('full_bucket_mapred', :backtrace => caller.join("\n "))) unless Riak.disable_list_keys_warnings
          @inputs = maybe_escape(p)
        end
      when 2..3
        bucket = params.shift
        bucket = bucket.name if Bucket === bucket
        if Array === params.first
          warn(t('full_bucket_mapred', :backtrace => caller.join("\n "))) unless Riak.disable_list_keys_warnings
          @inputs = {:bucket => maybe_escape(bucket), :key_filters => params.first }
        else
          key = params.shift
          @inputs << params.unshift(maybe_escape(key)).unshift(maybe_escape(bucket))
        end
      end
      self
    end
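The branches of #add leave the inputs in one of three shapes: a list of [bucket, key] entries, a bare bucket name, or a hash with key filters. The following is a minimal stand-alone sketch of that dispatch, assuming plain strings in place of Bucket and RObject. `sketch_add` is a hypothetical helper, not part of the library; it skips escaping and the list-keys warning, and the `starts_with` filter is only an illustrative value.

```ruby
# Hypothetical stand-in for MapReduce#add that accepts plain strings only.
# It mirrors the branch structure of #add but returns the new inputs value
# instead of mutating @inputs, and it omits maybe_escape and the warning.
def sketch_add(inputs, *params)
  params = params.first if params.first.is_a?(Array)
  case params.size
  when 1
    # A lone bucket name replaces the inputs (a full-bucket job).
    params.first
  when 2..3
    bucket, second, *rest = params
    if second.is_a?(Array)
      # Bucket plus key filters replaces the inputs with a hash.
      { :bucket => bucket, :key_filters => second }
    else
      # Bucket plus key (plus optional keydata) is appended to the list.
      inputs + [[bucket, second, *rest]]
    end
  else
    inputs
  end
end

pairs   = sketch_add([], "users", "alice")
pairs   = sketch_add(pairs, "users", "bob", "extra")
whole   = sketch_add(pairs, "users")
filters = sketch_add(pairs, "users", [["starts_with", "a"]])
```

Note that only the bucket/key forms accumulate. The full-bucket and key-filter forms overwrite whatever was added before, which is why the summary describes #add as "add or replace".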
#filter(bucket) { ... } ⇒ MapReduce
Adds a bucket and key-filters built by the given block. Equivalent to #add with a list of filters.
    # File 'lib/riak/map_reduce.rb', line 100
    def filter(bucket, &block)
      add(bucket, FilterBuilder.new(&block).to_a)
    end
#index(bucket, index, query) ⇒ MapReduce
(Secondary Indexes) Use a secondary index query to start a map/reduce job.
    # File 'lib/riak/map_reduce.rb', line 121
    def index(bucket, index, query)
      bucket = bucket.name if bucket.respond_to?(:name)
      case query
      when String, Fixnum
        @inputs = {:bucket => maybe_escape(bucket), :index => index, :key => query}
      when Range
        raise ArgumentError, t('invalid_index_query', :value => query.inspect) unless String === query.begin || Integer === query.begin
        @inputs = {:bucket => maybe_escape(bucket), :index => index, :start => query.begin, :end => query.end}
      else
        raise ArgumentError, t('invalid_index_query', :value => query.inspect)
      end
      self
    end
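The query argument of #index selects between an exact-match input and a range input. Here is a hedged stand-alone sketch of that dispatch. `sketch_index_inputs` is a hypothetical name, escaping and translation are left out, and `Integer` is used where the listing says `Fixnum`, on the assumption that the code runs on a modern Ruby (the `Fixnum` constant was deprecated in Ruby 2.4 and removed in 3.2). The bucket and index names are illustrative.

```ruby
# Hypothetical stand-alone version of the query dispatch in #index.
# Returns the hash that would become the job's inputs.
def sketch_index_inputs(bucket, index, query)
  case query
  when String, Integer
    # Exact match on a single index value.
    { :bucket => bucket, :index => index, :key => query }
  when Range
    # Only ranges over strings or integers are accepted.
    unless query.begin.is_a?(String) || query.begin.is_a?(Integer)
      raise ArgumentError, "invalid index query: #{query.inspect}"
    end
    { :bucket => bucket, :index => index, :start => query.begin, :end => query.end }
  else
    raise ArgumentError, "invalid index query: #{query.inspect}"
  end
end

exact = sketch_index_inputs("users", "email_bin", "alice@example.com")
range = sketch_index_inputs("users", "age_int", 18..30)
```

Any other query type, such as a Float or a Range of Floats, raises ArgumentError in this sketch, matching the two raise branches in the listing.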
#link(walk_spec, options = {}) ⇒ MapReduce
#link(bucket, tag, keep, options = {}) ⇒ MapReduce
#link(options) ⇒ MapReduce
Add a link phase to the job. Link phases follow links attached to objects automatically (a special case of map).
    # File 'lib/riak/map_reduce.rb', line 177
    def link(*params)
      options = params.extract_options!
      walk_spec_options = options.slice!(:type, :function, :language, :arg) unless params.first
      walk_spec = WalkSpec.normalize(params.shift || walk_spec_options).first
      @query << Phase.new({:type => :link, :function => walk_spec}.merge(options))
      self
    end
#map(function) ⇒ MapReduce
#map(function?, options) ⇒ MapReduce
Add a map phase to the job.
    # File 'lib/riak/map_reduce.rb', line 143
    def map(*params)
      options = params.extract_options!
      @query << Phase.new({:type => :map, :function => params.shift}.merge(options))
      self
    end
#reduce(function) ⇒ MapReduce
#reduce(function?, options) ⇒ MapReduce
Add a reduce phase to the job.
    # File 'lib/riak/map_reduce.rb', line 157
    def reduce(*params)
      options = params.extract_options!
      @query << Phase.new({:type => :reduce, :function => params.shift}.merge(options))
      self
    end
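The overloads `#map(function?, options)` and `#reduce(function?, options)` suggest that a trailing hash is treated as phase options and merged over the positional function. The sketch below illustrates that merge under this assumption. `sketch_phase_args` is a hypothetical helper that returns the hash that would be handed to `Phase.new`; it does not construct a real Phase, and the `:keep` option is shown only as an example value.

```ruby
# Hypothetical helper: builds the argument hash for a map or reduce phase.
# A trailing Hash is taken as options; the first remaining positional
# argument, if any, is the function.
def sketch_phase_args(type, *params)
  options = params.last.is_a?(Hash) ? params.pop : {}
  { :type => type, :function => params.shift }.merge(options)
end

with_function = sketch_phase_args(:map, "Riak.mapValuesJson", :keep => true)
options_only  = sketch_phase_args(:reduce, :function => "Riak.reduceSum")
```

Because the options are merged last, a `:function` given in the options hash takes the place of a missing (or nil) positional function, which is what makes the function argument optional.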
#run ⇒ Array<Array>
#run {|phase, data| ... } ⇒ nil
Executes this map-reduce job.
    # File 'lib/riak/map_reduce.rb', line 215
    def run(&block)
      @client.mapred(self, &block)
    rescue FailedRequest => fr
      if fr.server_error? && fr.is_json?
        raise MapReduceError.new(fr.body)
      else
        raise fr
      end
    end
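#run translates one kind of failure: a server error with a JSON body becomes a MapReduceError, and everything else is re-raised unchanged. The following sketch reproduces that pattern with stand-in classes so it can run without the library. `SketchFailedRequest`, `SketchMapReduceError`, and `sketch_run` are hypothetical names, and the way `server_error?` and `is_json?` are implemented here (5xx status, JSON content type) is an assumption about the real predicates.

```ruby
# Stand-in for the library's FailedRequest (assumed behaviour, not the real class).
class SketchFailedRequest < StandardError
  attr_reader :code, :body, :content_type

  def initialize(code, body, content_type)
    @code, @body, @content_type = code, body, content_type
    super("HTTP #{code}")
  end

  def server_error?
    (500..599).cover?(code)
  end

  def is_json?
    content_type.to_s.include?("application/json")
  end
end

# Stand-in for the library's MapReduceError.
class SketchMapReduceError < StandardError; end

# Runs the block and applies the same translation rule as #run.
def sketch_run
  yield
rescue SketchFailedRequest => fr
  if fr.server_error? && fr.is_json?
    raise SketchMapReduceError.new(fr.body)
  else
    raise fr
  end
end

outcome =
  begin
    sketch_run { raise SketchFailedRequest.new(500, '{"error":"boom"}', "application/json") }
  rescue SketchMapReduceError => e
    "map-reduce error: #{e.message}"
  end
```

A caller that wants to distinguish job failures from transport failures can therefore rescue the map-reduce error separately from the generic request error.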
#search(bucket, query) ⇒ MapReduce
(Riak Search) Use a search query to start a map/reduce job.
    # File 'lib/riak/map_reduce.rb', line 108
    def search(bucket, query)
      bucket = bucket.name if bucket.respond_to?(:name)
      @inputs = {:module => "riak_search", :function => "mapred_search", :arg => [bucket, query]}
      self
    end
#timeout(value) ⇒ Object
Also known as: timeout=
Sets the timeout for the map-reduce job.
    # File 'lib/riak/map_reduce.rb', line 187
    def timeout(value)
      @timeout = value
      return self
    end
#to_json(*a) ⇒ String
Convert the job to JSON for submission over the HTTP interface.
    # File 'lib/riak/map_reduce.rb', line 195
    def to_json(*a)
      hash = {"inputs" => inputs, "query" => query.map(&:as_json)}
      hash['timeout'] = @timeout.to_i if @timeout
      hash.to_json(*a)
    end
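The serialized job is a JSON object with "inputs", "query", and an optional integer "timeout". The sketch below builds such a payload with only the standard library. `sketch_job_json` is a hypothetical helper, and the phase hashes are an assumed shape for what `Phase#as_json` returns, based on Riak's HTTP map-reduce format rather than on this page.

```ruby
require 'json'

# Hypothetical helper mirroring #to_json: the timeout key is added only
# when a timeout was set, and it is coerced to an integer.
def sketch_job_json(inputs, phases, timeout = nil)
  hash = { "inputs" => inputs, "query" => phases }
  hash["timeout"] = timeout.to_i if timeout
  hash.to_json
end

# Assumed phase shapes; the real ones come from Phase#as_json.
phases = [
  { "map"    => { "language" => "javascript", "name" => "Riak.mapValuesJson", "keep" => false } },
  { "reduce" => { "language" => "javascript", "name" => "Riak.reduceSum", "keep" => true } }
]

payload    = sketch_job_json([["users", "alice"]], phases, "5000")
no_timeout = sketch_job_json("users", phases)
```

Because of the `to_i` coercion, a timeout supplied as a string such as "5000" is still emitted as a JSON number.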