MapRedus

Simple MapReduce type framework using redis and resque.

Overview

This is an experimental implementation of MapReduce using Ruby for process definition, Resque for work execution, and Redis for data storage.

Goals:

  • simple M/R-style programming for existing Ruby projects
  • low cost of entry (no need for a dedicated cluster)

if you are looking for a high-performance MapReduce implementation that can meet your big data needs, try Hadoop.

Using MapRedus

MapRedus uses Resque to handle the processes that it runs, and redis to keep a store for the values/data produced.

Workers for a MapRedus process are Resque workers. Refer to the Resque worker documentation to see how to load the necessary environment for your worker to be able to run mapreduce processs. An example is also located in the tests.

Attaching a mapreduce process to a class

You will often want to define a mapreduce process that does some operations on data within a class. The process should have an inputter, mapper, reducer, finalizer, and outputter defined. By default a process will have the specifications shown below. There is also an example of how to do this in the tests.

class GetWordCount < MapRedus::Process
  inputter MapRedus::WordStream
  mapper MapRedus::WordCounter
  reducer MapRedus::Adder
  finalizer MapRedus::ToRedisHash
  outputter MapRedus::RedisHasher
  ordered false
end

class GetCharCount < MapRedus::Process
  inputter MapRedus::CharStream
  mapper MapRedus::CharCounter
end

class Job
  mapreduce_process :word_count, GetWordCount, "job:store:result"
end

The mapreduce_process needs a name, mapper, reducer, finalizer, outputter, and key to store the result. The operation would then be run on a job calling the following.

job = Job.new
job.mapreduce.word_count( data )

The data specifies the data on which this operation is to run. We are currently working on a way to allow the result_store_key to change depending on class properties. For instance in the above example, if the Job class had an id attribute, we may want to store the final mapreduce result in "job:store:result:#id".

Inputters, Mappers, Reducers, Finalizers

MapRedus needs a input stream, mapper, reducer, finalizer to be defined to run. The input stream defines how a block of your data gets divided so that a mapper can work on a small portion to map. For example:

class InputStream < MapRedus::InputStream
  def self.scan(data_object)
    # your data object is a reference to a block of text in redis
    text_block = MapRedus.redis.get(data_object)
    text_block.each_line.each_with_index do |line, i|
      yield(i, line)
    end
  end
end

class Mapper < MapRedus::Mapper
  def self.map(data_to_map)
    data_to_map.each do |data|
      key = data
      value = 1
      yield( key, value )
    end
  end
end

In this example, the input stream calls yield to output a mapredus file number and a the value that is saved to file (in redis). The mapper's map function calls yield to emit the key value pair for storage in redis. The reducer's reduce function acts similarly.

The finalizer runs whatever needs to be run when a process completes, an example:

class Finalizer < MapRedus::Finalizer
  def self.finalize(process)
    process.each_key_reduced_value do |key, value|
      process.outputter.encode(process.result_key, key, value)
    end
    ...
    < set off a new mapredus process to use this stored data >
  end
end

The process.result_key refers the final result key that is stored in redis. The result_key may take arguments which define the output of the key. The process will also incorporate initially given key arguments into the result_key. result_key's are defined exactly as a redis_key in the redis_support gem. The outputter is needed to define how exactly that encoding is defined. We provided an outputter that encodes your data into a redis hash.

class RedisHasher < MapRedus::Outputter
  def encode(result_key, k, v)
    MapRedus::FileSystem.hset(result_key, k, v)
  end

  def decode(result_key, k)
    MapRedus::FileSystem.hget(result_key, k)
  end
end

The default Outputter makes no changes to original result, and tries to store that directly into redis as a string.

Working Locally

MapRedus uses Bundler to manage dependencies. With Bundler installed:

bundle install

You should now be able to run tests and do all other tasks with rake.

Running Tests

Run the tests which tests the word counter example and some other tests (you'll need to have bundler installed) rake

Requirements

  • Bundler (this will install all the requirements below)
  • Redis
  • RedisSupport
  • Resque
  • Resque-scheduler

Notes

Instead of calling `emit_intermediate`/`emit` in your map/reduce
to produce a key value pair/value you call `yield`, which will call
emit_intermediate/emit for you.  This gives flexibility in using
Mapper/Reducer classes especially in testing.

TODO

not necessarily in the given order

  • Ensure that the type that is inputted is the type that is outputted

  • if a process fails we do what we are supposed to do i.e. add a failure_hook which does something if your process fails

  • include functionality for a partitioner, input reader, combiner

  • implement this shit (registering of environment shit in resque) so that we can run mapreduce commands from the command line. Defining any arbitrary mapper and reducer.

  • implement redundant workers (workers doing the same work in case one of them fails)

  • if a reducer runs a recoverable fail, then make sure that an attempt to reenslave the worker is delayed by some fixed interval

  • edit emit for when we have multiple workers doing the same reduce (redundant workers for fault tolerance might need to change the rpush to a lock and setting of just a value) even if other workers do work on the same answer, want to make sure that the final reduced thing is the same every time

  • Add fault tolerance, better tracking of which workers fail, especially when we have multiple workers doing the same work ... currently is handled by Resque failure auto retry

  • if a perform operation fails then we need to have worker recover

  • make use of finish_metrics somewhere so that we can have statistics on how long map reduce processs take

  • better tracking of work being assigned so we can know when a process is finished or in progress and have a trigger to do things when shit finishes

    in resque there is functionality for an after hook which performs something after your process does it's work

    might also check out the resque-status plugin for a cheap and easy way to plug status and completion-rate into existing resque jobs.

  • ensure reducers only do a fixed amount of work? See section 3.2 of paper. bookkeeping that tells the master when tasks are in-progress or completed. this will be important for better paralleziation of tasks

  • think about the following logic

    • if a reducer starts working on a key after all maps have finished then when it is done the work on that key is finished forerver
    • this would imply a process finishes when all map tasks have finished and all reduce tasks that start after the map tasks have finished
    • if a reducer started before all map tasks were finished, then load its reduced result back onto the value list
    • if the reducer started after all map tasks finished, then emit the result

Note on Patches/Pull Requests

  • Fork the project.
  • Make your feature addition or bug fix.
  • Add tests for it. This is important so I don't break it in a future version unintentionally.
  • Commit, do not mess with rakefile, version, or history. (if you want to have your own version, that is fine but bump version in a commit by itself I can ignore when I pull)
  • Send me a pull request. Bonus points for topic branches.

Copyright (c) 2010 Dolores Labs. See LICENSE for details.