Class: MapRedus::Master
- Inherits: QueueProcess
  - Object
  - QueueProcess
  - MapRedus::Master
- Defined in: lib/mapredus/master.rb
Overview
Note: Instead of using Resque directly within the process, we implement a master interface with Resque.
The master does bookkeeping to keep track of how many slaves are doing work. If no slaves are doing work for a process, then the process is done. While there is work available, the slaves will always be doing work.
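The bookkeeping described above can be pictured with a small in-memory sketch. It is only an illustration: `SlaveLedger` and its hash-backed store are hypothetical stand-ins for the list that MapRedus keeps through `FileSystem`; only the method names mirror `enslave`, `free_slave`, and `working?` from this class.

```ruby
# Hypothetical in-memory stand-in for the per-process slave list.
# MapRedus itself keeps this list through FileSystem (rpush / lpop / llen).
class SlaveLedger
  def initialize
    @slaves = Hash.new { |hash, pid| hash[pid] = [] }
  end

  # Record that one more worker has been handed work for +pid+.
  def enslave(pid)
    @slaves[pid].push(1)
  end

  # Record that one worker for +pid+ has finished.
  def free_slave(pid)
    @slaves[pid].shift
  end

  # The process is still working while any entry remains in its list.
  def working?(pid)
    @slaves[pid].length > 0
  end
end

ledger = SlaveLedger.new
ledger.enslave(42)
ledger.enslave(42)
ledger.free_slave(42)
puts ledger.working?(42)   # one worker is still registered
ledger.free_slave(42)
puts ledger.working?(42)   # no workers are registered any more
```

A plain list of markers is enough here because only the count matters: the process is considered done once the list is empty.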
Class Method Summary
- .emancipate(pid) ⇒ Object
- .enslave(process, klass, *args) ⇒ Object
  The current default (QUEUE) that we push on to is :mapredus.
- .enslave_finalizer(process) ⇒ Object
- .enslave_inputter(process, data_object) ⇒ Object
- .enslave_later(delay_in_seconds, process, klass, *args) ⇒ Object
- .enslave_later_reduce(process, key) ⇒ Object
- .enslave_map(process, data_chunk) ⇒ Object
  Have these to match what the Mapper/Reducer perform function expects to see as arguments.
- .enslave_reduce(process, key) ⇒ Object
- .enslave_reducers(process) ⇒ Object
- .finish_metrics(pid) ⇒ Object
- .free_slave(pid) ⇒ Object
- .mapreduce(process, data_object) ⇒ Object
  The order of operations that occur in the mapreduce process.
- .perform(pid, data_object) ⇒ Object
  Master performs the work that it needs to do: it must free itself as a slave from Resque and enslave mappers.
- .set_request_time(pid) ⇒ Object
  Time metrics for measuring how long it takes map reduce to do a process.
- .slaves(pid) ⇒ Object
- .start_metrics(pid) ⇒ Object
- .working?(pid) ⇒ Boolean
  Check whether there are still workers working on process PID's processes.
Class Method Details
.emancipate(pid) ⇒ Object
    # File 'lib/mapredus/master.rb', line 118
    def self.emancipate(pid)
      process = Process.open(pid)
      return unless process

      # Working on resque directly seems dangerous
      #
      # Warning: this is supposed to be used as a debugging operation
      # and isn't intended for normal use. It is potentially very expensive.
      #
      destroyed = 0
      qs = [queue, process.mapper.queue, process.reducer.queue, process.finalizer.queue].uniq
      qs.each do |q|
        q_key = "queue:#{q}"
        Resque.redis.lrange(q_key, 0, -1).each do | string |
          json = Helper.decode(string)
          match = json['class'] == "MapRedus::Master"
          match |= json['class'] == process.inputter.to_s
          match |= json['class'] == process.mapper.to_s
          match |= json['class'] == process.reducer.to_s
          match |= json['class'] == process.finalizer.to_s
          match &= json['args'].first.to_s == process.pid.to_s
          if match
            destroyed += Resque.redis.lrem(q_key, 0, string).to_i
          end
        end
      end

      #
      # our slave information is kept track of on file and not in Resque
      #
      FileSystem.del(ProcessInfo.slaves(pid))
      destroyed
    end
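The matching rule used by `emancipate` (the job's class must be one of the process's classes, and its first argument must be the process id) can be tried out without Redis. The following is a minimal sketch under stated assumptions: `jobs_for_process` and the `WordMapper` / `Unrelated` class names are hypothetical, and a plain array of JSON strings stands in for a Resque queue.

```ruby
require 'json'

# Hypothetical helper: returns the payloads that belong to +pid+ and whose
# class is one of +class_names+. Each payload is a JSON string with a
# "class" name and an "args" array, like the entries emancipate decodes.
def jobs_for_process(payloads, class_names, pid)
  payloads.select do |string|
    job = JSON.parse(string)
    class_names.include?(job['class']) &&
      job['args'].first.to_s == pid.to_s
  end
end

queue = [
  JSON.generate('class' => 'MapRedus::Master', 'args' => [7, 'input']),
  JSON.generate('class' => 'WordMapper',       'args' => [7, 'chunk']),
  JSON.generate('class' => 'WordMapper',       'args' => [8, 'chunk']),
  JSON.generate('class' => 'Unrelated',        'args' => [7])
]

matches = jobs_for_process(queue, ['MapRedus::Master', 'WordMapper'], 7)
puts matches.length   # 2
```

Comparing both sides with `to_s`, as the original does, means a pid stored as an integer still matches a pid passed as a string.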
.enslave(process, klass, *args) ⇒ Object
The current default (QUEUE) that we push on to is :mapredus.

    # File 'lib/mapredus/master.rb', line 80
    def self.enslave( process, klass, *args )
      FileSystem.rpush(ProcessInfo.slaves(process.pid), 1)

      if( process.synchronous )
        klass.perform(*args)
      else
        Resque.push( klass.queue, { :class => klass.to_s, :args => args } )
      end
    end
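The branch in `enslave` either runs the worker inline or queues a payload for later. A minimal sketch of that decision follows, assuming a hypothetical `EchoWorker` class and a `dispatch` helper; an in-memory hash of arrays stands in for the queue, so nothing here calls Resque.

```ruby
# Hypothetical worker class exposing the two class methods the dispatch
# relies on: .queue and .perform.
class EchoWorker
  def self.queue
    :mapredus
  end

  def self.perform(*args)
    args
  end
end

# Sketch of the dispatch decision: run inline when synchronous, otherwise
# append a job payload to an in-memory queue (a stand-in for Resque.push).
def dispatch(synchronous, queues, klass, *args)
  if synchronous
    klass.perform(*args)
  else
    queues[klass.queue] << { :class => klass.to_s, :args => args }
  end
end

queues = Hash.new { |hash, name| hash[name] = [] }
dispatch(true,  queues, EchoWorker, 1, 'chunk')   # runs EchoWorker.perform now
dispatch(false, queues, EchoWorker, 1, 'chunk')   # queued on :mapredus
puts queues[:mapredus].length
```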
.enslave_finalizer(process) ⇒ Object
    # File 'lib/mapredus/master.rb', line 58
    def self.enslave_finalizer( process )
      enslave( process, process.finalizer, process.pid )
    end
.enslave_inputter(process, data_object) ⇒ Object
    # File 'lib/mapredus/master.rb', line 48
    def self.enslave_inputter(process, data_object)
      enslave( process, process.inputter, process.pid, data_object )
    end
.enslave_later(delay_in_seconds, process, klass, *args) ⇒ Object
    # File 'lib/mapredus/master.rb', line 90
    def self.enslave_later( delay_in_seconds, process, klass, *args)
      FileSystem.rpush(ProcessInfo.slaves(process.pid), 1)

      if( process.synchronous )
        klass.perform(*args)
      else
        #
        # TODO: I cannot get enqueue_in to work with my tests
        #       there seems to be a silent failure somewhere
        #       in the tests such that it never calls the function
        #       and the queue gets emptied
        #
        # Resque.enqueue_in(delay_in_seconds, klass, *args)

        ##
        ## Temporary, immediately just push process back onto the resque queue
        Resque.push( klass.queue, { :class => klass.to_s, :args => args } )
      end
    end
.enslave_later_reduce(process, key) ⇒ Object
    # File 'lib/mapredus/master.rb', line 73
    def self.enslave_later_reduce(process, key)
      enslave_later( process.reducer.wait, process, process.reducer, process.pid, key )
    end
.enslave_map(process, data_chunk) ⇒ Object
These wrappers match the arguments that the Mapper/Reducer perform function expects to see, though instead of the process the perform function receives the pid.

    # File 'lib/mapredus/master.rb', line 65
    def self.enslave_map(process, data_chunk)
      enslave( process, process.mapper, process.pid, data_chunk )
    end
.enslave_reduce(process, key) ⇒ Object
    # File 'lib/mapredus/master.rb', line 69
    def self.enslave_reduce(process, key)
      enslave( process, process.reducer, process.pid, key )
    end
.enslave_reducers(process) ⇒ Object
    # File 'lib/mapredus/master.rb', line 52
    def self.enslave_reducers( process )
      process.map_keys.each do |key|
        enslave_reduce( process, key )
      end
    end
.finish_metrics(pid) ⇒ Object
    # File 'lib/mapredus/master.rb', line 163
    def self.finish_metrics(pid)
      started = ProcessInfo.started_at( pid )
      finished = ProcessInfo.finished_at( pid )
      requested = ProcessInfo.requested_at( pid )

      completion_time = Time.now.to_i

      FileSystem.set finished, completion_time
      time_to_complete = completion_time - FileSystem.get(started).to_i

      recent_ttcs = ProcessInfo.recent_time_to_complete
      FileSystem.lpush( recent_ttcs , time_to_complete )
      FileSystem.ltrim( recent_ttcs , 0, 30 - 1)

      FileSystem.expire finished, 60 * 60
      FileSystem.expire started, 60 * 60
      FileSystem.expire requested, 60 * 60
    end
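The `lpush` followed by `ltrim( ..., 0, 30 - 1)` keeps a bounded list of the most recent completion times, newest first. A minimal in-memory sketch of that pattern, assuming a hypothetical `record_time_to_complete` helper and a plain Ruby array in place of the `FileSystem` list:

```ruby
# Hypothetical in-memory version of the "recent time to complete" list.
# unshift plays the role of lpush (newest entry first) and slice! plays the
# role of ltrim(list, 0, limit - 1), dropping everything past the limit.
def record_time_to_complete(recent, started_at, completed_at, limit = 30)
  recent.unshift(completed_at - started_at)
  recent.slice!(limit..-1)
  recent
end

recent = []
35.times { |i| record_time_to_complete(recent, 100, 100 + i) }
puts recent.length   # 30
puts recent.first    # 34
```

Trimming on every write keeps the list size constant, so reading the recent times never has to scan an unbounded history.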
.free_slave(pid) ⇒ Object
    # File 'lib/mapredus/master.rb', line 114
    def self.free_slave(pid)
      FileSystem.lpop(ProcessInfo.slaves(pid))
    end
.mapreduce(process, data_object) ⇒ Object
The order of operations that occur in the mapreduce process.
The inputter sets off the mapper processes.

    # File 'lib/mapredus/master.rb', line 34
    def self.mapreduce( process, data_object )
      start_metrics(process.pid)
      if process.synchronous
        process.update(:state => INPUT_MAP_IN_PROGRESS)
        enslave_inputter(process, data_object)
        process.update(:state => REDUCE_IN_PROGRESS)
        enslave_reducers(process)
        process.update(:state => FINALIZER_IN_PROGRESS)
        enslave_finalizer(process)
      else
        Resque.push(QueueProcess.queue, {:class => MapRedus::Master , :args => [process.pid, data_object]} )
      end
    end
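The synchronous branch runs the stages in a fixed order: input and map, then reduce, then finalize. The word-count sketch below illustrates only that ordering; every function in it (`input_stage`, `map_stage`, `reduce_stage`, `finalize_stage`, `run_synchronously`) is a hypothetical stand-in and not part of the MapRedus API.

```ruby
# Inputter stand-in: split the data object into chunks for the mappers.
def input_stage(text)
  text.split("\n")
end

# Mapper stand-in: emit one [key, value] pair per word in the chunk.
def map_stage(chunk)
  chunk.split.map { |word| [word.downcase, 1] }
end

# Reducer stand-in: combine all values emitted for a single key.
def reduce_stage(values)
  values.inject(0) { |sum, value| sum + value }
end

# Finalizer stand-in: assemble the reduced pairs into one result.
def finalize_stage(reduced)
  reduced.sort.to_h
end

def run_synchronously(text)
  emitted = Hash.new { |hash, key| hash[key] = [] }
  # Input and map happen together: each chunk is mapped as it is produced.
  input_stage(text).each do |chunk|
    map_stage(chunk).each { |key, value| emitted[key] << value }
  end
  # Reduce runs once per key, only after all mapping has finished.
  reduced = emitted.map { |key, values| [key, reduce_stage(values)] }
  finalize_stage(reduced)
end

p run_synchronously("a b a\nb c")
```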
.perform(pid, data_object) ⇒ Object
Master performs the work that it needs to do: it must free itself as a slave from Resque and enslave mappers.

    # File 'lib/mapredus/master.rb', line 23
    def self.perform( pid, data_object )
      process = Process.open(pid)
      enslave_inputter(process, data_object)
      process.update(:state => INPUT_MAP_IN_PROGRESS)
    end
.set_request_time(pid) ⇒ Object
Time metrics for measuring how long it takes map reduce to do a process.

    # File 'lib/mapredus/master.rb', line 154
    def self.set_request_time(pid)
      FileSystem.set( ProcessInfo.requested_at(pid), Time.now.to_i )
    end
.slaves(pid) ⇒ Object
    # File 'lib/mapredus/master.rb', line 110
    def self.slaves(pid)
      FileSystem.lrange(ProcessInfo.slaves(pid), 0, -1)
    end
.start_metrics(pid) ⇒ Object
    # File 'lib/mapredus/master.rb', line 158
    def self.start_metrics(pid)
      started = ProcessInfo.started_at( pid )
      FileSystem.set started, Time.now.to_i
    end
.working?(pid) ⇒ Boolean
Check whether there are still workers working on process PID's processes.
In the synchronous case, the master is always working, since nothing goes to the queue.

    # File 'lib/mapredus/master.rb', line 14
    def self.working?(pid)
      0 < FileSystem.llen(ProcessInfo.slaves(pid))
    end