Class: Tem::Mr::Search::MapReduceExecutor
- Inherits: Object
- Defined in: lib/tem_mr_search/map_reduce_executor.rb
Overview
Coordination code (executor) for performing a Map-Reduce computation.
The executor distributes the Map-Reduce computation across multiple TEMs. The strategy used to allocate tasks to TEMs is expressed by a MapReducePlanner class, and the executor instantiates that class. The executor is responsible for coordinating between the TEMs and the planner.
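The planner's own interface is not documented on this page, but the #execute source listed here calls `done?`, `next_actions!`, `action_done` and `output_id` on it, and the constructor passes it `job, db.length, tems.length, root_tems`. As a rough illustration only, a replacement planner might look like the sketch below. `ToyPlanner`, its round-robin policy, the `:item` key and the `:toy_output` identifier are all invented for this example and are not part of the library; only the `:with` key appears in the executor's source.

```ruby
# Hypothetical stand-in for a planner; NOT the real MapReducePlanner.
# It only mirrors the calls that #execute makes on its planner.
class ToyPlanner
  attr_reader :finished

  # Same argument order the executor uses when it builds a planner.
  def initialize(job, item_count, tem_count, root_tems)
    # One made-up :map action per item, assigned to TEMs round-robin.
    @pending = (0...item_count).map do |item|
      { :action => :map, :item => item, :with => item % tem_count }
    end
    @in_flight = 0
    @finished = []
  end

  # True once every action has been issued and reported back.
  def done?
    @pending.empty? && @in_flight == 0
  end

  # Hands out all actions that are ready to run (here: everything pending).
  def next_actions!
    actions, @pending = @pending, []
    @in_flight += actions.length
    actions
  end

  # Called by the coordinator when a TEM finishes an action.
  def action_done(action)
    @in_flight -= 1
    @finished << action[:item]
  end

  # Identifier of the final output (made up for this sketch).
  def output_id
    :toy_output
  end
end

# Drive the planner the way a coordinator would, completing actions inline.
planner = ToyPlanner.new(nil, 5, 2, {})
until planner.done?
  planner.next_actions!.each { |action| planner.action_done action }
end
```

The real planner presumably also schedules reduce, finalize and migration work (the timing keys in the constructor suggest as much); the sketch leaves all of that out.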
Instance Method Summary
- #collect_tem_ids ⇒ Object
  Collects identification information from all the TEMs.
- #execute ⇒ Object
  Executes the job.
- #initialize(job, db, tems, root_tems, planner_class = nil) ⇒ MapReduceExecutor (constructor)
  Creates an executor for a Map-Reduce job.
Constructor Details
#initialize(job, db, tems, root_tems, planner_class = nil) ⇒ MapReduceExecutor
Creates an executor for a Map-Reduce job.
Arguments:
job:: the Map-Reduce job (see Tem::Mr::Search::MapReduceJob)
db:: the database to run Map-Reduce over
tems:: sessions to the available TEMs
root_tems:: the indexes of the TEMs that have the initial SECpacks bound to them (hash with the keys +:mapper+, +:reducer+ and +:finalizer+)
planner_class:: (optional) replacement for the default planner strategy
    # File 'lib/tem_mr_search/map_reduce_executor.rb', line 30
    def initialize(job, db, tems, root_tems, planner_class = nil)
      planner_class ||= MapReducePlanner

      @db = db  # Writable only in main thread.
      @tems = tems  # Writable only in main thread.

      # Protected by @lock during collect_tem_ids, read-only during execute.
      @tem_certs = Array.new @tems.length

      # Writable only in main thread.
      @planner = planner_class.new job, db.length, tems.length, root_tems

      # Protected by @lock
      @tem_parts = { :mapper => { root_tems[:mapper] => job.mapper },
                     :reducer => { root_tems[:reducer] => job.reducer },
                     :finalizer => { root_tems[:finalizer] => job.finalizer } }
      # Protected by @lock
      @outputs = {}

      # Protected by @lock
      tasks = { :map => 0.0, :reduce => 0.0, :finalize => 0.0,
                :migrate_map => 0.0, :migrate_reduce => 0.0, :tem_ids => 0.0 }
      @timings = { :tems => Array.new(@tems.length) { tasks.dup } }

      # Thread-safe.
      @thread_queues = tems.map { |tem| Queue.new }
      @main_queue = Queue.new
      @lock = Mutex.new
    end
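One detail of the constructor worth illustrating is the per-TEM timing table, built with `Array.new(@tems.length) { tasks.dup }`. The block form evaluates `tasks.dup` once per slot, so each TEM accumulates into its own hash. The sketch below contrasts it with the default-value form of `Array.new`, where every slot would refer to one shared hash. The variable names and the shortened `tasks` hash are made up for the illustration.

```ruby
tasks = { :map => 0.0, :tem_ids => 0.0 }

# Block form: the block runs once per slot, so each slot gets its own copy.
independent = Array.new(3) { tasks.dup }
independent[0][:map] += 1.5

# Default-value form: every slot references the very same hash object.
shared = Array.new(3, tasks.dup)
shared[0][:map] += 1.5

independent_values = independent.map { |t| t[:map] }  # [1.5, 0.0, 0.0]
shared_values = shared.map { |t| t[:map] }            # [1.5, 1.5, 1.5]
```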
Instance Method Details
#collect_tem_ids ⇒ Object
Collects identification information from all the TEMs.
    # File 'lib/tem_mr_search/map_reduce_executor.rb', line 87
    def collect_tem_ids
      threads = (0...@tems.length).map do |tem_index|
        Thread.new(tem_index, @tems[tem_index]) do |index, tem|
          t0 = Time.now
          ecert = tem.endorsement_cert
          time_delta = Time.now - t0
          @lock.synchronize do
            @tem_certs[index] = ecert
            @timings[:tems][index][:tem_ids] += time_delta
          end
        end
      end
      threads.each { |thread| thread.join }
    end
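The method follows a fan-out and join pattern: one thread per TEM performs the slow call outside the lock, takes the lock only to record the result, and the caller joins all threads before returning. The standalone sketch below reproduces that pattern without any TEM hardware. `FakeTem` and its certificate strings are placeholders invented for this example; only the `endorsement_cert` method name comes from the source above.

```ruby
require 'thread'

# Hypothetical stand-in for a TEM session; only endorsement_cert is modeled.
FakeTem = Struct.new(:name) do
  def endorsement_cert
    "cert-for-#{name}"
  end
end

tems = [FakeTem.new('a'), FakeTem.new('b'), FakeTem.new('c')]
certs = Array.new tems.length
elapsed = Array.new(tems.length) { 0.0 }
lock = Mutex.new

threads = tems.each_with_index.map do |tem, index|
  # Passing values through Thread.new gives each thread its own copies.
  Thread.new(index, tem) do |i, t|
    t0 = Time.now
    cert = t.endorsement_cert   # slow call happens outside the lock
    delta = Time.now - t0
    lock.synchronize do         # only the shared writes are serialized
      certs[i] = cert
      elapsed[i] += delta
    end
  end
end
threads.each(&:join)
```

Because each thread writes to its own index, results land in TEM order regardless of which thread finishes first.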
#execute ⇒ Object
Executes the job.
Returns a hash with the following keys:
:result:: the job's result
:timings:: timing statistics on the job's execution
    # File 'lib/tem_mr_search/map_reduce_executor.rb', line 65
    def execute
      t0 = Time.now
      collect_tem_ids

      # Spawn TEM threads.
      @tems.each_index { |i| Thread.new(i) { |i| executor_thread i } }

      until @planner.done?
        actions = @planner.next_actions!
        @lock.synchronize do
          actions.each { |action| @thread_queues[action[:with]] << action }
        end

        action = @main_queue.pop
        @planner.action_done action
      end

      @timings[:total] = Time.now - t0
      return { :result => @outputs[@planner.output_id], :timings => @timings }
    end
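The coordination above relies on one queue per TEM thread plus a single main queue for completion messages. The source of `executor_thread` is not shown on this page, so the worker loop in the following sketch is an assumption about its general shape, not the library's code. The `:stop` sentinel, the `:id` and `:input` keys and the doubling "work" are likewise invented for illustration.

```ruby
require 'thread'

worker_count = 2
thread_queues = Array.new(worker_count) { Queue.new }
main_queue = Queue.new
outputs = {}
lock = Mutex.new

# Hypothetical worker loop; the real executor_thread is not shown here.
workers = thread_queues.map do |queue|
  Thread.new(queue) do |q|
    while (action = q.pop) != :stop
      result = action[:input] * 2   # placeholder for work done on a TEM
      lock.synchronize { outputs[action[:id]] = result }
      main_queue << action          # report completion to the coordinator
    end
  end
end

# Coordinator: route each action to the queue named by action[:with].
actions = [{ :id => :x, :input => 1, :with => 0 },
           { :id => :y, :input => 2, :with => 1 },
           { :id => :z, :input => 3, :with => 0 }]
actions.each { |action| thread_queues[action[:with]] << action }

# Wait for one completion message per dispatched action.
completed = actions.map { main_queue.pop[:id] }

# Shut the workers down (the :stop sentinel is an invention of this sketch).
thread_queues.each { |q| q << :stop }
workers.each(&:join)
```

Unlike this sketch, #execute pops a single completion per loop iteration and then asks the planner for further actions, so the planner can schedule new work as earlier actions finish.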