Class: Riak::MapReduce
- Includes:
- Util::Escape, Util::Translation
- Defined in:
- lib/riak/map_reduce.rb,
lib/riak/map_reduce/phase.rb,
lib/riak/map_reduce/results.rb,
lib/riak/map_reduce/filter_builder.rb
Overview
Class for invoking map-reduce jobs using the HTTP interface.
Defined Under Namespace
Classes: FilterBuilder, Phase, Results
Instance Attribute Summary
- #inputs ⇒ Array<[bucket,key]>, ...
  The bucket/keys for input to the job, or the bucket (all keys), or a hash containing the bucket and key-filters.
- #query ⇒ Array<Phase>
  The map and reduce phases that will be executed.
Instance Method Summary
- #add(*params) ⇒ MapReduce (also: #<<, #include)
  Add or replace inputs for the job.
- #filter(bucket) { ... } ⇒ MapReduce
  Adds a bucket and key-filters built by the given block.
- #index(bucket, index, query) ⇒ MapReduce
  (Secondary Indexes) Use a secondary index query to start a map/reduce job.
- #initialize(client) {|self| ... } ⇒ MapReduce (constructor)
  Creates a new map-reduce job.
- #link(*params) ⇒ MapReduce
  Add a link phase to the job.
- #map(*params) ⇒ MapReduce
  Add a map phase to the job.
- #reduce(*params) ⇒ MapReduce
  Add a reduce phase to the job.
- #run(&block) ⇒ Object
  Executes this map-reduce job.
- #search(index, query) ⇒ MapReduce
  (Riak Search) Use a search query to start a map/reduce job.
- #timeout(value) ⇒ Object (also: #timeout=)
  Sets the timeout for the map-reduce job.
- #to_json(*a) ⇒ String
  Convert the job to JSON for submission over the HTTP interface.
Methods included from Util::Escape
#escape, #maybe_escape, #maybe_unescape, #unescape
Methods included from Util::Translation
Constructor Details
#initialize(client) {|self| ... } ⇒ MapReduce
Creates a new map-reduce job.
    # File 'lib/riak/map_reduce.rb', line 36

    def initialize(client)
      @client, @inputs, @query = client, [], []
      yield self if block_given?
    end
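The constructor yields the new job to an optional block, and the builder methods documented below return the job itself, so a job can be configured either inside a block or through chained calls. The sketch below illustrates only that pattern. `TinyJob` is a hypothetical stand-in (not part of riak-client) so that the example runs without a Riak node, and its `add` and `map` are heavily simplified.

```ruby
# Hypothetical stand-in that mirrors only the constructor/chaining pattern
# of the listing above; it is NOT the real Riak::MapReduce class.
class TinyJob
  attr_reader :inputs, :query

  def initialize(client)
    @client, @inputs, @query = client, [], []
    yield self if block_given? # lets callers configure the job in a block
  end

  # Each builder method returns self so that calls can be chained.
  def add(bucket, key)
    @inputs << [bucket, key]
    self
  end

  def map(function)
    @query << { :type => :map, :function => function }
    self
  end
end

# Block style: the job is yielded to the block.
block_job = TinyJob.new(:fake_client) do |job|
  job.add("users", "alice")
  job.map("Riak.mapValuesJson")
end

# Chained style: builds the same state.
chained_job = TinyJob.new(:fake_client)
                     .add("users", "alice")
                     .map("Riak.mapValuesJson")
```

Both styles leave the job with identical inputs and phases; which one to use is a matter of taste.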
Instance Attribute Details
#inputs ⇒ Array<[bucket,key]>, ...
Returns the bucket/keys for input to the job, or the bucket (all keys), or a hash containing the bucket and key-filters.
    # File 'lib/riak/map_reduce.rb', line 25

    def inputs
      @inputs
    end
Instance Method Details
#add(bucket) ⇒ MapReduce
#add(bucket, key) ⇒ MapReduce
#add(object) ⇒ MapReduce
#add(bucket, key, keydata) ⇒ MapReduce
#add(bucket, filters) ⇒ MapReduce
Also known as: <<, include
Add or replace inputs for the job.
    # File 'lib/riak/map_reduce.rb', line 64

    def add(*params)
      params = params.dup
      params = params.first if Array === params.first
      case params.size
      when 1
        p = params.first
        case p
        when Bucket
          @inputs = bucket_input(p)
        when RObject
          @inputs << robject_input(p)
        when String
          warn(t('full_bucket_mapred', :backtrace => caller.join("\n "))) unless Riak.disable_list_keys_warnings
          @inputs = maybe_escape(p)
        end
      when 2..3
        bucket = params.shift

        if Array === params.first
          if bucket.is_a? Bucket
            bucket = bucket_input(bucket)
          else
            bucket = maybe_escape(bucket)
          end

          warn(t('full_bucket_mapred', :backtrace => caller.join("\n "))) unless Riak.disable_list_keys_warnings
          @inputs = {:bucket => bucket, :key_filters => params.first }
        else
          key = params.shift
          key_data = params.shift || ''
          @inputs << key_input(key, bucket, key_data)
        end
      end
      self
    end
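The listing above treats its arguments in two different ways: a whole bucket or a bucket with key-filters replaces the current inputs, while a bucket/key pair (optionally with keydata) is appended. The following sketch models that distinction under simplifying assumptions. `add_input` is a hypothetical helper, plain arrays and hashes stand in for the output of the private helpers (`bucket_input`, `key_input`, and so on), and only String buckets are handled.

```ruby
# Simplified, hypothetical model of the argument dispatch in #add.
# It returns the new inputs instead of mutating an instance variable.
def add_input(inputs, *params)
  params = params.first if params.first.is_a?(Array)
  case params.size
  when 1
    params.first # whole bucket: REPLACES any earlier inputs
  when 2..3
    bucket, second, key_data = params
    if second.is_a?(Array)
      # bucket plus key-filters: also REPLACES earlier inputs
      { :bucket => bucket, :key_filters => second }
    else
      # bucket/key (and optional keydata): APPENDS to the list
      inputs + [[bucket, second, key_data || ""]]
    end
  else
    inputs
  end
end

inputs = []
inputs = add_input(inputs, "users", "alice")
inputs = add_input(inputs, "users", "bob", "extra")

whole_bucket = add_input(inputs, "users")
filtered     = add_input(inputs, "users", [["starts_with", "a"]])
```

Because the replacing forms discard earlier inputs, mixing them with bucket/key pairs in one job is probably not what a caller wants.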
#filter(bucket) { ... } ⇒ MapReduce
Adds a bucket and key-filters built by the given block. Equivalent to #add with a list of filters.
    # File 'lib/riak/map_reduce.rb', line 108

    def filter(bucket, &block)
      add(bucket, FilterBuilder.new(&block).to_a)
    end
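Since `#filter` passes `FilterBuilder.new(&block).to_a` to `#add`, the block effectively produces a list of key-filters. The sketch below shows one way such a builder could work. `TinyFilterBuilder` is a hypothetical miniature, not the real `FilterBuilder`; the filter names are Riak key-filter names, but the supported set, the argument checking, and the exact element types returned by the real `to_a` are assumptions.

```ruby
# Hypothetical miniature of a key-filter builder. The real FilterBuilder
# may validate names and arity and supports more filters than this sketch.
class TinyFilterBuilder
  KNOWN_FILTERS = %i[tokenize string_to_int eq starts_with ends_with less_than].freeze

  def initialize(&block)
    @filters = []
    instance_eval(&block) if block # run the block with the builder as self
  end

  # Define one method per filter; each call records [name, *args].
  KNOWN_FILTERS.each do |name|
    define_method(name) do |*args|
      @filters << [name.to_s, *args]
      self
    end
  end

  def to_a
    @filters.dup
  end
end

builder = TinyFilterBuilder.new do
  tokenize "-", 1
  eq "2011"
end
filters = builder.to_a

# The shape #add stores when given a bucket and a list of filters.
inputs = { :bucket => "invoices", :key_filters => filters }
```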
#index(bucket, index, query) ⇒ MapReduce
(Secondary Indexes) Use a secondary index query to start a map/reduce job.
    # File 'lib/riak/map_reduce.rb', line 130

    def index(bucket, index, query)
      if bucket.is_a? Bucket
        bucket = bucket.needs_type? ? [maybe_escape(bucket.type.name), maybe_escape(bucket.name)] : maybe_escape(bucket.name)
      else
        bucket = maybe_escape(bucket)
      end

      case query
      # Fixnum was unified into Integer in Ruby 2.4 and removed in Ruby 3.2
      when String, Fixnum
        @inputs = {:bucket => bucket, :index => index, :key => query}
      when Range
        raise ArgumentError, t('invalid_index_query', :value => query.inspect) unless String === query.begin || Integer === query.begin
        @inputs = {:bucket => bucket, :index => index, :start => query.begin, :end => query.end}
      else
        raise ArgumentError, t('invalid_index_query', :value => query.inspect)
      end
      self
    end
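The query argument determines the shape of the inputs: a String or integer becomes an exact-match `:key`, a Range becomes `:start`/`:end`, and anything else is rejected. A minimal sketch of that dispatch follows. `index_inputs` is a hypothetical helper, it uses `Integer` where the listing says `Fixnum`, it skips bucket escaping and bucket types, and its error message is a placeholder for the translated one.

```ruby
# Hypothetical helper mirroring the query dispatch in #index.
def index_inputs(bucket, index, query)
  case query
  when String, Integer
    # Exact match on a single index value.
    { :bucket => bucket, :index => index, :key => query }
  when Range
    # Only String or Integer range bounds are accepted.
    unless query.begin.is_a?(String) || query.begin.is_a?(Integer)
      raise ArgumentError, "invalid index query: #{query.inspect}"
    end
    { :bucket => bucket, :index => index, :start => query.begin, :end => query.end }
  else
    raise ArgumentError, "invalid index query: #{query.inspect}"
  end
end

exact = index_inputs("users", "email_bin", "alice@example.com")
range = index_inputs("users", "age_int", 18..30)
```

The bucket and index names here are made up for illustration.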
#link(walk_spec, options = {}) ⇒ MapReduce
#link(bucket, tag, keep, options = {}) ⇒ MapReduce
#link(options) ⇒ MapReduce
Add a link phase to the job. Link phases follow links attached to objects automatically (a special case of map).
    # File 'lib/riak/map_reduce.rb', line 191

    def link(*params)
      options = params.extract_options!
      walk_spec_options = options.slice!(:type, :function, :language, :arg) unless params.first
      walk_spec = WalkSpec.normalize(params.shift || walk_spec_options).first
      @query << Phase.new({:type => :link, :function => walk_spec}.merge(options))
      self
    end
#map(function) ⇒ MapReduce
#map(function?, options) ⇒ MapReduce
Add a map phase to the job.
    # File 'lib/riak/map_reduce.rb', line 157

    def map(*params)
      options = params.extract_options!
      @query << Phase.new({:type => :map, :function => params.shift}.merge(options))
      self
    end
#reduce(function) ⇒ MapReduce
#reduce(function?, options) ⇒ MapReduce
Add a reduce phase to the job.
    # File 'lib/riak/map_reduce.rb', line 171

    def reduce(*params)
      options = params.extract_options!
      @query << Phase.new({:type => :reduce, :function => params.shift}.merge(options))
      self
    end
#run ⇒ Array<Array>
#run {|phase, data| ... } ⇒ nil
Executes this map-reduce job.
    # File 'lib/riak/map_reduce.rb', line 229

    def run(&block)
      @client.mapred(self, &block)
    rescue FailedRequest => fr
      if fr.server_error? && fr.is_json?
        raise MapReduceError.new(fr.body)
      else
        raise fr
      end
    end
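The rescue clause in `#run` converts a server-side failure with a JSON body into a `MapReduceError` and re-raises every other failure unchanged. The sketch below reproduces that control flow with stand-in classes so that it can run on its own. `FakeFailedRequest`, `FakeMapReduceError`, and `run_job` are hypothetical, and treating status codes of 500 and above as server errors is an assumption about `server_error?`.

```ruby
# Stand-in error types for illustration only; the real classes live in riak-client.
class FakeFailedRequest < StandardError
  attr_reader :code, :body

  def initialize(code, body, json)
    @code, @body, @json = code, body, json
    super("request failed with status #{code}")
  end

  # Assumption: 5xx status codes count as server errors.
  def server_error?
    code >= 500
  end

  def is_json?
    @json
  end
end

class FakeMapReduceError < StandardError; end

# Same shape as #run: translate JSON server errors, re-raise everything else.
def run_job
  yield
rescue FakeFailedRequest => fr
  if fr.server_error? && fr.is_json?
    raise FakeMapReduceError, fr.body
  else
    raise fr
  end
end

# A JSON 500 response surfaces as the map-reduce specific error.
translated = begin
  run_job { raise FakeFailedRequest.new(500, '{"error":"bad phase"}', true) }
rescue FakeMapReduceError => e
  e.message
end

# A non-JSON 404 passes through unchanged.
passed_through = begin
  run_job { raise FakeFailedRequest.new(404, "not found", false) }
rescue FakeFailedRequest => e
  e.code
end
```

Callers that only care about job failures can therefore rescue the map-reduce error alone and let transport-level failures propagate.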
#search(index, query) ⇒ MapReduce
(Riak Search) Use a search query to start a map/reduce job.
    # File 'lib/riak/map_reduce.rb', line 117

    def search(index, query)
      index = index.name if index.respond_to?(:name)
      @inputs = {:module => "yokozuna", :function => "mapred_search", :arg => [index, query]}
      self
    end
#timeout(value) ⇒ Object
Also known as: timeout=
Sets the timeout for the map-reduce job.
    # File 'lib/riak/map_reduce.rb', line 201

    def timeout(value)
      @timeout = value
      return self
    end
#to_json(*a) ⇒ String
Convert the job to JSON for submission over the HTTP interface.
    # File 'lib/riak/map_reduce.rb', line 209

    def to_json(*a)
      hash = {"inputs" => inputs, "query" => query.map(&:as_json)}
      hash['timeout'] = @timeout.to_i if @timeout
      hash.to_json(*a)
    end
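The submitted document therefore has an `inputs` key, a `query` key holding the serialized phases, and a `timeout` key only when a timeout was set. The sketch below rebuilds that hash with the standard `json` library. `job_payload` is a hypothetical helper, and the keys inside the example phase are assumptions, because `Phase#as_json` is not shown on this page.

```ruby
require "json"

# Hypothetical helper reproducing the hash built in #to_json.
# `phases` must already be serialized phase hashes.
def job_payload(inputs, phases, timeout = nil)
  hash = { "inputs" => inputs, "query" => phases }
  hash["timeout"] = timeout.to_i if timeout # omitted entirely when unset
  hash.to_json
end

# The keys inside this phase hash are illustrative assumptions.
example_phase = {
  "map" => { "language" => "javascript", "name" => "Riak.mapValuesJson", "keep" => true }
}

payload = job_payload([["users", "alice"]], [example_phase], "5000")
parsed  = JSON.parse(payload)

no_timeout = JSON.parse(job_payload("users", [example_phase]))
```

Note that the timeout is coerced with `to_i`, so a numeric string such as `"5000"` is sent as the integer 5000.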