Class: Riakpb::MapReduce
- Inherits:
-
Object
- Object
- Riakpb::MapReduce
- Includes:
- Util::Translation
- Defined in:
- lib/riakpb/map_reduce.rb
Overview
Class for invoking map-reduce jobs using the HTTP interface.
Defined Under Namespace
Classes: Phase
Instance Attribute Summary collapse
-
#inputs ⇒ Array<[bucket,key]>, String
The bucket/keys for input to the job, or the bucket (all keys).
-
#query ⇒ Array<Phase>
The map and reduce phases that will be executed.
Instance Method Summary collapse
-
#add(*params) ⇒ MapReduce
(also: #<<, #include)
Add or replace inputs for the job.
-
#initialize(client) {|self| ... } ⇒ MapReduce
constructor
Creates a new map-reduce job.
-
#link(params = {}) ⇒ MapReduce
(also: #walk)
Add a link phase to the job.
-
#map(*params) ⇒ MapReduce
Add a map phase to the job.
-
#reduce(*params) ⇒ MapReduce
Add a reduce phase to the job.
-
#run ⇒ Array<Array>
Executes this map-reduce job.
-
#timeout(value) ⇒ Object
Sets the timeout for the map-reduce job.
-
#to_json(options = {}) ⇒ String
Convert the job to JSON for submission over the HTTP interface.
Methods included from Util::Translation
Constructor Details
#initialize(client) {|self| ... } ⇒ MapReduce
Creates a new map-reduce job.
20 21 22 23 |
# File 'lib/riakpb/map_reduce.rb', line 20 def initialize(client) @client, @inputs, @query = client, [], [] yield self if block_given? end |
Instance Attribute Details
#inputs ⇒ Array<[bucket,key]>, String
Returns The bucket/keys for input to the job, or the bucket (all keys).
9 10 11 |
# File 'lib/riakpb/map_reduce.rb', line 9 def inputs @inputs end |
#query ⇒ Array<Phase>
Returns The map and reduce phases that will be executed.
15 16 17 |
# File 'lib/riakpb/map_reduce.rb', line 15 def query @query end |
Instance Method Details
#add(bucket) ⇒ MapReduce #add(bucket, key) ⇒ MapReduce #add(object) ⇒ MapReduce #add(bucket, key, keydata) ⇒ MapReduce Also known as: <<, include
Add or replace inputs for the job.
41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 |
# File 'lib/riakpb/map_reduce.rb', line 41 def add(*params) params = params.dup.flatten case params.size when 1 p = params.first case p when Riakpb::Bucket @inputs = p.name when Riakpb::Key @inputs << p.to_input when String @inputs = p end when 2..3 bucket = params.shift bucket = bucket.name if Riakpb::Bucket === bucket @inputs << params.unshift(bucket) end self end |
#link(params = {}) ⇒ MapReduce Also known as: walk
Add a link phase to the job. Link phases follow links attached to objects automatically (a special case of map).
95 96 97 98 99 100 101 102 103 104 105 106 107 |
# File 'lib/riakpb/map_reduce.rb', line 95 def link(params={}) bucket ||= params[:bucket] tag ||= params[:tag] keep = params[:keep] || false function = {} function[:bucket] = bucket unless bucket.nil? function[:tag] = tag unless tag.nil? @query << Phase.new({:type => :link, :function => function, :keep => keep}) return(self) end |
#map(function) ⇒ MapReduce #map(function?, options) ⇒ MapReduce
Add a map phase to the job.
72 73 74 75 76 |
# File 'lib/riakpb/map_reduce.rb', line 72 def map(*params) = params. @query << Phase.new({:type => :map, :function => params.shift}.merge()) self end |
#reduce(function) ⇒ MapReduce #reduce(function?, options) ⇒ MapReduce
Add a reduce phase to the job.
86 87 88 89 90 |
# File 'lib/riakpb/map_reduce.rb', line 86 def reduce(*params) = params. @query << Phase.new({:type => :reduce, :function => params.shift}.merge()) self end |
#run ⇒ Array<Array>
Executes this map-reduce job.
126 127 128 129 |
# File 'lib/riakpb/map_reduce.rb', line 126 def run response = @client.map_reduce_request(to_json, "application/json") # ActiveSupport::JSON.decode(response[:body]) end |
#timeout(value) ⇒ Object
Sets the timeout for the map-reduce job.
112 113 114 |
# File 'lib/riakpb/map_reduce.rb', line 112 def timeout(value) @timeout = value end |
#to_json(options = {}) ⇒ String
Convert the job to JSON for submission over the HTTP interface.
118 119 120 121 122 |
# File 'lib/riakpb/map_reduce.rb', line 118 def to_json(={}) hash = {"inputs" => inputs, "query" => query.map(&:as_json)} hash['timeout'] = @timeout.to_i if @timeout ActiveSupport::JSON.encode(hash, ) end |