Class: MapRedus::Master
- Inherits: QueueProcess
  - Object
  - QueueProcess
  - MapRedus::Master
- Defined in: lib/mapredus/master.rb
Overview
Note: Instead of using Resque directly within the process, we implement a master interface with Resque.
Does bookkeeping to keep track of how many slaves are doing work. If no slaves are doing work for a process, then the process is done. While there is work available, the slaves will always be doing work.
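The bookkeeping described above can be sketched with an in-memory list per process. This is a minimal illustration, assuming a plain Ruby Hash in place of the FileSystem list that the master uses; SlaveLedger is a hypothetical name and not part of MapRedus.

```ruby
# In-memory stand-in for the list operations the master relies on.
# SlaveLedger is a hypothetical name, not part of MapRedus.
class SlaveLedger
  def initialize
    @lists = Hash.new { |hash, key| hash[key] = [] }
  end

  # Record one more slave for the process (mirrors the rpush in .enslave).
  def enslave(pid)
    @lists[pid].push(1)
  end

  # Release one slave for the process (mirrors the lpop in .free_slave).
  def free_slave(pid)
    @lists[pid].shift
  end

  # A process is still working while its list is non-empty.
  def working?(pid)
    0 < @lists[pid].length
  end
end

ledger = SlaveLedger.new
ledger.enslave(42)
ledger.enslave(42)
ledger.free_slave(42)
still_busy = ledger.working?(42)  # one slave is still outstanding
ledger.free_slave(42)
finished = !ledger.working?(42)   # no slaves left, so the process is done
```

Each enslave pushes one marker and each finished job pops one, so an empty list is the signal that the process has no outstanding work.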
Class Method Summary
- .emancipate(pid) ⇒ Object
- .enslave(process, klass, *args) ⇒ Object
  The current default (QUEUE) that we push on to is :mapredus.
- .enslave_finalizer(process) ⇒ Object
- .enslave_inputter(process, *data_object) ⇒ Object
- .enslave_later(delay_in_seconds, process, klass, *args) ⇒ Object
- .enslave_later_reduce(process, key) ⇒ Object
- .enslave_map(process, data_chunk) ⇒ Object
  These exist to match the arguments that the Mapper/Reducer perform function expects.
- .enslave_reduce(process, key) ⇒ Object
- .enslave_reducers(process) ⇒ Object
  Enslave the reducers.
- .finish_metrics(pid) ⇒ Object
- .free_slave(pid) ⇒ Object
- .mapreduce(process, *data_object) ⇒ Object
  The order of operations that occur in the mapreduce process.
- .perform(pid, data_object) ⇒ Object
  Master performs the work that it needs to do: it must free itself as a slave from Resque and enslave mappers.
- .set_request_time(pid) ⇒ Object
  Time metrics for measuring how long it takes map reduce to do a process.
- .slaves(pid) ⇒ Object
- .start_metrics(pid) ⇒ Object
- .working?(pid) ⇒ Boolean
  Check whether there are still workers working on process PID’s processes.
Methods inherited from QueueProcess
Class Method Details
.emancipate(pid) ⇒ Object
# File 'lib/mapredus/master.rb', line 132
def self.emancipate(pid)
  process = Process.open(pid)
  return unless process

  # Working on resque directly seems dangerous
  #
  # Warning: this is supposed to be used as a debugging operation
  # and isn't intended for normal use. It is potentially very expensive.
  #
  destroyed = 0
  qs = [queue, process.mapper.queue, process.reducer.queue, process.finalizer.queue].uniq
  qs.each do |q|
    q_key = "queue:#{q}"
    Resque.redis.lrange(q_key, 0, -1).each do | string |
      json = Helper.decode(string)
      match = json['class'] == "MapRedus::Master"
      match |= json['class'] == process.inputter.to_s
      match |= json['class'] == process.mapper.to_s
      match |= json['class'] == process.reducer.to_s
      match |= json['class'] == process.finalizer.to_s
      match &= json['args'].first.to_s == process.pid.to_s
      if match
        destroyed += Resque.redis.lrem(q_key, 0, string).to_i
      end
    end
  end

  #
  # our slave information is kept track of on file and not in Resque
  #
  FileSystem.del(ProcessInfo.slaves(pid))
  destroyed
end
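The matching rule in .emancipate (the job class is one of the process's classes and the first argument is the pid) can be tried in isolation. The sketch below is only an illustration: it assumes queue entries are JSON strings and uses Ruby's standard JSON library in place of Helper.decode; entry_for_process?, WordCountMapper and UnrelatedJob are hypothetical names.

```ruby
require 'json'

# Hypothetical helper, not part of MapRedus: decides whether a raw queue
# entry belongs to the given pid and to one of the given job class names.
def entry_for_process?(raw_entry, pid, class_names)
  job = JSON.parse(raw_entry)
  class_names.include?(job['class']) && job['args'].first.to_s == pid.to_s
end

# Made-up queue contents for illustration only.
queue = [
  JSON.generate('class' => 'MapRedus::Master', 'args' => [7, ['data']]),
  JSON.generate('class' => 'WordCountMapper',  'args' => [7, 'chunk']),
  JSON.generate('class' => 'WordCountMapper',  'args' => [8, 'chunk']),
  JSON.generate('class' => 'UnrelatedJob',     'args' => [7])
]

job_classes = ['MapRedus::Master', 'WordCountMapper']
remaining = queue.reject { |entry| entry_for_process?(entry, 7, job_classes) }
destroyed = queue.length - remaining.length
```

Both conditions must hold, so a job of the right class for another pid, or an unrelated job that happens to carry the same pid, stays on the queue.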
.enslave(process, klass, *args) ⇒ Object
The current default (QUEUE) that we push on to is :mapredus.
# File 'lib/mapredus/master.rb', line 94
def self.enslave( process, klass, *args )
  FileSystem.rpush(ProcessInfo.slaves(process.pid), 1)

  if( process.synchronous )
    klass.perform(*args)
  else
    Resque.push( klass.queue, { :class => klass.to_s, :args => args } )
  end
end
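The branch in .enslave either runs the job inline or pushes a payload for a worker to pick up later. A minimal sketch of that split follows, assuming a plain array in place of the Resque queue; EchoJob and dispatch are hypothetical names used only for illustration.

```ruby
# Hypothetical stand-in for a mapper/reducer class with the two class
# methods that the dispatch step relies on.
class EchoJob
  def self.queue
    :mapredus
  end

  def self.perform(pid, payload)
    "pid #{pid} handled #{payload}"
  end
end

# Run the job inline when synchronous; otherwise record a payload in
# `pending`, which plays the role of the Resque queue in this sketch.
def dispatch(pending, synchronous, klass, *args)
  if synchronous
    klass.perform(*args)
  else
    pending.push(queue: klass.queue, class: klass.to_s, args: args)
    nil
  end
end

pending = []
inline_result = dispatch(pending, true, EchoJob, 1, 'chunk-a')
queued_result = dispatch(pending, false, EchoJob, 1, 'chunk-b')
```

The queued payload carries only the class name and the arguments, which is why the perform functions receive the pid rather than the process object.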
.enslave_finalizer(process) ⇒ Object
# File 'lib/mapredus/master.rb', line 72
def self.enslave_finalizer( process )
  enslave( process, process.finalizer, process.pid )
end
.enslave_inputter(process, *data_object) ⇒ Object
# File 'lib/mapredus/master.rb', line 48
def self.enslave_inputter(process, *data_object)
  enslave( process, process.inputter, process.pid, *data_object )
end
.enslave_later(delay_in_seconds, process, klass, *args) ⇒ Object
# File 'lib/mapredus/master.rb', line 104
def self.enslave_later( delay_in_seconds, process, klass, *args)
  FileSystem.rpush(ProcessInfo.slaves(process.pid), 1)

  if( process.synchronous )
    klass.perform(*args)
  else
    #
    # TODO: I cannot get enqueue_in to work with my tests
    #       there seems to be a silent failure somewhere
    #       in the tests such that it never calls the function
    #       and the queue gets emptied
    #
    # Resque.enqueue_in(delay_in_seconds, klass, *args)

    ##
    ## Temporary, immediately just push process back onto the resque queue
    Resque.push( klass.queue, { :class => klass.to_s, :args => args } )
  end
end
.enslave_later_reduce(process, key) ⇒ Object
# File 'lib/mapredus/master.rb', line 87
def self.enslave_later_reduce(process, key)
  enslave_later( process.reducer.wait, process, process.reducer, process.pid, key )
end
.enslave_map(process, data_chunk) ⇒ Object
These exist to match the arguments that the Mapper/Reducer perform function expects, except that the perform function receives the pid instead of the process.
# File 'lib/mapredus/master.rb', line 79
def self.enslave_map(process, data_chunk)
  enslave( process, process.mapper, process.pid, data_chunk )
end
.enslave_reduce(process, key) ⇒ Object
# File 'lib/mapredus/master.rb', line 83
def self.enslave_reduce(process, key)
  enslave( process, process.reducer, process.pid, key )
end
.enslave_reducers(process) ⇒ Object
Enslave the reducers:
For each key, enslave a reducer to process the values on that key. If there were no keys produced during the map operation we must set off the finalizer.
TODO: inject optimizations here for special reducers like the identity reduce
Returns nothing.
# File 'lib/mapredus/master.rb', line 62
def self.enslave_reducers( process )
  if( process.num_keys > 0 )
    process.map_keys.each do |key|
      enslave_reduce( process, key )
    end
  else
    process.next_state
  end
end
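The decision in .enslave_reducers can be seen as a small planning step: one reduce job per key, or a move to the next state when the map phase produced no keys. The sketch below only returns the planned actions as data; plan_reduce_phase is a hypothetical name and the action symbols are illustrative.

```ruby
# Hypothetical helper, not part of MapRedus: lists the actions the master
# would take for a given set of map keys.
def plan_reduce_phase(map_keys)
  if map_keys.empty?
    # No keys were produced, so skip straight to the next state.
    [[:next_state]]
  else
    # One reducer per key.
    map_keys.map { |key| [:enslave_reduce, key] }
  end
end

with_keys = plan_reduce_phase(%w[apple pear])
without_keys = plan_reduce_phase([])
```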
.finish_metrics(pid) ⇒ Object
# File 'lib/mapredus/master.rb', line 177
def self.finish_metrics(pid)
  started = ProcessInfo.started_at( pid )
  finished = ProcessInfo.finished_at( pid )
  requested = ProcessInfo.requested_at( pid )

  completion_time = Time.now.to_i

  FileSystem.set finished, completion_time
  time_to_complete = completion_time - FileSystem.get(started).to_i

  recent_ttcs = ProcessInfo.recent_time_to_complete
  FileSystem.lpush( recent_ttcs , time_to_complete )
  FileSystem.ltrim( recent_ttcs , 0, 30 - 1)

  FileSystem.expire finished, 60 * 60
  FileSystem.expire started, 60 * 60
  FileSystem.expire requested, 60 * 60
end
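The lpush followed by ltrim keeps only the 30 most recent completion times, newest first. A minimal in-memory sketch of that capped list is shown here, assuming a plain Ruby array in place of the FileSystem list; record_completion and its limit parameter are hypothetical.

```ruby
# Hypothetical helper, not part of MapRedus: prepend the newest duration
# and drop everything past the limit.
def record_completion(recent, started_at, completed_at, limit: 30)
  recent.unshift(completed_at - started_at)  # newest first, like lpush
  recent.slice!(limit..-1)                   # keep `limit` entries, like ltrim 0, limit - 1
  recent
end

recent = []
# Simulate 35 processes that started at t=100 and took 0..34 seconds.
35.times { |seconds| record_completion(recent, 100, 100 + seconds) }
```

After 35 completions the list holds 30 entries, with the five oldest durations dropped from the tail.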
.free_slave(pid) ⇒ Object
# File 'lib/mapredus/master.rb', line 128
def self.free_slave(pid)
  FileSystem.lpop(ProcessInfo.slaves(pid))
end
.mapreduce(process, *data_object) ⇒ Object
The order of operations that occur in the mapreduce process.
The inputter sets off the mapper processes.
# File 'lib/mapredus/master.rb', line 34
def self.mapreduce( process, *data_object )
  start_metrics(process.pid)
  if process.synchronous
    process.update(:state => INPUT_MAP_IN_PROGRESS)
    enslave_inputter(process, *data_object)
    process.update(:state => REDUCE_IN_PROGRESS)
    enslave_reducers(process)
    process.update(:state => FINALIZER_IN_PROGRESS)
    enslave_finalizer(process)
  else
    Resque.push(QueueProcess.queue, {:class => MapRedus::Master , :args => [process.pid, data_object]} )
  end
end
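In the synchronous branch the three stages run back to back, each preceded by a state update. The sketch below records that ordering with a hypothetical TracedProcess class; the symbols stand in for the INPUT_MAP_IN_PROGRESS, REDUCE_IN_PROGRESS and FINALIZER_IN_PROGRESS constants and are assumptions made for illustration.

```ruby
# Hypothetical recorder standing in for a MapRedus process; it only logs
# state changes and stages in the order they happen.
class TracedProcess
  attr_reader :log

  def initialize
    @log = []
  end

  def update(state)
    @log << [:state, state]
  end

  def run(stage)
    @log << [:run, stage]
  end
end

# Mirrors the synchronous ordering: set the state, then run the stage.
def run_synchronously(process)
  process.update(:input_map_in_progress)
  process.run(:inputter)
  process.update(:reduce_in_progress)
  process.run(:reducers)
  process.update(:finalizer_in_progress)
  process.run(:finalizer)
  process.log
end

trace = run_synchronously(TracedProcess.new)
stages = trace.select { |kind, _| kind == :run }.map { |_, stage| stage }
```

In the asynchronous branch none of this happens inline; the master only pushes itself onto the queue, and .perform later sets off the inputter.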
.perform(pid, data_object) ⇒ Object
Master performs the work that it needs to do:
- it must free itself as a slave from Resque
- enslave mappers
# File 'lib/mapredus/master.rb', line 23
def self.perform( pid, data_object )
  process = Process.open(pid)
  enslave_inputter(process, *data_object)
  process.update(:state => INPUT_MAP_IN_PROGRESS)
end
.set_request_time(pid) ⇒ Object
Time metrics for measuring how long a map reduce process takes.
# File 'lib/mapredus/master.rb', line 168
def self.set_request_time(pid)
  FileSystem.set( ProcessInfo.requested_at(pid), Time.now.to_i )
end
.slaves(pid) ⇒ Object
# File 'lib/mapredus/master.rb', line 124
def self.slaves(pid)
  FileSystem.lrange(ProcessInfo.slaves(pid), 0, -1)
end
.start_metrics(pid) ⇒ Object
# File 'lib/mapredus/master.rb', line 172
def self.start_metrics(pid)
  started = ProcessInfo.started_at( pid )
  FileSystem.set started, Time.now.to_i
end
.working?(pid) ⇒ Boolean
Check whether there are still workers working on process PID’s processes.
In the synchronous case, the master is always working, since nothing goes to the queue.
# File 'lib/mapredus/master.rb', line 14
def self.working?(pid)
  0 < FileSystem.llen(ProcessInfo.slaves(pid))
end