Class: MapRedus::Master

Inherits:
QueueProcess
Defined in:
lib/mapredus/master.rb

Overview

Note: Instead of using Resque directly within a process, we implement a master interface that talks to Resque on the process's behalf.

Does bookkeeping to keep track of how many slaves are doing work. If we have no slaves doing work for a process then the process is done. While there is work available the slaves will always be doing work.

Class Method Summary

Methods inherited from QueueProcess

queue

Class Method Details

.emancipate(pid) ⇒ Object



118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
# File 'lib/mapredus/master.rb', line 118

# Debugging helper: removes every queued Resque job that belongs to the
# process identified by +pid+, then deletes the process's slave list.
#
# A job belongs to the process when its class is MapRedus::Master or one of
# the process's inputter/mapper/reducer/finalizer classes, AND its first
# argument is the process pid.
#
# Every queue those classes push onto is scanned, including the inputter's
# queue (enslave pushes each job onto klass.queue, so inputter jobs live
# there).
#
# Returns the number of removed queue entries, or nil when the process
# cannot be opened.
def self.emancipate(pid)
  process = Process.open(pid)
  return unless process

  # Working on resque directly seems dangerous
  #
  # Warning: this is supposed to be used as a debugging operation
  # and isn't intended for normal use.  It is potentially very expensive.
  #
  owned_classes = ["MapRedus::Master", process.inputter, process.mapper,
                   process.reducer, process.finalizer].map { |k| k.to_s }
  pid_string = process.pid.to_s
  qs = [queue, process.inputter.queue, process.mapper.queue,
        process.reducer.queue, process.finalizer.queue].uniq

  destroyed = 0
  qs.each do |q|
    q_key = "queue:#{q}"
    Resque.redis.lrange(q_key, 0, -1).each do |string|
      json = Helper.decode(string)
      next unless owned_classes.include?(json['class'])
      # Array() tolerates payloads with a missing/nil "args" entry
      next unless Array(json['args']).first.to_s == pid_string
      # count 0 removes every identical copy of this payload
      destroyed += Resque.redis.lrem(q_key, 0, string).to_i
    end
  end

  #
  # our slave information is kept track of on file and not in Resque
  #
  FileSystem.del(ProcessInfo.slaves(pid))
  destroyed
end

.enslave(process, klass, *args) ⇒ Object

The current default (QUEUE) that we push on to is

:mapredus


80
81
82
83
84
85
86
87
88
# File 'lib/mapredus/master.rb', line 80

# Registers one more slave for +process+ and hands +args+ to +klass+.
#
# Synchronous processes run the job inline through klass.perform and return
# its result; otherwise the job is pushed onto klass.queue as a Resque
# payload naming the class as a string.
def self.enslave(process, klass, *args)
  FileSystem.rpush(ProcessInfo.slaves(process.pid), 1)

  return klass.perform(*args) if process.synchronous

  payload = { :class => klass.to_s, :args => args }
  Resque.push(klass.queue, payload)
end

.enslave_finalizer(process) ⇒ Object



58
59
60
# File 'lib/mapredus/master.rb', line 58

# Dispatches the process's finalizer, passing it the process pid.
def self.enslave_finalizer(process)
  finalizer = process.finalizer
  enslave(process, finalizer, process.pid)
end

.enslave_inputter(process, data_object) ⇒ Object



48
49
50
# File 'lib/mapredus/master.rb', line 48

# Dispatches the process's inputter with the pid and the raw +data_object+.
def self.enslave_inputter(process, data_object)
  inputter = process.inputter
  enslave(process, inputter, process.pid, data_object)
end

.enslave_later(delay_in_seconds, process, klass, *args) ⇒ Object



90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
# File 'lib/mapredus/master.rb', line 90

# Intended to dispatch +klass+ for +process+ after +delay_in_seconds+.
#
# The delay is currently ignored: this behaves exactly like enslave (one
# slave is registered, then the job runs inline for synchronous processes or
# is pushed onto klass.queue immediately). Delegating keeps the slave
# bookkeeping in one place instead of duplicating enslave's body.
def self.enslave_later( delay_in_seconds, process, klass, *args)
  #
  # TODO: I cannot get enqueue_in to work with my tests
  #       there seems to be a silent failure somewhere
  #       in the tests such that it never calls the function
  #       and the queue gets emptied
  #
  # Resque.enqueue_in(delay_in_seconds, klass, *args)
  #
  # When a real delay is implemented, the asynchronous path must still
  # register the slave exactly as enslave does.

  ##
  ## Temporary, immediately just push process back onto the resque queue
  enslave( process, klass, *args )
end

.enslave_later_reduce(process, key) ⇒ Object



73
74
75
# File 'lib/mapredus/master.rb', line 73

# Re-dispatches the reducer for +key+, using the reducer's wait value as the
# requested delay.
def self.enslave_later_reduce(process, key)
  reducer = process.reducer
  enslave_later(reducer.wait, process, reducer, process.pid, key)
end

.enslave_map(process, data_chunk) ⇒ Object

These helpers pass the arguments that the Mapper/Reducer perform method expects to see,

except that instead of the process object, the perform method receives the process pid.



65
66
67
# File 'lib/mapredus/master.rb', line 65

# Dispatches the process's mapper for one +data_chunk+.
def self.enslave_map(process, data_chunk)
  mapper = process.mapper
  enslave(process, mapper, process.pid, data_chunk)
end

.enslave_reduce(process, key) ⇒ Object



69
70
71
# File 'lib/mapredus/master.rb', line 69

# Dispatches the process's reducer for a single map output +key+.
def self.enslave_reduce(process, key)
  reducer = process.reducer
  enslave(process, reducer, process.pid, key)
end

.enslave_reducers(process) ⇒ Object



52
53
54
55
56
# File 'lib/mapredus/master.rb', line 52

# Dispatches one reducer per key in process.map_keys, in order; returns the
# key collection (the value of each).
def self.enslave_reducers(process)
  process.map_keys.each { |map_key| enslave_reduce(process, map_key) }
end

.finish_metrics(pid) ⇒ Object



163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
# File 'lib/mapredus/master.rb', line 163

# Records the completion time of process +pid+. When a start time was
# recorded, the time-to-complete is pushed onto the shared list of recent
# durations, which keeps only the newest entries. The started/finished/
# requested timestamps then expire after one hour.
def self.finish_metrics(pid)
  started   = ProcessInfo.started_at( pid )
  finished  = ProcessInfo.finished_at( pid )
  requested = ProcessInfo.requested_at( pid )

  recent_history_size = 30      # how many recent durations are kept
  metrics_ttl_seconds = 60 * 60 # lifetime of the per-process timestamps

  completion_time = Time.now.to_i
  FileSystem.set finished, completion_time

  # Without a recorded start, nil.to_i would be 0 and the "duration" would be
  # the whole epoch timestamp, polluting the recent-durations list.
  start_time = FileSystem.get(started)
  if start_time
    time_to_complete = completion_time - start_time.to_i
    recent_ttcs = ProcessInfo.recent_time_to_complete
    FileSystem.lpush( recent_ttcs, time_to_complete )
    FileSystem.ltrim( recent_ttcs, 0, recent_history_size - 1 )
  end

  FileSystem.expire finished, metrics_ttl_seconds
  FileSystem.expire started, metrics_ttl_seconds
  FileSystem.expire requested, metrics_ttl_seconds
end

.free_slave(pid) ⇒ Object



114
115
116
# File 'lib/mapredus/master.rb', line 114

# Releases one slave slot for +pid+ by popping the head of its slave list;
# returns the popped entry (or whatever lpop returns for an empty list).
def self.free_slave(pid)
  slave_list_key = ProcessInfo.slaves(pid)
  FileSystem.lpop(slave_list_key)
end

.mapreduce(process, data_object) ⇒ Object

The order of operations that occur in the mapreduce process

The inputter sets off the mapper processes



34
35
36
37
38
39
40
41
42
43
44
45
46
# File 'lib/mapredus/master.rb', line 34

# Kicks off the mapreduce pipeline for +process+ over +data_object+ after
# recording the start time.
#
# Synchronous processes run every phase inline (inputter, reducers,
# finalizer) and update the process state before each phase. Asynchronous
# processes instead enqueue a MapRedus::Master job, whose perform(pid,
# data_object) starts the inputter.
#
# The job class is pushed by name (a String), as enslave does and as
# emancipate expects when it matches "MapRedus::Master". The JSON form of a
# bare Class object depends on the serializer in use.
def self.mapreduce( process, data_object )
  start_metrics(process.pid)
  if process.synchronous
    process.update(:state => INPUT_MAP_IN_PROGRESS)
    enslave_inputter(process, data_object)
    process.update(:state => REDUCE_IN_PROGRESS)
    enslave_reducers(process)
    process.update(:state => FINALIZER_IN_PROGRESS)
    enslave_finalizer(process)
  else
    Resque.push(QueueProcess.queue, { :class => MapRedus::Master.to_s, :args => [process.pid, data_object] })
  end
end

.perform(pid, data_object) ⇒ Object

Master performs the work that it needs to do:

  • it must free itself as a slave from Resque
  • it must enslave the mappers


23
24
25
26
27
# File 'lib/mapredus/master.rb', line 23

# Resque entry point for the master job enqueued by mapreduce.
#
# Marks the process as INPUT_MAP_IN_PROGRESS and only then dispatches the
# inputter. This matches the synchronous ordering in mapreduce. The old order
# (enqueue first, update second) let a fast asynchronous inputter advance the
# state before the master overwrote it with INPUT_MAP_IN_PROGRESS.
#
# Jobs for a process that can no longer be opened (e.g. after emancipate) are
# dropped, the same guard emancipate uses.
def self.perform( pid, data_object )
  process = Process.open(pid)
  return unless process

  process.update(:state => INPUT_MAP_IN_PROGRESS)
  enslave_inputter(process, data_object)
end

.set_request_time(pid) ⇒ Object

Time metrics for measuring how long it takes MapReduce to complete a process



154
155
156
# File 'lib/mapredus/master.rb', line 154

# Stores the current Unix time (integer seconds) as the request time of +pid+.
def self.set_request_time(pid)
  requested_key = ProcessInfo.requested_at(pid)
  FileSystem.set(requested_key, Time.now.to_i)
end

.slaves(pid) ⇒ Object



110
111
112
# File 'lib/mapredus/master.rb', line 110

# Returns every entry of the slave list kept for +pid+.
def self.slaves(pid)
  slave_list_key = ProcessInfo.slaves(pid)
  FileSystem.lrange(slave_list_key, 0, -1)
end

.start_metrics(pid) ⇒ Object



158
159
160
161
# File 'lib/mapredus/master.rb', line 158

# Stores the current Unix time (integer seconds) as the start time of +pid+.
def self.start_metrics(pid)
  FileSystem.set(ProcessInfo.started_at(pid), Time.now.to_i)
end

.working?(pid) ⇒ Boolean

Checks whether any slaves are still working on the process identified by pid.

In synchronous mode, the master is always working, since nothing is sent to the queue.

Returns:

  • (Boolean)


14
15
16
# File 'lib/mapredus/master.rb', line 14

# True while the slave list for +pid+ holds at least one entry.
def self.working?(pid)
  FileSystem.llen(ProcessInfo.slaves(pid)) > 0
end