Class: Tem::Mr::Search::MapReducePlanner
- Inherits: Object
  - Object
    - Tem::Mr::Search::MapReducePlanner
- Defined in: lib/tem_mr_search/map_reduce_planner.rb
Overview
Allocates the individual components of a Map-Reduce job across TEMs.
This class is instantiated and used by MapReduceExecutor. It should not be used directly in client code, except for the purpose of replacing the default planner.
The Map-Reduce coordinator calls next_actions! on the planner to obtain a list of actions that can be carried out. The planner guarantees that the actions are independent of each other, and that all their dependencies are satisfied. When the coordinator learns that some actions have completed, it updates the planner’s state by calling action_done. After action_done is called, next_actions! should be called again to obtain new actions that can be carried out.
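The calling protocol described above can be sketched as a simple driver loop. This is a minimal, sequential sketch and not the actual MapReduceExecutor: ScriptedPlanner is a hypothetical stand-in that only mimics the planner's four public methods, run_planner is an illustrative name, and the IDs in the demo are made up. A real coordinator would presumably dispatch the actions of one batch to the TEMs concurrently.

```ruby
# Hypothetical stand-in for MapReducePlanner. It replays pre-scripted batches
# of actions and exposes the same public methods as the real planner.
class ScriptedPlanner
  attr_reader :output_id

  def initialize(batches, final_id)
    @batches = batches.dup
    @final_id = final_id
    @outstanding = 0
    @output_id = nil
  end

  # Hands out the next batch once every previously issued action is done.
  def next_actions!
    return [] if @outstanding > 0 || @batches.empty?
    batch = @batches.shift
    @outstanding = batch.length
    batch
  end

  # Records a completion; the job is finished when nothing is left to issue.
  def action_done(action)
    @outstanding -= 1
    @output_id = @final_id if @outstanding.zero? && @batches.empty?
  end

  def done?
    !@output_id.nil?
  end
end

# Drives any planner-like object: fetch actions, perform them (the block),
# report each completion, and repeat until the planner says it is done.
def run_planner(planner)
  until planner.done?
    actions = planner.next_actions!
    raise 'planner issued no actions before finishing' if actions.empty?
    actions.each do |action|
      yield action
      planner.action_done action
    end
  end
  planner.output_id
end

batches = [
  [{ :action => :map, :item_id => 0, :with => 0, :output_id => 0 },
   { :action => :map, :item_id => 1, :with => 1, :output_id => 1 }],
  [{ :action => :reduce, :output1_id => 0, :output2_id => 1, :with => 1,
     :output_id => 2 }],
  [{ :action => :finalize, :output_id => 2, :with => 0, :final_id => 3 }]
]
log = []
result = run_planner(ScriptedPlanner.new(batches, 3)) { |a| log << a[:action] }
p log     # => [:map, :map, :reduce, :finalize]
p result  # => 3
```

Because this loop performs actions one at a time, an empty batch before completion can only mean a stall. A concurrent coordinator would instead wait for in-flight actions before asking again.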
Partial results (outputs) in the Map-Reduce computation are identified by unique numbers starting from 0. The output IDs can be used as file names, if the outputs are stored in a distributed file system. When the computation is done (calling done? returns true), the output_id attribute will contain the ID of the computation’s final result.
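One way to read the constructor's initial value of @last_reduce_id, which is 2 * num_items - 2: assuming IDs are handed out sequentially (an assumption; the overview only says they are unique numbers starting from 0), mapping n items yields n partial outputs, and combining n outputs pairwise takes n - 1 reduce steps. That gives 2n - 1 partial outputs with IDs 0 through 2n - 2. The sketch below simulates that counting with a hypothetical helper, last_partial_output_id. The order in which it pairs outputs is arbitrary and need not match the planner's.

```ruby
# Hypothetical helper: simulates sequential ID assignment for a job over
# num_items items and returns the ID of the last partial output produced.
def last_partial_output_id(num_items)
  next_id = 0
  pending = []

  # Each map action produces one partial output.
  num_items.times do
    pending << next_id
    next_id += 1
  end

  # Each reduce action consumes two partial outputs and produces one.
  while pending.length > 1
    pending.shift(2)
    pending << next_id
    next_id += 1
  end

  pending.first
end

(1..5).each do |n|
  puts "#{n} items: last partial output ID #{last_partial_output_id(n)}, " \
       "formula gives #{2 * n - 2}"
end
```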
Instance Attribute Summary
- #output_id ⇒ Object (readonly)
  The output ID of the Map-Reduce’s final result.
Instance Method Summary
- #action_done(action) ⇒ Object
  Informs the planner that an action issued by next_actions! was completed.
- #done? ⇒ Boolean
  True when the Map-Reduce computation is complete.
- #initialize(job, num_items, num_tems, root_tems) ⇒ MapReducePlanner (constructor)
  Creates a planner for a Map-Reduce job.
- #next_actions! ⇒ Object
  Issues a set of actions that can be performed right now.
Constructor Details
#initialize(job, num_items, num_tems, root_tems) ⇒ MapReducePlanner
Creates a planner for a Map-Reduce job.
Arguments:
job:: the Map-Reduce job (see Tem::Mr::Search::MapReduceJob)
num_items:: the number of data items that the Map-Reduce job runs over
num_tems:: how many TEMs are available
root_tems:: the indexes of the TEMs that have the initial SECpacks bound
to them (hash with the keys +:mapper+, +:reducer+ and
+:finalizer+)
# File 'lib/tem_mr_search/map_reduce_planner.rb', line 42
def initialize(job, num_items, num_tems, root_tems)
  @job = job
  @root_tems = root_tems

  @without = { :mapper => RBTree.new, :reducer => RBTree.new }
  @with = { :mapper => Set.new([@root_tems[:mapper]]),
            :reducer => Set.new([@root_tems[:reducer]]) }
  @free_tems = RBTree.new

  # TEM ordering: the mapper root is first, the reducer root is last, and the
  # finalizer root is second
  @ordered_tems = (0...num_tems).to_a
  @ordered_tems -= @root_tems.values
  @ordered_tems = [@root_tems[:mapper]] + @ordered_tems
  unless @ordered_tems.include? @root_tems[:reducer]
    @ordered_tems += [@root_tems[:reducer]]
  end
  unless @ordered_tems.include? @root_tems[:finalizer]
    @ordered_tems = [@ordered_tems[0], @root_tems[:finalizer]] +
                    @ordered_tems[1..-1]
  end

  # Reverted index for the TEM ordering.
  @rindex_tems = Array.new(num_tems)
  @ordered_tems.each_with_index { |t, i| @rindex_tems[t] = i }

  @ordered_tems.each_with_index do |tem, i|
    @free_tems[[i, tem]] = true
    @without.each { |k, v| v[[i, tem]] = true unless tem == @root_tems[k] }
  end

  @unmapped_items = (0...num_items).to_a.reverse
  @reduce_queue = RBTree.new
  @last_output_id = 0
  @last_reduce_id = 2 * num_items - 2
  @done_reducing, @output_id = false, nil
end
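The TEM ordering step in the constructor can be isolated to see what it produces for a given root_tems hash. The sketch below copies that logic into two standalone methods. The names order_tems and reverse_index are illustrative and do not exist in the library, and the TEM indexes in the example are made up.

```ruby
# Mirrors the ordering step of the constructor: mapper root first,
# finalizer root second, reducer root last, other TEMs in between.
def order_tems(num_tems, root_tems)
  ordered = (0...num_tems).to_a - root_tems.values
  ordered = [root_tems[:mapper]] + ordered
  ordered += [root_tems[:reducer]] unless ordered.include?(root_tems[:reducer])
  unless ordered.include?(root_tems[:finalizer])
    ordered = [ordered[0], root_tems[:finalizer]] + ordered[1..-1]
  end
  ordered
end

# Reverse index: maps a TEM index to its position in the ordering.
def reverse_index(ordered, num_tems)
  rindex = Array.new(num_tems)
  ordered.each_with_index { |tem, i| rindex[tem] = i }
  rindex
end

roots = { :mapper => 2, :reducer => 0, :finalizer => 1 }
ordered = order_tems(4, roots)
p ordered                    # => [2, 1, 3, 0]
p reverse_index(ordered, 4)  # => [3, 1, 0, 2]
```

When one TEM holds more than one root SECpack, it appears only once in the ordering. For example, if the reducer and finalizer roots are the same TEM, that TEM ends up last rather than second, because the finalizer is only inserted when it is not already present.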
Instance Attribute Details
#output_id ⇒ Object (readonly)
The output ID of the Map-Reduce’s final result.
# File 'lib/tem_mr_search/map_reduce_planner.rb', line 136
def output_id
  @output_id
end
Instance Method Details
#action_done(action) ⇒ Object
Informs the planner that an action issued by next_actions! was completed.
Args:
action:: an action hash, as returned by next_actions!
The return value is not specified.
# File 'lib/tem_mr_search/map_reduce_planner.rb', line 124
def action_done(action)
  dispatch = { :migrate => :done_migrating, :map => :done_mapping,
               :reduce => :done_reducing, :finalize => :done_finalizing }
  self.send dispatch[action[:action]], action
end
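The listing above routes each completed action to a handler through a symbol lookup table and send. The following sketch shows that pattern in isolation. DispatchSketch is a hypothetical class: the handler names are taken from the listing, but their bodies are placeholders (the real handlers are not shown on this page), and the explicit check for unknown action types is an addition that the original does not have.

```ruby
# Hypothetical class illustrating the dispatch-table pattern of action_done.
class DispatchSketch
  DISPATCH = { :migrate => :done_migrating, :map => :done_mapping,
               :reduce => :done_reducing, :finalize => :done_finalizing }

  attr_reader :completed

  def initialize
    @completed = []
  end

  # Looks up the handler for the action type and calls it with the action.
  def action_done(action)
    handler = DISPATCH.fetch(action[:action]) do |type|
      raise ArgumentError, "unknown action type: #{type.inspect}"
    end
    send handler, action
  end

  private

  # Placeholder handlers: each records the action type and one of its keys.
  def done_migrating(action)
    @completed << [:migrate, action[:to]]
  end

  def done_mapping(action)
    @completed << [:map, action[:output_id]]
  end

  def done_reducing(action)
    @completed << [:reduce, action[:output_id]]
  end

  def done_finalizing(action)
    @completed << [:finalize, action[:final_id]]
  end
end

sketch = DispatchSketch.new
sketch.action_done({ :action => :map, :item_id => 0, :with => 1, :output_id => 0 })
p sketch.completed  # => [[:map, 0]]
```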
#done? ⇒ Boolean
True when the Map-Reduce computation is complete.
# File 'lib/tem_mr_search/map_reduce_planner.rb', line 131
def done?
  !@output_id.nil?
end
#next_actions! ⇒ Object
Issues a set of actions that can be performed right now.
The method alters the planner’s state assuming the actions will be performed.
Returns an array of hashes, with one hash per action to be performed. The :action key specifies the type of action to be performed, and can be :migrate, :map, :reduce, or :finalize. All the actions have the :with key, which is the ID (0-based index) of the TEM that will be doing the action.
Migrate actions have the following keys:
:secpack:: the type of SECpack to be migrated ( +:mapper+ or +:reducer+ )
:with:: the ID of the TEM doing the migration
:to:: the number of the TEM that the SECpack should be migrated to
Map actions have the following keys:
:item_id:: the ID of the item to be mapped (number in Table-Scan order)
:with:: the ID of the TEM doing the mapping
:output_id:: ID for the result of the map operation
Reduce actions have the following keys:
:output1_id, :output2_id:: the IDs of the partial outputs to be reduced
:with:: the ID of the TEM doing the reducing
:output_id:: the ID for the result of the reduce operation
The finalize action has the following keys:
:output_id:: the ID of the last partial output, which will be finalized
:with:: the ID of the TEM doing the finalization
:final_id:: the ID for the computation's final result
# File 'lib/tem_mr_search/map_reduce_planner.rb', line 109
def next_actions!
  actions = migrate_actions :mapper
  actions += migrate_actions :reducer
  actions += reduce_actions
  actions += map_actions
  actions += finalize_actions
  actions
end
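To make the shape of the action hashes concrete, the sketch below writes the documented keys for each action type as a lookup table and checks example hashes against it. ACTION_KEYS and missing_action_keys are hypothetical names that are not part of the library, and the IDs in the examples are made up.

```ruby
# Keys carried by each action type, as listed in the next_actions! docs.
# Every action hash also has the :action key itself.
ACTION_KEYS = {
  :migrate  => [:secpack, :with, :to],
  :map      => [:item_id, :with, :output_id],
  :reduce   => [:output1_id, :output2_id, :with, :output_id],
  :finalize => [:output_id, :with, :final_id]
}

# Hypothetical helper: returns the documented keys missing from an action.
def missing_action_keys(action)
  expected = ACTION_KEYS.fetch(action[:action]) do |type|
    raise ArgumentError, "unknown action type: #{type.inspect}"
  end
  expected.reject { |key| action.key?(key) }
end

# Example hashes shaped like the ones next_actions! is documented to return.
examples = [
  { :action => :migrate, :secpack => :mapper, :with => 0, :to => 1 },
  { :action => :map, :item_id => 0, :with => 1, :output_id => 0 },
  { :action => :reduce, :output1_id => 0, :output2_id => 1, :with => 2,
    :output_id => 2 },
  { :action => :finalize, :output_id => 2, :with => 3, :final_id => 3 }
]
examples.each { |a| p [a[:action], missing_action_keys(a)] }
```

A coordinator that replaces the default planner could run a check like this on its own planner's output before dispatching actions to TEMs.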