Class: MapReduce::Reducer
- Inherits: Object
- Includes: Mergeable, Reduceable, MonitorMixin
- Defined in: lib/map_reduce/reducer.rb
Overview
The MapReduce::Reducer class runs the reducer part of your map-reduce job.
Instance Method Summary
- #add_chunk ⇒ Object
  Adds a chunk from the mapper phase to the reducer by registering a tempfile and returning its path, so that you can download a chunk (e.g. from S3) and write its content to this tempfile.
- #initialize(implementation) ⇒ Reducer (constructor)
  Initializes a new reducer.
- #reduce(chunk_limit:, &block) ⇒ Object
  Performs a k-way merge of the added chunks and yields the reduced key-value pairs.
Constructor Details
#initialize(implementation) ⇒ Reducer
Initializes a new reducer.
```ruby
# File 'lib/map_reduce/reducer.rb', line 17
def initialize(implementation)
  super()

  @implementation = implementation
  @temp_paths = []
end
```
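Because the class includes MonitorMixin, the constructor calls `super()` with explicit empty parentheses: this reaches MonitorMixin's initializer (which calls `mon_initialize` to create the monitor behind `synchronize`) without forwarding `implementation` up the chain, as a bare `super` would. The sketch below contrasts two hypothetical classes, `WithSuper` and `WithoutSuper`, that mirror the constructor; they are illustrations, not part of the gem.

```ruby
require "monitor"

# Hypothetical class mirroring the reducer's constructor.
class WithSuper
  include MonitorMixin

  def initialize(implementation)
    super() # reaches MonitorMixin#initialize with no arguments
    @implementation = implementation
    @temp_paths = []
  end

  def chunk_count
    synchronize { @temp_paths.size }
  end
end

# Same shape, but the constructor never reaches MonitorMixin#initialize,
# so the monitor used by #synchronize is never set up.
class WithoutSuper
  include MonitorMixin

  def initialize(implementation)
    @implementation = implementation
    @temp_paths = []
  end

  def chunk_count
    synchronize { @temp_paths.size }
  end
end

ok = WithSuper.new(:impl).chunk_count

failed =
  begin
    WithoutSuper.new(:impl).chunk_count
    false
  rescue StandardError
    true
  end
```

`ok` is `0`, while calling `chunk_count` on `WithoutSuper` fails because its monitor was never created.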
Instance Method Details
#add_chunk ⇒ Object
Adds a chunk from the mapper phase to the reducer by registering a tempfile and returning its path, so that you can download a chunk (e.g. from S3) and write its content to this tempfile.
```ruby
# File 'lib/map_reduce/reducer.rb', line 34
def add_chunk
  temp_path = TempPath.new

  synchronize do
    @temp_paths.push(temp_path)
  end

  temp_path.path
end
```
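The registration pattern can be sketched with only the standard library. This is a minimal illustration under assumptions: `ChunkRegistry` is a hypothetical class, and Ruby's `Tempfile` stands in for the gem's `TempPath` helper, whose definition is not shown here. Several threads each obtain a path and write a chunk to it, standing in for concurrent downloads.

```ruby
require "monitor"
require "tempfile"

# Hypothetical stand-in for the reducer's chunk registration.
class ChunkRegistry
  include MonitorMixin

  attr_reader :tempfiles

  def initialize
    super() # set up the monitor used by #synchronize
    @tempfiles = []
  end

  # Registers a new tempfile and returns its path for the caller to fill.
  def add_chunk
    tempfile = Tempfile.new("chunk")
    tempfile.close # keeps the file on disk; callers reopen it by path

    synchronize do
      @tempfiles.push(tempfile)
    end

    tempfile.path
  end

  # Deletes every registered tempfile.
  def cleanup
    synchronize do
      @tempfiles.each(&:unlink)
      @tempfiles = []
    end
  end
end

registry = ChunkRegistry.new

threads = 4.times.map do |i|
  Thread.new do
    path = registry.add_chunk
    # Stand-in for downloading a chunk, e.g. from S3.
    File.write(path, %(["key#{i}",#{i}]\n))
  end
end
threads.each(&:join)

contents = registry.tempfiles.map { |t| File.read(t.path) }
registry.cleanup
```

Only the push onto the shared array needs the lock; creating the tempfile and writing to it happen outside `synchronize`, so slow downloads do not block each other.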
#reduce(chunk_limit:, &block) ⇒ Object
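Performs a k-way merge of the added chunks and yields the reduced key-value pairs. When more than `chunk_limit` chunks exist, it performs multiple runs: each run takes up to `chunk_limit` chunks, merges and reduces them, and adds the result back as a new chunk. The final run yields its reduced pairs to the block instead. At the end it removes all tempfiles, even if errors occur. Without a block, it returns an Enumerator; it raises InvalidChunkLimit unless `chunk_limit` is at least 2.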
```ruby
# File 'lib/map_reduce/reducer.rb', line 69
def reduce(chunk_limit:, &block)
  return enum_for(__method__, chunk_limit: chunk_limit) unless block_given?

  raise(InvalidChunkLimit, "Chunk limit must be >= 2") unless chunk_limit >= 2

  begin
    loop do
      # Take up to chunk_limit chunks for this run.
      slice = @temp_paths.shift(chunk_limit)

      # Final run: nothing is left over, so yield the reduced pairs.
      if @temp_paths.empty?
        reduce_chunk(k_way_merge(slice, chunk_limit: chunk_limit), @implementation).each do |pair|
          block.call(pair)
        end

        return
      end

      # Intermediate run: write the reduced pairs to a new chunk, one JSON document per line.
      File.open(add_chunk, "w+") do |file|
        reduce_chunk(k_way_merge(slice, chunk_limit: chunk_limit), @implementation).each do |pair|
          file.puts JSON.generate(pair)
        end
      end
    ensure
      # Remove this run's input chunks.
      slice&.each(&:delete)
    end
  ensure
    # Remove any remaining chunks, even if an error occurred.
    @temp_paths.each(&:delete)
    @temp_paths = []
  end

  nil
end
```
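The multi-run strategy can be sketched in memory. This is an illustration, not the gem's Mergeable/Reduceable code: chunks are plain arrays of `[key, value]` pairs already sorted by key instead of tempfiles, the merge uses a simple linear scan over the chunk heads rather than a priority queue, and the result is returned rather than yielded. `MergeSketch` and `SumImplementation` are hypothetical, and the `reduce(key, value1, value2)` interface of the implementation object is an assumption.

```ruby
# Hypothetical implementation object; assumed interface reduce(key, value1, value2).
class SumImplementation
  def reduce(_key, value1, value2)
    value1 + value2
  end
end

module MergeSketch
  module_function

  # Merges key-sorted arrays of [key, value] pairs into one key-sorted array.
  # A linear scan over the chunk heads stands in for a priority queue.
  def k_way_merge(chunks)
    indexes = Array.new(chunks.size, 0)
    merged = []

    loop do
      live = chunks.each_index.select { |i| indexes[i] < chunks[i].size }
      break if live.empty?

      smallest = live.min_by { |i| chunks[i][indexes[i]][0] }
      merged << chunks[smallest][indexes[smallest]]
      indexes[smallest] += 1
    end

    merged
  end

  # Collapses adjacent pairs with equal keys using the implementation.
  def reduce_chunk(pairs, implementation)
    pairs.each_with_object([]) do |(key, value), result|
      if !result.empty? && result.last[0] == key
        result.last[1] = implementation.reduce(key, result.last[1], value)
      else
        result << [key, value]
      end
    end
  end

  # Merges and reduces at most chunk_limit chunks per run, appending each
  # intermediate result as a new chunk until a single run covers the rest.
  def reduce(chunks, chunk_limit:, implementation:)
    raise ArgumentError, "chunk limit must be >= 2" unless chunk_limit >= 2

    queue = chunks.dup

    loop do
      slice = queue.shift(chunk_limit)
      reduced = reduce_chunk(k_way_merge(slice), implementation)
      return reduced if queue.empty?

      queue.push(reduced)
    end
  end
end

chunks = [
  [["a", 1], ["c", 2]],
  [["a", 3], ["b", 1]],
  [["b", 2], ["c", 5]]
]

result = MergeSketch.reduce(chunks, chunk_limit: 2, implementation: SumImplementation.new)
# => [["a", 4], ["b", 3], ["c", 7]]
```

With `chunk_limit: 2`, the first run merges the first two chunks into `[["a", 4], ["b", 1], ["c", 2]]` and appends it; the second run merges that with the third chunk. A larger limit means fewer runs but more chunks open at once, which is the trade-off `chunk_limit` controls.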