Class: MapReduce::Mapper

Inherits:
Object
Includes:
Mergeable, Reduceable, MonitorMixin
Defined in:
lib/map_reduce/mapper.rb

Overview

The MapReduce::Mapper class runs the mapping part of your map-reduce job.

Instance Method Summary

Constructor Details

#initialize(implementation, partitioner: HashPartitioner.new(32), memory_limit: 16 * 1024 * 1024) ⇒ Mapper

Initializes a new mapper.

Examples:

MapReduce::Mapper.new(MyImplementation.new, partitioner: HashPartitioner.new(16), memory_limit: 16.megabytes)

Parameters:

  • implementation

    Your map-reduce implementation, i.e. an object which responds to #map and #reduce.

  • partitioner (#call) (defaults to: HashPartitioner.new(32))

    A partitioner, i.e. an object which responds to #call and calculates a partition for the passed key (see the sketch after this parameter list).

  • memory_limit (#to_i) (defaults to: 16 * 1024 * 1024)

    The memory limit, i.e. the buffer size in bytes.
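
A partitioner simply responds to #call and returns a stable partition number for a given key. A minimal sketch of a custom partitioner; the class name ModuloPartitioner and its hashing scheme are illustrative only, and the built-in HashPartitioner is usually sufficient:

require "digest"

# Illustrative custom partitioner: maps each key to an integer
# partition between 0 and num_partitions - 1.
class ModuloPartitioner
  def initialize(num_partitions)
    @num_partitions = num_partitions
  end

  def call(key)
    Digest::SHA1.hexdigest(key.to_s).to_i(16) % @num_partitions
  end
end

MapReduce::Mapper.new(MyImplementation.new, partitioner: ModuloPartitioner.new(8))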



# File 'lib/map_reduce/mapper.rb', line 21

def initialize(implementation, partitioner: HashPartitioner.new(32), memory_limit: 16 * 1024 * 1024)
  super()

  @implementation = implementation
  @partitioner = partitioner
  @memory_limit = memory_limit.to_i

  @buffer_size = 0
  @buffer = []
  @chunks = []
end
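
The implementation only needs to respond to #map, and optionally to #reduce if results should be reduced during shuffling. A word-count style sketch for illustration; the class name WordCounter and the two-values-per-key #reduce signature are assumptions, not part of this class:

# Illustrative implementation: counts word occurrences.
class WordCounter
  def map(document)
    document.split.each do |word|
      yield(word, 1)
    end
  end

  def reduce(_key, count1, count2)
    count1 + count2
  end
end

mapper = MapReduce::Mapper.new(WordCounter.new, partitioner: HashPartitioner.new(16))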

Instance Method Details

#map(*args, **kwargs) ⇒ Object

Passes the received key to your map-reduce implementation and adds each yielded key-value pair to a buffer. When the memory limit is reached, the buffered chunk is sorted and written to a tempfile.

Examples:

mapper.map("some_key")
mapper.map("other_key")

Parameters:

  • key

    The key to pass to the map-reduce implementation.



# File 'lib/map_reduce/mapper.rb', line 43

def map(*args, **kwargs)
  @implementation.map(*args, **kwargs) do |new_key, new_value|
    synchronize do
      partition = @partitioner.call(new_key)
      item = [[partition, new_key], new_value]

      @buffer.push(item)
      @buffer_size += JSON.generate(item).bytesize

      write_chunk if @buffer_size >= @memory_limit
    end
  end
end
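
Since buffer access is synchronized via MonitorMixin, #map can be called from multiple threads, which is useful when the mapped data has to be fetched over the network. A hedged sketch; the keys and the one-thread-per-key approach are illustrative only:

# Illustrative: map several keys concurrently; the mapper
# synchronizes its internal buffer.
keys = ["some_key", "other_key", "third_key"]

threads = keys.map do |key|
  Thread.new { mapper.map(key) }
end

threads.each(&:join)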

#shuffle(chunk_limit:) ⇒ Object

Performs a k-way merge of the sorted chunks written to tempfiles, reducing the merged result with your map-reduce implementation (if it responds to #reduce) and splitting the dataset into partitions. Finally, yields a hash of (partition, path) pairs, where each path points to a tempfile containing that partition's data.

Examples:

mapper.shuffle do |partitions|
  partitions.each do |partition, path|
    # store data e.g. on s3
  end
end

Parameters:

  • chunk_limit (Integer)

    The maximum number of chunk files to process at the same time. Most useful on systems where the number of open file descriptors is limited. If your file descriptor limit is high, set this to a larger value to avoid the overhead of multiple merge passes.

Raises:

  • (InvalidChunkLimit)

    If chunk_limit is less than 2.

# File 'lib/map_reduce/mapper.rb', line 76

def shuffle(chunk_limit:)
  raise(InvalidChunkLimit, "Chunk limit must be >= 2") unless chunk_limit >= 2

  begin
    write_chunk if @buffer_size > 0

    chunk = k_way_merge(@chunks, chunk_limit: chunk_limit)
    chunk = reduce_chunk(chunk, @implementation) if @implementation.respond_to?(:reduce)

    partitions = split_chunk(chunk)

    yield(partitions.transform_values(&:path))
  ensure
    partitions&.each_value(&:delete)

    @chunks.each(&:delete)
    @chunks = []
  end

  nil
end
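
Putting the pieces together, a hedged end-to-end sketch; the chunk_limit of 32, the keys and the local destination directory are illustrative, and in practice you would typically upload each partition file to a datastore such as s3. Note that the partition tempfiles are deleted once the block returns:

require "fileutils"

mapper = MapReduce::Mapper.new(MyImplementation.new, memory_limit: 16 * 1024 * 1024)

["some_key", "other_key"].each { |key| mapper.map(key) }

FileUtils.mkdir_p("tmp/partitions")

mapper.shuffle(chunk_limit: 32) do |partitions|
  partitions.each do |partition, path|
    # Illustrative: copy each partition tempfile to a local directory
    # before it gets deleted after the block returns.
    FileUtils.cp(path, "tmp/partitions/#{partition}.json")
  end
end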