Class: MapRedus::Master

Inherits:
QueueProcess
Defined in:
lib/mapredus/master.rb

Overview

Note: Instead of using Resque directly within the process, we implement a master interface with Resque.

Does bookkeeping to keep track of how many slaves are doing work. If no slaves are doing work for a process, then the process is done. While work is available, the slaves will always be doing work.
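The bookkeeping idea can be illustrated with a minimal sketch. It assumes an in-memory hash in place of the Redis-backed list, and the `SlaveLedger` class is hypothetical, not part of MapRedus; only the method names mirror the ones documented on this page.

```ruby
# Hypothetical in-memory stand-in for the Redis-backed list of slaves.
class SlaveLedger
  def initialize
    # One list of markers per process id, created on first access.
    @slaves = Hash.new { |hash, pid| hash[pid] = [] }
  end

  # Record that one more slave is working for the process.
  def enslave(pid)
    @slaves[pid].push(1)
  end

  # Record that one slave has finished its work.
  def free_slave(pid)
    @slaves[pid].shift
  end

  # A process is still working while at least one slave is registered.
  def working?(pid)
    !@slaves[pid].empty?
  end
end

ledger = SlaveLedger.new
ledger.enslave(42)
ledger.enslave(42)
ledger.free_slave(42)
puts ledger.working?(42) # one slave is still registered
ledger.free_slave(42)
puts ledger.working?(42) # no slaves remain, so the process counts as done
```

The real class keeps the same list under the key returned by ProcessInfo.slaves(pid), so the count survives across worker processes.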

Class Method Summary

Methods inherited from QueueProcess

queue

Class Method Details

.emancipate(pid) ⇒ Object



# File 'lib/mapredus/master.rb', line 132

def self.emancipate(pid)
  process = Process.open(pid)
  return unless process
  
  # Working on resque directly seems dangerous
  #
  # Warning: this is supposed to be used as a debugging operation
  # and isn't intended for normal use.  It is potentially very expensive.
  #
  destroyed = 0
  qs = [queue, process.mapper.queue, process.reducer.queue, process.finalizer.queue].uniq
  qs.each do |q|
    q_key = "queue:#{q}"
    Resque.redis.lrange(q_key, 0, -1).each do | string |
      json   = Helper.decode(string)
      match  = json['class'] == "MapRedus::Master"
      match |= json['class'] == process.inputter.to_s
      match |= json['class'] == process.mapper.to_s
      match |= json['class'] == process.reducer.to_s
      match |= json['class'] == process.finalizer.to_s
      match &= json['args'].first.to_s == process.pid.to_s
      if match
        destroyed += Resque.redis.lrem(q_key, 0, string).to_i
      end
    end
  end

  #
  # our slave information is kept track of on file and not in Resque
  #
  FileSystem.del(ProcessInfo.slaves(pid))
  destroyed
end
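The matching rule used inside the loop can be tried in isolation. This is a minimal sketch that assumes queue entries are JSON strings with `class` and `args` fields, as the listing suggests; the `job_matches?` helper and the sample job classes are hypothetical, and a plain array stands in for the Resque queue.

```ruby
require 'json'

# Hypothetical helper: true when a raw queue entry names one of the given
# job classes and its first argument is the given pid.
def job_matches?(raw, class_names, pid)
  job = JSON.parse(raw)
  class_names.include?(job['class']) && job['args'].first.to_s == pid.to_s
end

# A plain array standing in for one Resque queue.
queue = [
  JSON.generate('class' => 'MapRedus::Master', 'args' => [7, ['data']]),
  JSON.generate('class' => 'WordCounter', 'args' => [8, 'chunk']),
  JSON.generate('class' => 'OtherJob', 'args' => [7])
]
class_names = ['MapRedus::Master', 'WordCounter']

# Drop every entry that belongs to pid 7 and count how many were removed.
remaining = queue.reject { |raw| job_matches?(raw, class_names, 7) }
destroyed = queue.size - remaining.size
puts destroyed
```

Comparing both sides with to_s, as the original does, keeps the match working whether the pid was stored as an integer or as a string.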

.enslave(process, klass, *args) ⇒ Object

The current default queue (QUEUE) that we push onto is :mapredus.


# File 'lib/mapredus/master.rb', line 94

def self.enslave( process, klass, *args )
  FileSystem.rpush(ProcessInfo.slaves(process.pid), 1)

  if( process.synchronous )
    klass.perform(*args)
  else
    Resque.push( klass.queue, { :class => klass.to_s, :args => args } )
  end
end
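The two branches above can be sketched without Resque. This example assumes a plain hash of arrays in place of the Resque queues; `EchoJob` and `dispatch` are hypothetical names used only for illustration.

```ruby
# Hypothetical job class standing in for a mapper or reducer.
class EchoJob
  def self.queue
    :mapredus
  end

  # Collects the arguments of every call to perform.
  def self.results
    @results ||= []
  end

  def self.perform(*args)
    results << args
  end
end

# Hypothetical dispatcher: run the job inline when synchronous, otherwise
# append a payload to the queue named by the job class.
def dispatch(queues, synchronous, klass, *args)
  if synchronous
    klass.perform(*args)
  else
    (queues[klass.queue] ||= []) << { class: klass.to_s, args: args }
  end
end

queues = {}
dispatch(queues, true, EchoJob, 1, 'chunk')  # runs immediately
dispatch(queues, false, EchoJob, 1, 'chunk') # waits on the :mapredus queue
p EchoJob.results
p queues
```

Running the job inline in synchronous mode makes a whole process testable in a single Ruby process with no workers running.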

.enslave_finalizer(process) ⇒ Object



# File 'lib/mapredus/master.rb', line 72

def self.enslave_finalizer( process )
  enslave( process, process.finalizer, process.pid )
end

.enslave_inputter(process, *data_object) ⇒ Object



# File 'lib/mapredus/master.rb', line 48

def self.enslave_inputter(process, *data_object)
  enslave( process, process.inputter, process.pid, *data_object )
end

.enslave_later(delay_in_seconds, process, klass, *args) ⇒ Object



# File 'lib/mapredus/master.rb', line 104

def self.enslave_later( delay_in_seconds, process, klass, *args)
  FileSystem.rpush(ProcessInfo.slaves(process.pid), 1)

  if( process.synchronous )
    klass.perform(*args)
  else
    #
    # TODO: I cannot get enqueue_in to work with my tests
    #       there seems to be a silent failure somewhere
    #       in the tests such that it never calls the function
    #       and the queue gets emptied
    #
    # Resque.enqueue_in(delay_in_seconds, klass, *args)
    
    ##
    ## Temporary, immediately just push process back onto the resque queue
    Resque.push( klass.queue, { :class => klass.to_s, :args => args } )
  end
end

.enslave_later_reduce(process, key) ⇒ Object



# File 'lib/mapredus/master.rb', line 87

def self.enslave_later_reduce(process, key)
  enslave_later( process.reducer.wait, process, process.reducer, process.pid, key )
end

.enslave_map(process, data_chunk) ⇒ Object

These wrappers match the arguments that the Mapper/Reducer perform functions expect to see, except that perform receives the pid instead of the process.



# File 'lib/mapredus/master.rb', line 79

def self.enslave_map(process, data_chunk)
  enslave( process, process.mapper, process.pid, data_chunk )
end

.enslave_reduce(process, key) ⇒ Object



# File 'lib/mapredus/master.rb', line 83

def self.enslave_reduce(process, key)
  enslave( process, process.reducer, process.pid, key )
end

.enslave_reducers(process) ⇒ Object

Enslave the reducers:

For each key, enslave a reducer to process the values on that key. If there were no keys produced during the map operation we must set off the finalizer.

TODO: inject optimizations here for special reducers like the identity reduce

Returns nothing.



# File 'lib/mapredus/master.rb', line 62

def self.enslave_reducers( process )
  if( process.num_keys > 0 )
    process.map_keys.each do |key|
      enslave_reduce( process, key )
    end
  else
    process.next_state
  end
end
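The branching above can be expressed as a small planning function. This is only a sketch: `plan_reduce_phase` is a hypothetical helper that returns the steps as data instead of calling enslave_reduce or next_state.

```ruby
# Hypothetical planner: lists the steps the master would take, without
# touching Resque or Redis.
def plan_reduce_phase(map_keys)
  # No keys came out of the map phase, so move straight to the next state.
  return [[:next_state]] if map_keys.empty?

  # Otherwise schedule one reducer per key.
  map_keys.map { |key| [:enslave_reduce, key] }
end

p plan_reduce_phase(%w[apple pear])
p plan_reduce_phase([])
```

Without the empty-keys branch, a process whose map phase emits nothing would never enslave a reducer and so would never advance.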

.finish_metrics(pid) ⇒ Object



# File 'lib/mapredus/master.rb', line 177

def self.finish_metrics(pid)
  started  = ProcessInfo.started_at( pid )
  finished = ProcessInfo.finished_at( pid )
  requested = ProcessInfo.requested_at( pid )
  
  completion_time = Time.now.to_i
  
  FileSystem.set finished, completion_time
  time_to_complete = completion_time - FileSystem.get(started).to_i
  
  recent_ttcs = ProcessInfo.recent_time_to_complete
  FileSystem.lpush( recent_ttcs , time_to_complete )
  FileSystem.ltrim( recent_ttcs , 0, 30 - 1)
  
  FileSystem.expire finished, 60 * 60
  FileSystem.expire started, 60 * 60
  FileSystem.expire requested, 60 * 60
end
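The lpush followed by ltrim keeps only the 30 most recent completion times. A minimal sketch of that pattern, assuming a plain Ruby array in place of the Redis list; `record_time_to_complete` is a hypothetical helper.

```ruby
# Hypothetical in-memory equivalent of lpush followed by ltrim(0, limit - 1).
def record_time_to_complete(recent, seconds, limit = 30)
  recent.unshift(seconds)             # newest value goes to the head, like lpush
  recent.pop while recent.size > limit # drop the oldest entries past the limit
  recent
end

recent = []
35.times { |seconds| record_time_to_complete(recent, seconds) }
puts recent.size  # capped at the limit
puts recent.first # the most recently recorded value
```

Capping the list bounds the memory used for metrics no matter how many processes complete.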

.free_slave(pid) ⇒ Object



# File 'lib/mapredus/master.rb', line 128

def self.free_slave(pid)
  FileSystem.lpop(ProcessInfo.slaves(pid))
end

.mapreduce(process, *data_object) ⇒ Object

Defines the order of operations in the mapreduce process.

The inputter sets off the mapper processes.



# File 'lib/mapredus/master.rb', line 34

def self.mapreduce( process, *data_object )
  start_metrics(process.pid)
  if process.synchronous
    process.update(:state => INPUT_MAP_IN_PROGRESS)
    enslave_inputter(process, *data_object)
    process.update(:state => REDUCE_IN_PROGRESS)
    enslave_reducers(process)
    process.update(:state => FINALIZER_IN_PROGRESS)
    enslave_finalizer(process)
  else
    Resque.push(QueueProcess.queue, {:class => MapRedus::Master , :args => [process.pid, data_object]} )
  end
end

.perform(pid, data_object) ⇒ Object

Master performs the work that it needs to do:

  • it must free itself as a slave from Resque
  • enslave mappers


# File 'lib/mapredus/master.rb', line 23

def self.perform( pid, data_object )
  process = Process.open(pid)
  enslave_inputter(process, *data_object)
  process.update(:state => INPUT_MAP_IN_PROGRESS)
end

.set_request_time(pid) ⇒ Object

Time metrics for measuring how long a mapreduce process takes to complete.



# File 'lib/mapredus/master.rb', line 168

def self.set_request_time(pid)
  FileSystem.set( ProcessInfo.requested_at(pid), Time.now.to_i )
end

.slaves(pid) ⇒ Object



# File 'lib/mapredus/master.rb', line 124

def self.slaves(pid)
  FileSystem.lrange(ProcessInfo.slaves(pid), 0, -1)
end

.start_metrics(pid) ⇒ Object



# File 'lib/mapredus/master.rb', line 172

def self.start_metrics(pid)
  started  = ProcessInfo.started_at( pid )
  FileSystem.set started, Time.now.to_i
end

.working?(pid) ⇒ Boolean

Checks whether any workers are still working on the process identified by pid.

In synchronous mode, the master is always working, since nothing goes to the queue.

Returns:

  • (Boolean)


# File 'lib/mapredus/master.rb', line 14

def self.working?(pid)
  0 < FileSystem.llen(ProcessInfo.slaves(pid))
end