Class: MapRedus::Reducer
- Inherits: QueueProcess (ancestry: Object > QueueProcess > MapRedus::Reducer)
- Defined in: lib/mapredus/reducer.rb
Overview
Reduce is a function that takes “all” the values for a single key and outputs either a list of values or a single value, usually one that “reduces” (summarizes) the given value set.
The output of reduce is always a list:
reduce(values) = [ reduced value, reduced value, ... ]
and it is often a single-element array.
The input values and the output values of reduce are always strings. As described in the original MapReduce paper, it is up to the client to convert between strings and whatever types it actually needs.
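The following is a minimal sketch of this contract in plain Ruby. The two reducers are hypothetical and are not subclasses of the real MapRedus::Reducer. Each one receives every mapped value for a single key as strings, converts those strings to the type it needs, and yields string results. SumReducer yields a single-element result. DistinctReducer yields several values. The results are yielded one at a time to a block because that is how reduce_perform consumes them in this class. The helper collect_reduced, also hypothetical, gathers the yielded values into the array form shown above.

```ruby
# Hypothetical reducers illustrating the strings-in, strings-out contract.
# They are plain Ruby classes, not subclasses of MapRedus::Reducer.
class SumReducer
  # values: every mapped value for one key, always strings (e.g. ["1", "2"])
  def self.reduce(values)
    total = values.sum { |v| Integer(v) } # client code converts strings to integers
    yield total.to_s                       # ...and converts the result back to a string
  end
end

class DistinctReducer
  # Yields more than one value: each distinct input string once, in first-seen order.
  def self.reduce(values)
    values.uniq.each { |v| yield v }
  end
end

# Hypothetical helper: collects whatever a reducer yields into an array,
# mirroring reduce(values) = [ reduced value, reduced value, ... ]
def collect_reduced(reducer, values)
  out = []
  reducer.reduce(values) { |v| out << v }
  out
end

p collect_reduced(SumReducer, %w[1 2 3])        # => ["6"]
p collect_reduced(DistinctReducer, %w[a b a c]) # => ["a", "b", "c"]
```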
Constant Summary
- DEFAULT_WAIT = 10
  Number of seconds to wait after a recoverable failure before re-adding the reducer to the queue.
Class Method Summary
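- .perform(pid, key) ⇒ Object
  Doesn’t handle redundant workers or fault tolerance.
- .reduce(values) ⇒ Object
  Abstract; subclasses must override it (the default raises InvalidReducer).
- .reduce_perform(process, key) ⇒ Object
  The overridable portion of a reducer perform.
- .wait ⇒ Object
  Returns the retry delay in seconds (DEFAULT_WAIT by default).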
Methods inherited from QueueProcess
Class Method Details
.perform(pid, key) ⇒ Object
Doesn’t handle redundant workers or fault tolerance.
TODO: Resque::AutoRetry might mess this up.
# File 'lib/mapredus/reducer.rb', line 39
def self.perform(pid, key)
  process = Process.open(pid)
  reduce_perform(process, key)
rescue MapRedus::RecoverableFail
  Master.enslave_later_reduce(process, key)
ensure
  Master.free_slave(pid)
  process.next_state
end
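You can exercise the control flow of perform without a running job queue. In this minimal sketch, FakeProcess, FakeMaster, RecoverableFail and perform_sketch are hypothetical stand-ins that only record calls. They are not the MapRedus API. The block passed to perform_sketch plays the role of reduce_perform. The sketch shows that a recoverable failure re-queues the reduce. It also shows that the ensure clause releases the worker and advances the process state on every path.

```ruby
# Hypothetical stand-ins that only record calls; they are not the MapRedus API.
class RecoverableFail < StandardError; end

class FakeProcess
  attr_reader :state_advanced
  def initialize; @state_advanced = false; end
  def next_state; @state_advanced = true; end
end

class FakeMaster
  attr_reader :log
  def initialize; @log = []; end
  def enslave_later_reduce(_process, key); @log << [:requeued, key]; end
  def free_slave(pid); @log << [:freed, pid]; end
end

# Same shape as Reducer.perform: run the reduce, re-queue on a recoverable
# failure, and always release the worker and advance the process.
def perform_sketch(master, process, pid, key)
  yield process, key # stands in for reduce_perform(process, key)
rescue RecoverableFail
  master.enslave_later_reduce(process, key)
ensure
  master.free_slave(pid)
  process.next_state
end

master = FakeMaster.new
process = FakeProcess.new
perform_sketch(master, process, 7, "word") { raise RecoverableFail }
p master.log             # => [[:requeued, "word"], [:freed, 7]]
p process.state_advanced # => true
```

The rescue clause only matches RecoverableFail. Any other exception still runs the ensure clause and then propagates to the caller. In the real method, if Process.open itself raises, process is still nil when the ensure clause runs, so process.next_state would raise NoMethodError.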
.reduce(values) ⇒ Object
Abstract: subclasses must override this method; the default implementation raises InvalidReducer.
# File 'lib/mapredus/reducer.rb', line 22
def self.reduce(values); raise InvalidReducer; end
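The default reduce works as an abstract method: the base class raises until a subclass supplies its own implementation. This minimal sketch mirrors that pattern with hypothetical classes. BaseReducer and MaxReducer are illustrative only. InvalidReducer is redefined locally so the sketch runs without the mapredus gem.

```ruby
# Hypothetical mirror of the default: the base class raises until a
# subclass supplies its own reduce. InvalidReducer is defined here only
# so the sketch is self-contained.
class InvalidReducer < StandardError; end

class BaseReducer
  def self.reduce(values); raise InvalidReducer; end
end

class MaxReducer < BaseReducer
  # Overrides reduce: yields the largest value, compared numerically,
  # still as a string.
  def self.reduce(values)
    largest = values.max_by { |v| Integer(v) }
    yield largest
  end
end

begin
  BaseReducer.reduce(%w[1 2]) { |v| p v }
rescue InvalidReducer
  puts "BaseReducer.reduce raised InvalidReducer"
end

MaxReducer.reduce(%w[3 10 7]) { |v| p v } # prints "10"
```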
.reduce_perform(process, key) ⇒ Object
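The overridable portion of a reducer perform. The default implementation passes the key's mapped values to reduce and emits each value that reduce yields. Some built-in reducers, such as Identity and Counter, do not call self.reduce. Instead, they override this method to optimize the reduction.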
# File 'lib/mapredus/reducer.rb', line 30
def self.reduce_perform(process, key)
  reduce(process.map_values(key)) do |reduce_val|
    process.emit(key, reduce_val)
  end
end
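This sketch contrasts the default reduce_perform with an override that skips reduce entirely. It is a minimal sketch under stated assumptions. StubProcess is a hypothetical stand-in for a process object that only provides map_values and emit. SketchReducer copies the shape of the default method. PassThroughSketch is a hypothetical identity-style override. It is not the actual Identity or Counter code, which is not shown here.

```ruby
# Hypothetical process stub: map_values returns the stored values for a key
# and emit records (key, value) pairs. Not the real MapRedus::Process.
class StubProcess
  attr_reader :emitted
  def initialize(map_values); @map_values = map_values; @emitted = []; end
  def map_values(key); @map_values.fetch(key, []); end
  def emit(key, value); @emitted << [key, value]; end
end

class SketchReducer
  # Default path, same shape as Reducer.reduce_perform.
  def self.reduce_perform(process, key)
    reduce(process.map_values(key)) do |reduce_val|
      process.emit(key, reduce_val)
    end
  end
end

class SumSketch < SketchReducer
  def self.reduce(values)
    total = values.sum { |v| Integer(v) }
    yield total.to_s
  end
end

# Hypothetical optimized reducer: overrides reduce_perform and never calls
# reduce, emitting each mapped value unchanged (identity-style).
class PassThroughSketch < SketchReducer
  def self.reduce_perform(process, key)
    process.map_values(key).each { |v| process.emit(key, v) }
  end
end

proc_a = StubProcess.new({ "apple" => %w[1 1 1] })
SumSketch.reduce_perform(proc_a, "apple")
p proc_a.emitted # => [["apple", "3"]]

proc_b = StubProcess.new({ "apple" => %w[1 1 1] })
PassThroughSketch.reduce_perform(proc_b, "apple")
p proc_b.emitted # => [["apple", "1"], ["apple", "1"], ["apple", "1"]]
```

Overriding reduce_perform lets a reducer emit directly from the mapped values. It avoids routing every value through the generic reduce and block round-trip.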
.wait ⇒ Object
Returns the retry delay in seconds after a recoverable failure; by default this is DEFAULT_WAIT (10).
# File 'lib/mapredus/reducer.rb', line 20
def self.wait; DEFAULT_WAIT; end
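Because wait is an ordinary class method, a subclass can presumably change its retry delay by overriding it. This minimal sketch uses hypothetical classes. WaitSketchReducer mirrors DEFAULT_WAIT and wait from the source. SlowRetryReducer overrides wait. Whether the scheduler reads wait when re-queuing is an assumption, since this class does not show where the value is consumed.

```ruby
# Hypothetical sketch: DEFAULT_WAIT and wait mirror the Reducer source.
# Assumption: whatever re-queues a failed reduce consults wait.
class WaitSketchReducer
  DEFAULT_WAIT = 10 # seconds
  def self.wait; DEFAULT_WAIT; end
end

class SlowRetryReducer < WaitSketchReducer
  # Override: wait 30 seconds instead of the inherited default.
  def self.wait; 30; end
end

p WaitSketchReducer.wait # => 10
p SlowRetryReducer.wait  # => 30
```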