Class: MapReduce::Mapper
- Inherits:
- Object
- Includes:
- Mergeable, Reduceable, MonitorMixin
- Defined in:
- lib/map_reduce/mapper.rb
Overview
The MapReduce::Mapper class runs the mapping part of your map-reduce job.
Instance Method Summary
- #initialize(implementation, partitioner: HashPartitioner.new(32), memory_limit: 16 * 1024 * 1024) ⇒ Mapper (constructor)
  Initializes a new mapper.
- #map(*args, **kwargs) ⇒ Object
  Passes the received arguments to your map-reduce implementation and adds each yielded key-value pair to a buffer.
- #shuffle(chunk_limit:) ⇒ Object
  Performs a k-way merge of the sorted chunks written to tempfiles, reducing the result with your map-reduce implementation (if it responds to reduce) and splitting the dataset into partitions.
Constructor Details
#initialize(implementation, partitioner: HashPartitioner.new(32), memory_limit: 16 * 1024 * 1024) ⇒ Mapper
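Initializes a new mapper with the given map-reduce implementation, partitioner, and memory limit. The memory limit is compared against the JSON-serialized size of the buffered items in bytes and defaults to 16 MiB.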
# File 'lib/map_reduce/mapper.rb', line 21
def initialize(implementation, partitioner: HashPartitioner.new(32), memory_limit: 16 * 1024 * 1024)
  super()

  @implementation = implementation
  @partitioner = partitioner
  @memory_limit = memory_limit.to_i

  @buffer_size = 0
  @buffer = []
  @chunks = []
end
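The only contract the mapper relies on for its partitioner is that it responds to call(key) and returns a partition. A minimal sketch of such an object follows, assuming a hash-based scheme. SketchPartitioner is a hypothetical stand-in written for illustration and is not the library's HashPartitioner.

```ruby
require "digest"
require "json"

# Hypothetical stand-in for a partitioner. The only requirement taken from
# the source is that it responds to #call(key) and returns a partition.
class SketchPartitioner
  def initialize(num_partitions)
    @num_partitions = num_partitions
  end

  # Hashes the JSON form of the key so that equal keys always map to the
  # same partition, then folds the digest into 0...num_partitions.
  def call(key)
    digest = Digest::SHA1.hexdigest(JSON.generate([key]))
    digest[0, 8].to_i(16) % @num_partitions
  end
end

partitioner = SketchPartitioner.new(32)
partition = partitioner.call("some key")
```

Any object with the same call(key) interface, including a lambda, could be passed as the partitioner: argument under this assumption.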
Instance Method Details
#map(*args, **kwargs) ⇒ Object
Passes the received arguments to the map method of your map-reduce implementation and adds each yielded key-value pair, tagged with its partition, to a buffer. When the buffer reaches the memory limit, it is sorted and written to a tempfile as a chunk.
# File 'lib/map_reduce/mapper.rb', line 43
def map(*args, **kwargs)
  @implementation.map(*args, **kwargs) do |new_key, new_value|
    synchronize do
      partition = @partitioner.call(new_key)
      item = [[partition, new_key], new_value]

      @buffer.push(item)
      @buffer_size += JSON.generate(item).bytesize

      write_chunk if @buffer_size >= @memory_limit
    end
  end
end
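The buffering behaviour can be illustrated in isolation. The sketch below is a simplified, hypothetical SketchBuffer that mirrors the size accounting shown in #map but keeps its sorted chunks in memory rather than writing them to tempfiles. That simplification is an assumption made for brevity. Since write_chunk is not shown in the source, the flush method here is only a guess at its general shape.

```ruby
require "json"

# Hypothetical, simplified buffer. Chunks stay in memory here, whereas the
# real mapper writes them to tempfiles.
class SketchBuffer
  attr_reader :chunks

  def initialize(memory_limit:)
    @memory_limit = memory_limit
    @buffer = []
    @buffer_size = 0
    @chunks = []
  end

  # Adds one item and flushes once the JSON size of all buffered items
  # reaches the limit.
  def push(partition, key, value)
    item = [[partition, key], value]
    @buffer.push(item)
    @buffer_size += JSON.generate(item).bytesize
    flush if @buffer_size >= @memory_limit
  end

  # Sorts the buffered items by [partition, key] and stores them as a chunk.
  def flush
    return if @buffer.empty?

    @chunks.push(@buffer.sort_by(&:first))
    @buffer = []
    @buffer_size = 0
  end
end

buffer = SketchBuffer.new(memory_limit: 20)
buffer.push(1, "b", 1) # 11 bytes as JSON, below the limit
buffer.push(0, "a", 1) # 22 bytes in total, triggers a flush
buffer.push(1, "a", 1)
buffer.flush           # flushes the remainder as a second chunk
```

Sorting by [partition, key] means every chunk is already grouped by partition and ordered by key, which is what allows the later merge to work on sorted inputs.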
#shuffle(chunk_limit:) ⇒ Object
Performs a k-way merge of the sorted chunks written to tempfiles, reducing the result with your map-reduce implementation (if it responds to reduce) and splitting the dataset into partitions. Finally yields a hash of (partition, path) pairs pointing to the tempfiles that hold each partition's data. The tempfiles are deleted once the block returns. Raises InvalidChunkLimit if chunk_limit is less than 2.
# File 'lib/map_reduce/mapper.rb', line 76
def shuffle(chunk_limit:)
  raise(InvalidChunkLimit, "Chunk limit must be >= 2") unless chunk_limit >= 2

  begin
    write_chunk if @buffer_size > 0

    chunk = k_way_merge(@chunks, chunk_limit: chunk_limit)
    chunk = reduce_chunk(chunk, @implementation) if @implementation.respond_to?(:reduce)

    partitions = split_chunk(chunk)

    yield(partitions.transform_values(&:path))
  ensure
    partitions&.each_value(&:delete)

    @chunks.each(&:delete)
    @chunks = []
  end

  nil
end
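The three stages of the shuffle (merge, reduce, split) can be sketched on in-memory arrays. The helpers sketch_merge, sketch_reduce, and sketch_split below are hypothetical and do not reproduce the library's k_way_merge, reduce_chunk, or split_chunk. They work on arrays rather than tempfiles and do not model chunk_limit. The three-argument reduce block (key, value1, value2) is likewise an assumption about how an implementation's reduce might be shaped.

```ruby
# Merges already-sorted chunks into one sorted list by repeatedly taking
# the smallest head item among all chunks.
def sketch_merge(chunks)
  heads = chunks.map(&:dup).reject(&:empty?)
  merged = []

  until heads.empty?
    smallest = heads.min_by { |chunk| chunk.first.first }
    merged.push(smallest.shift)
    heads.delete_if(&:empty?)
  end

  merged
end

# Combines neighbouring items that share the same [partition, key] by
# calling the given block with the key and both values.
def sketch_reduce(sorted_items)
  sorted_items.each_with_object([]) do |(sort_key, value), result|
    if result.last && result.last[0] == sort_key
      result.last[1] = yield(sort_key[1], result.last[1], value)
    else
      result.push([sort_key, value])
    end
  end
end

# Groups items by partition: { partition => [[key, value], ...] }.
def sketch_split(items)
  items.each_with_object({}) do |((partition, key), value), result|
    (result[partition] ||= []).push([key, value])
  end
end

chunks = [
  [[[0, "a"], 1], [[1, "b"], 1]],
  [[[0, "a"], 1], [[1, "c"], 1]]
]

merged = sketch_merge(chunks)
reduced = sketch_reduce(merged) { |_key, count1, count2| count1 + count2 }
partitions = sketch_split(reduced)
# partitions == { 0 => [["a", 2]], 1 => [["b", 1], ["c", 1]] }
```

Because the merged list is sorted by [partition, key], equal keys are always adjacent, so a single pass suffices for the reduce step.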