Class: Riakpb::MapReduce

Inherits:
Object
Includes:
Util::Translation
Defined in:
lib/riakpb/map_reduce.rb

Overview

Class for invoking map-reduce jobs using the HTTP interface.

Defined Under Namespace

Classes: Phase

Methods included from Util::Translation

#i18n_scope, #t

Constructor Details

#initialize(client) {|self| ... } ⇒ MapReduce

Creates a new map-reduce job.

Parameters:

  • client (Client)

    the Riakpb::Client interface

Yields:

  • (self)

    helpful for initializing the job



# File 'lib/riakpb/map_reduce.rb', line 20

def initialize(client)
  @client, @inputs, @query = client, [], []
  yield self if block_given?
end
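
As a rough illustration of the block form, the sketch below uses a stand-in class whose constructor mirrors the listing above. `SketchJob` and the `:fake_client` placeholder are hypothetical and not part of Riakpb; a real job would receive a `Riakpb::Client`.

```ruby
# Hypothetical stand-in that copies the constructor shown above, so the
# example runs without the riakpb gem or a Riak node.
class SketchJob
  attr_reader :client, :inputs, :query

  def initialize(client)
    @client, @inputs, @query = client, [], []
    yield self if block_given?
  end
end

# The block receives the new job, so setup can happen inline.
job = SketchJob.new(:fake_client) do |mr|
  mr.inputs << ["artists", "miles_davis"]
end

p job.inputs
```

Without a block the job simply starts with empty inputs and an empty query.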

Instance Attribute Details

#inputs ⇒ Array<[bucket,key]>, String

Returns the bucket/key pairs used as input to the job, or the bucket name (all keys).

Returns:

  • (Array<[bucket,key]>, String)

    The bucket/keys for input to the job, or the bucket (all keys).

# File 'lib/riakpb/map_reduce.rb', line 9

def inputs
  @inputs
end

#query ⇒ Array<Phase>

Returns the map and reduce phases that will be executed.

Returns:

  • (Array<Phase>)

    The map and reduce phases that will be executed

# File 'lib/riakpb/map_reduce.rb', line 15

def query
  @query
end

Instance Method Details

#add(bucket) ⇒ MapReduce
#add(bucket, key) ⇒ MapReduce
#add(object) ⇒ MapReduce
#add(bucket, key, keydata) ⇒ MapReduce

Also known as: <<, include

Add or replace inputs for the job.

Overloads:

  • #add(bucket) ⇒ MapReduce

    Run the job across all keys in the bucket. This will replace any other inputs previously added.

    Parameters:

    • bucket (String, Bucket)

      the bucket to run the job on

  • #add(bucket, key) ⇒ MapReduce

    Add a bucket/key pair to the job.

    Parameters:

    • bucket (String, Bucket)

      the bucket of the object

    • key (String)

      the key of the object

  • #add(object) ⇒ MapReduce

    Add an object to the job (by its bucket/key)

    Parameters:

    • object (Key)

      the object to add to the inputs

  • #add(bucket, key, keydata) ⇒ MapReduce

    Add a bucket/key pair to the job, along with extra key data.

    Parameters:

    • bucket (String, Bucket)

      the bucket of the object

    • key (String)

      the key of the object

    • keydata (String)

      extra data to pass along with the object to the job

Returns:

  • (MapReduce)

    self



# File 'lib/riakpb/map_reduce.rb', line 41

def add(*params)
  params = params.dup.flatten
  case params.size
  when 1
    p = params.first
    case p
    when Riakpb::Bucket
      @inputs = p.name
    when Riakpb::Key
      @inputs << p.to_input
    when String
      @inputs = p
    end
  when 2..3
    bucket = params.shift
    bucket = bucket.name if Riakpb::Bucket === bucket
    @inputs << params.unshift(bucket)
  end
  self
end
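
To make the replace-versus-append behaviour concrete, here is a minimal sketch that mimics the branching in the listing above. `SketchBucket` and `SketchInputs` are hypothetical stand-ins, not Riakpb classes, and the `Riakpb::Key` overload is left out because `Key#to_input` is not shown on this page.

```ruby
# Hypothetical stand-ins: SketchBucket and SketchInputs are not part of
# Riakpb; they only mimic the branching in the #add listing above.
SketchBucket = Struct.new(:name)

class SketchInputs
  attr_reader :inputs

  def initialize
    @inputs = []
  end

  def add(*params)
    params = params.flatten
    case params.size
    when 1
      single = params.first
      # A lone bucket (object or name) replaces everything added so far.
      @inputs = single.is_a?(SketchBucket) ? single.name : single
    when 2..3
      bucket = params.shift
      bucket = bucket.name if bucket.is_a?(SketchBucket)
      # Pairs and triples are appended as [bucket, key] or [bucket, key, keydata].
      @inputs << params.unshift(bucket)
    end
    self
  end
end

pairs = SketchInputs.new
pairs.add("artists", "miles_davis").add(SketchBucket.new("artists"), "coltrane", "sax")
p pairs.inputs

whole = SketchInputs.new
whole.add("artists", "miles_davis").add("albums")
p whole.inputs
```

In this sketch the first job ends up with two input entries, while the second ends up with the single bucket name because the one-argument form overwrites earlier inputs.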

#link(params = {}) ⇒ MapReduce

Add a link phase to the job. Link phases follow links attached to objects automatically (a special case of map).

Parameters:

  • params (Hash) (defaults to: {})

    represents the types of links to follow; the method reads the :bucket, :tag, and :keep keys (:keep defaults to false)

Returns:

  • (MapReduce)

    self



# File 'lib/riakpb/map_reduce.rb', line 95

def link(params={})
  bucket          ||= params[:bucket]
  tag             ||= params[:tag]
  keep              = params[:keep] || false
  function          = {}

  function[:bucket] = bucket  unless bucket.nil?
  function[:tag]    = tag     unless tag.nil?

  @query << Phase.new({:type => :link, :function => function, :keep => keep})

  return(self)
end
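
The following sketch rebuilds only the hash that the listing above passes to `Phase.new`, to show which keys survive for a given `params` hash. `sketch_link_phase` is a hypothetical helper, not part of Riakpb, and the bucket and tag values are made up for illustration.

```ruby
# Hypothetical helper, not part of Riakpb: it rebuilds only the hash that
# the #link listing above hands to Phase.new.
def sketch_link_phase(params = {})
  function = {}
  # :bucket and :tag are copied only when they were actually supplied.
  function[:bucket] = params[:bucket] unless params[:bucket].nil?
  function[:tag]    = params[:tag]    unless params[:tag].nil?
  { :type => :link, :function => function, :keep => params[:keep] || false }
end

p sketch_link_phase(:bucket => "albums", :tag => "performer", :keep => true)
p sketch_link_phase
```

Calling it with no arguments yields an empty `:function` hash and `:keep => false`, matching the defaults in the listing.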

#map(function) ⇒ MapReduce
#map(function?, options) ⇒ MapReduce

Add a map phase to the job.

Overloads:

  • #map(function) ⇒ MapReduce

    Parameters:

    • function (String, Array)

      a JavaScript function that represents the phase, or an Erlang [module, function] pair

  • #map(function?, options) ⇒ MapReduce

    Parameters:

    • function (String, Array)

      a JavaScript function that represents the phase, or an Erlang [module, function] pair

    • options (Hash)

      extra options for the phase (see Riakpb::MapReduce::Phase#initialize)

Returns:

  • (MapReduce)

    self



# File 'lib/riakpb/map_reduce.rb', line 72

def map(*params)
  options = params.extract_options!
  @query << Phase.new({:type => :map, :function => params.shift}.merge(options))
  self
end
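
As a sketch of how the arguments are split into a function and an options hash, the helper below imitates the listing above in plain Ruby. `sketch_phase_args` is hypothetical and not part of Riakpb; it replaces ActiveSupport's `Array#extract_options!` with a manual check for a trailing Hash, and the JavaScript source and Erlang pair are illustrative values only. The same split applies to `#reduce`, which differs only in the `:type` it sets.

```ruby
# Hypothetical helper, not part of Riakpb. It imitates the listing above in
# plain Ruby: ActiveSupport's Array#extract_options! is replaced by a manual
# check for a trailing Hash.
def sketch_phase_args(type, *params)
  options = params.last.is_a?(Hash) ? params.pop : {}
  # Whatever remains first in the list is treated as the phase function.
  { :type => type, :function => params.shift }.merge(options)
end

js = "function(v) { return [v.key]; }"

p sketch_phase_args(:map, js)
p sketch_phase_args(:map, js, :keep => true)
p sketch_phase_args(:reduce, ["riak_kv_mapreduce", "reduce_sort"])
```

Because the options are merged last, any key given in the options hash takes precedence over the defaults built from the positional arguments.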

#reduce(function) ⇒ MapReduce
#reduce(function?, options) ⇒ MapReduce

Add a reduce phase to the job.

Overloads:

  • #reduce(function) ⇒ MapReduce

    Parameters:

    • function (String, Array)

      a JavaScript function that represents the phase, or an Erlang [module, function] pair

  • #reduce(function?, options) ⇒ MapReduce

    Parameters:

    • function (String, Array)

      a JavaScript function that represents the phase, or an Erlang [module, function] pair

    • options (Hash)

      extra options for the phase (see Riakpb::MapReduce::Phase#initialize)

Returns:

  • (MapReduce)

    self



# File 'lib/riakpb/map_reduce.rb', line 86

def reduce(*params)
  options = params.extract_options!
  @query << Phase.new({:type => :reduce, :function => params.shift}.merge(options))
  self
end

#run ⇒ Array<Array>

Executes this map-reduce job.

Returns:

  • (Array<Array>)

    Similar to link-walking: each element is an array of results from a phase where “keep” is true. If only one phase has “keep” set, only the results from that phase are returned.



# File 'lib/riakpb/map_reduce.rb', line 126

def run
  response = @client.map_reduce_request(to_json, "application/json")
#      ActiveSupport::JSON.decode(response[:body])
end
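
The listing above delegates to the client and, with the JSON decode commented out, returns whatever the client hands back. The sketch below shows that call shape against a fake client so it can run without a Riak node. `SketchClient`, the `:raw_response` value, and the sample JSON are hypothetical; only the method name `map_reduce_request` and its two arguments come from the listing.

```ruby
# Hypothetical fake client, not part of Riakpb: it records what #run would
# send instead of talking to a Riak node.
class SketchClient
  attr_reader :sent

  def map_reduce_request(body, content_type)
    @sent = [body, content_type]
    :raw_response
  end
end

client = SketchClient.new
job_json = '{"inputs":"albums","query":[]}'

# Same call shape as the listing above: JSON body plus its content type.
result = client.map_reduce_request(job_json, "application/json")

p client.sent
p result
```

A double like this may be handy for checking the payload a job would submit before pointing it at a live cluster.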

#timeout(value) ⇒ Object

Sets the timeout for the map-reduce job.

Parameters:

  • value (Fixnum)

    the job timeout, in milliseconds



# File 'lib/riakpb/map_reduce.rb', line 112

def timeout(value)
  @timeout = value
end

#to_json(options = {}) ⇒ String

Convert the job to JSON for submission over the HTTP interface.

Returns:

  • (String)

    the JSON representation



# File 'lib/riakpb/map_reduce.rb', line 118

def to_json(options={})
  hash = {"inputs" => inputs, "query" => query.map(&:as_json)}
  hash['timeout'] = @timeout.to_i if @timeout
  ActiveSupport::JSON.encode(hash, options)
end
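
To show the overall shape of the submitted document, here is a minimal sketch using Ruby's standard `json` library in place of `ActiveSupport::JSON`. `sketch_job_json` is a hypothetical helper, not part of Riakpb, and phases are passed as plain hashes because `Phase#as_json` is not shown on this page, so the per-phase keys are placeholders.

```ruby
require "json"

# Hypothetical stand-in for the hash assembled in #to_json above. Phases are
# plain hashes here because Phase#as_json is not part of this listing.
def sketch_job_json(inputs, phases, timeout = nil)
  hash = { "inputs" => inputs, "query" => phases }
  # The timeout key is added only when a timeout was set, as in the listing.
  hash["timeout"] = timeout.to_i if timeout
  JSON.generate(hash)
end

puts sketch_job_json("albums", [{ "placeholder" => "phase" }])
puts sketch_job_json([["artists", "miles_davis"]], [], 60_000)
```

Note that in the listings on this page `#timeout` returns the assigned value rather than the job, so it would be called on its own line rather than in the middle of an `add`/`map`/`reduce` chain.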