Class: MapRedus::Process

Inherits:
Object
Defined in:
lib/mapredus/process.rb

Overview

This class keeps track of our map reduce processes.

A redis key identifies each map reduce process by its id. The value stored at that key is a JSON object which contains:

{
  inputter : inputstreamclass,
  mapper : mapclass,
  reducer : reduceclass,
  finalizer : finalizerclass,
  outputter : outputterclass,
  partitioner : <not supported>,
  combiner : <not supported>,
  ordered : true_or_false   ## ensures ordering keys from the map output --> [ order, key, value ],
  synchronous : true_or_false   ## runs the process synchronously or not (generally used for testing)
  result_timeout : length of time a result is saved ## default: 3600 * 24
  key_args : arguments to be added to the key location of the result save (cache location)
  state : the current state of the process (shouldn't be set by the process and starts off as nil)
  type : the original process class ( currently this is needed so we can have namespaces for the result_cache keys )
}
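
As a rough illustration of the stored value, the sketch below builds a specification hash and round-trips it through Ruby's standard json library. The class names (WordStream, WordCounter, Adder, ToRedisHash) and the key_args value are hypothetical placeholders, and MapRedus's own Helper.encode may serialize differently.

```ruby
require 'json'

# Hypothetical specification; the class names are placeholders, not part of MapRedus.
spec = {
  :inputter       => "WordStream",
  :mapper         => "WordCounter",
  :reducer        => "Adder",
  :finalizer      => "ToRedisHash",
  :outputter      => nil,            # read falls back to MapRedus::Outputter when absent
  :ordered        => false,
  :synchronous    => false,
  :result_timeout => 3600 * 24,
  :key_args       => ["daily"],
  :state          => nil,
  :type           => "MapRedus::Process"
}

encoded = JSON.generate(spec)
decoded = JSON.parse(encoded)        # keys come back as strings

puts decoded["mapper"]
puts decoded["result_timeout"]
```

Note that the decoded hash has string keys, which is why the class looks values up by both string and symbol.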

Users can subclass this class to add extra features if needed.

Constant Summary

READERS =

Public: Keeps track of information that may show up in the redis JSON value, so we know exactly what might appear in the JSON hash.

[:pid]
ATTRS =
[:inputter, :mapper, :reducer, :finalizer, :outputter, :ordered, :synchronous, :result_timeout, :key_args, :state, :type]
DEFAULT_TIME =
3600 * 24


Constructor Details

#initialize(pid, json_info) ⇒ Process

Returns a new instance of Process.



# File 'lib/mapredus/process.rb', line 35

def initialize(pid, json_info)
  @pid = pid
  read(json_info)
end

Class Method Details

.create ⇒ Object

Sets up a process to be run with the given specification. It saves the specification in the FileSystem and returns an instance of the process. Call run on that instance when you want to run it.

Example

process = MapRedus::Process.create
process.run(data_object)

Returns an instance of the process



# File 'lib/mapredus/process.rb', line 324

def self.create
  new_pid = get_available_pid
  specification = ATTRS.inject({}) do |ret, attr|
    ret[attr] = send(attr)
    ret
  end
  specification[:type] = self
  self.new(new_pid, specification).save
end
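
The way create collects the specification can be sketched in isolation. The class below is a hypothetical stand-in (SketchSpecProcess and its class-level readers are assumptions, not part of MapRedus); only the inject pattern mirrors the source above.

```ruby
# Hypothetical stand-in for a Process subclass; only the inject pattern mirrors the source.
class SketchSpecProcess
  ATTRS = [:mapper, :reducer, :ordered]

  # Class-level readers that a subclass would be expected to provide.
  def self.mapper;  "WordCounter"; end
  def self.reducer; "Adder";       end
  def self.ordered; false;         end

  def self.specification
    spec = ATTRS.inject({}) do |ret, attr|
      ret[attr] = send(attr)   # call the class method named after the attribute
      ret                      # inject needs the accumulator returned at each step
    end
    spec[:type] = self
    spec
  end
end

spec = SketchSpecProcess.specification
```

Each attribute in ATTRS must therefore be answerable at the class level, otherwise send raises NoMethodError.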

.delete_saved_result(*key_args) ⇒ Object

Given the arguments for a result key, deletes the result from the FileSystem.

Examples

Process.delete_saved_result(key)


# File 'lib/mapredus/process.rb', line 428

def self.delete_saved_result(*key_args)
  FileSystem.del( result_key(*key_args) )
end

.get_available_pid ⇒ Object

Reserves a new process id by incrementing the process counter in the FileSystem by a random amount between 1 and 20.

Examples

Process.get_available_pid

Returns an available pid.



# File 'lib/mapredus/process.rb', line 419

def self.get_available_pid
  FileSystem.incrby(ProcessInfo.processes_count, 1 + rand(20)) 
end
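
A minimal sketch of the allocation scheme, assuming an in-memory counter in place of redis. SketchCounter is hypothetical; its incrby is meant to behave like the redis INCRBY command, which returns the value after the increment.

```ruby
# In-memory stand-in for the redis counter (hypothetical, not the MapRedus FileSystem).
class SketchCounter
  def initialize
    @value = 0
  end

  # Adds amount to the counter and returns the new value, like redis INCRBY.
  def incrby(amount)
    @value += amount
  end
end

counter = SketchCounter.new
# rand(20) is in 0..19, so every step is between 1 and 20.
pids = Array.new(5) { counter.incrby(1 + rand(20)) }
```

Because every increment is at least 1, successive pids are strictly increasing and never repeat for as long as the counter is not reset.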

.info(pid) ⇒ Object

Returns all the redis keys associated with a process’s process id.

Example

Process.info(17)

Returns an array of keys associated with the process id.



# File 'lib/mapredus/process.rb', line 389

def self.info(pid)
  FileSystem.keys(ProcessInfo.pid(pid) + "*")
end

.kill(pid) ⇒ Object

Removes the redis keys associated with this process if the Master isn’t working.

This is potentially very expensive.

Example

Process::kill(pid)
# => result of Master.emancipate(pid)

Returns the result of Master.emancipate(pid), which the source stores as num_killed.



# File 'lib/mapredus/process.rb', line 441

def self.kill(pid)
  num_killed = Master.emancipate(pid)
  proc = Process.open(pid)
  proc.delete if proc
  num_killed
end

.kill_all ⇒ Object



# File 'lib/mapredus/process.rb', line 448

def self.kill_all
  ps.each do |pid|
    kill(pid)
  end
  FileSystem.del(ProcessInfo.processes)
  FileSystem.del(ProcessInfo.processes_count)
end

.open(pid) ⇒ Object

Returns an instance of the process class given the process id. If no such process id exists, returns nil.

Example

process = Process.open(17)


# File 'lib/mapredus/process.rb', line 398

def self.open(pid)
  spec = Helper.decode( FileSystem.get(ProcessInfo.pid(pid)) )
  spec && self.new( pid, spec )
end

.ps ⇒ Object

Finds out what map reduce processes are out there.

Examples

Process.ps

Returns a list of the map reduce process ids



# File 'lib/mapredus/process.rb', line 409

def self.ps
  FileSystem.smembers(ProcessInfo.processes)
end

.result_key(*args) ⇒ Object



# File 'lib/mapredus/process.rb', line 304

def self.result_key(*args)
  key_maker = "#{self.to_s.gsub(/\W/,"_")}_result_cache"
  key_maker = ProcessInfo.respond_to?(key_maker) ? key_maker : "#{MapRedus::Process.to_s.gsub(/\W/,"_")}_result_cache"
  ProcessInfo.send( key_maker, *args )
end
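
The name of the key maker is derived from the class name by replacing every non-word character with an underscore. The sketch below isolates that step. key_maker_name and the WordCount subclass are hypothetical, and the empty MapRedus::Process defined here is only a placeholder so the names resolve.

```ruby
module MapRedus
  class Process; end                       # empty placeholder, only its name matters here
end
class WordCount < MapRedus::Process; end   # hypothetical subclass

# Same string manipulation as in result_key and set_result_key.
def key_maker_name(klass)
  "#{klass.to_s.gsub(/\W/, "_")}_result_cache"
end

puts key_maker_name(MapRedus::Process)
puts key_maker_name(WordCount)
```

The two colons in "MapRedus::Process" each become an underscore, giving "MapRedus__Process_result_cache". In the source, a subclass without its own key maker falls back to that base name.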

.set_result_key(key_struct) ⇒ Object



# File 'lib/mapredus/process.rb', line 310

def self.set_result_key(key_struct)
  MapRedus.redefine_redis_key( "#{self.to_s.gsub(/\W/,"_")}_result_cache", key_struct )
end

Instance Method Details

#delete(safe = true) ⇒ Object

With safe set to true (the default), this will not delete anything while the master is working, because it can’t get ahold of the files to shred in that state.

If safe is set to false, this will delete all the redis stores associated with this process, but will not remove the process from the queue if it is on the queue. The process operations will fail once its data is deleted.

Examples

delete(safe)
# => true or false

Returns false if safe is true and the master is working, true otherwise.



# File 'lib/mapredus/process.rb', line 102

def delete(safe = true)
  return false if (safe && Master.working?(@pid))
  FileSystem.keys("mapredus:process:#{@pid}*").each do |k|
    FileSystem.del(k)
  end        
  FileSystem.srem(ProcessInfo.processes, @pid)
  FileSystem.set(ProcessInfo.processes_count, 0) if( 0 == FileSystem.scard(ProcessInfo.processes) )
  true
end
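
The effect of the safe flag can be sketched with a plain hash standing in for redis. sketch_delete, its master_working parameter, and the ":keys" key suffix are hypothetical. Only the "mapredus:process:<pid>" prefix is taken from the source above.

```ruby
# In-memory stand-in: `store` plays the role of redis and `master_working`
# the role of Master.working?(pid). Both are assumptions for illustration.
def sketch_delete(store, pid, master_working, safe = true)
  return false if safe && master_working
  store.delete_if { |key, _value| key.start_with?("mapredus:process:#{pid}") }
  true
end

store = {
  "mapredus:process:17"      => "{...}",
  "mapredus:process:17:keys" => ["a"],
  "mapredus:process:42"      => "{...}"
}

blocked = sketch_delete(store, 17, true)          # safe and master busy: nothing removed
forced  = sketch_delete(store, 17, true, false)   # unsafe: removed regardless
```

After the second call only the entry for process 42 is left in the store.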

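#each_key_nonreduced_value ⇒ Object

Iterates through every key and the values the map operation produced for it (before reduction), yielding each key, value pair to the given block.

Example

each_key_nonreduced_value { |key, value| ... }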

Returns nothing.



# File 'lib/mapredus/process.rb', line 132

def each_key_nonreduced_value
  map_keys.each do |key|
    map_values(key).each do |value|
      yield key, value
    end
  end
end
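
The iteration pattern shared by both each_key_* methods can be sketched with a hash of key to value list in place of the redis lists. each_key_value is a hypothetical helper, not part of MapRedus.

```ruby
# In-memory stand-in: a hash of key => list of values replaces the redis lists.
def each_key_value(values_by_key)
  values_by_key.each do |key, values|
    values.each { |value| yield key, value }   # one yield per stored value
  end
end

pairs = []
each_key_value({ "a" => [1, 1], "b" => [1] }) { |key, value| pairs << [key, value] }
```

A key with several values is yielded once per value, so callers see flat key, value pairs rather than grouped lists.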

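#each_key_reduced_value ⇒ Object

Iterates through every key and the values the reduce operation produced for it, yielding each key, value pair to the given block.

Example

each_key_reduced_value { |key, value| ... }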

Returns nothing.



# File 'lib/mapredus/process.rb', line 118

def each_key_reduced_value
  map_keys.each do |key|
    reduce_values(key).each do |value|
      yield key, value
    end
  end
end

#emit(key, reduce_val) ⇒ Object

The emission associated with a reduce. Currently all reduced values are pushed onto a redis list. We may want to use a different redis type depending on the kind of reduce being done: a reduce often returns only one value, in which case a set would be more appropriate than an rpush.

Examples

emit(key, reduced_value)

Returns “OK” on success.



# File 'lib/mapredus/process.rb', line 214

def emit(key, reduce_val)
  hashed_key = Helper.key_hash(key)
  FileSystem.rpush( ProcessInfo.reduce(@pid, hashed_key), reduce_val )
end

#emit_intermediate(*key_value) ⇒ Object

Emissions: when we get map/reduce results back, we emit them to be stored in our file system (redis).

key_value - The key and value, preceded by a rank if the process is ordered.

Examples

emit_intermediate(key, value)
# if an ordering is required
emit_intermediate(rank, key, value)

Returns true on success. Raises a RuntimeError on a key collision.



# File 'lib/mapredus/process.rb', line 185

def emit_intermediate(*key_value)
  if( not @ordered )
    key, value = key_value
    FileSystem.sadd( ProcessInfo.keys(@pid), key )
    hashed_key = Helper.key_hash(key)
    FileSystem.rpush( ProcessInfo.map(@pid, hashed_key), value )
  else
    # if there's an order for the process then we should use a zset above
    # ordered process's map emits [rank, key, value]
    #
    rank, key, value = key_value
    FileSystem.zadd( ProcessInfo.keys(@pid), rank, key )
    hashed_key = Helper.key_hash(key)
    FileSystem.rpush( ProcessInfo.map(@pid, hashed_key), value )
  end
  raise "Key Collision: key:#{key}, #{key.class} => hashed key:#{hashed_key}" if key_collision?(hashed_key, key)
  true
end
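
The difference between the ordered and unordered paths can be sketched with in-memory structures: a Set stands in for the redis set, and a hash of key to rank stands in for the sorted set. SketchEmitter is hypothetical, and it skips key hashing and the collision check.

```ruby
require 'set'

# In-memory stand-in for the redis structures used by emit_intermediate.
class SketchEmitter
  attr_reader :values

  def initialize(ordered)
    @ordered = ordered
    @keys    = ordered ? {} : Set.new          # sorted set (key => rank) or plain set
    @values  = Hash.new { |h, k| h[k] = [] }   # per-key list of emitted values
  end

  def emit_intermediate(*key_value)
    if @ordered
      rank, key, value = key_value
      @keys[key] = rank                        # like ZADD: the rank is the score
    else
      key, value = key_value
      @keys << key                             # like SADD
    end
    @values[key] << value                      # like RPUSH onto the per-key list
    true
  end

  # Keys in rank order when ordered, in insertion order otherwise.
  def map_keys
    @ordered ? @keys.sort_by { |_key, rank| rank }.map(&:first) : @keys.to_a
  end
end

ordered = SketchEmitter.new(true)
ordered.emit_intermediate(2, "b", 1)
ordered.emit_intermediate(1, "a", 1)
ordered.emit_intermediate(2, "b", 1)

plain = SketchEmitter.new(false)
plain_result = plain.emit_intermediate("x", 5)
```

In the ordered case the keys come back sorted by rank ("a" before "b"), while the values for "b" accumulate in a list in both cases.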

#json_helper(json_info, key) ⇒ Object



# File 'lib/mapredus/process.rb', line 55

def json_helper(json_info, key)
  json_info[key.to_s] || json_info[key.to_sym]
end
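
A short sketch of why the helper checks both key types: a specification decoded from JSON has string keys, while one built in Ruby (as in create) has symbol keys. The helper body is copied from the source above. The sample hashes are made up.

```ruby
require 'json'

def json_helper(json_info, key)
  json_info[key.to_s] || json_info[key.to_sym]
end

from_redis  = JSON.parse('{"mapper": "WordCounter", "ordered": false}')   # string keys
from_create = { :mapper => "WordCounter" }                                # symbol keys

a = json_helper(from_redis, :mapper)
b = json_helper(from_create, :mapper)
c = json_helper(from_redis, :ordered)   # false || nil evaluates to nil
```

Because of the || operator, a stored false comes back as nil. Both are falsy, so checks such as `not @ordered` behave the same either way.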

#key_collision?(hashed_key, key) ⇒ Boolean

Returns:

  • (Boolean)


# File 'lib/mapredus/process.rb', line 219

def key_collision?(hashed_key, key)
  not ( FileSystem.setnx( ProcessInfo.hash_to_key(@pid, hashed_key), key ) ||
        FileSystem.get( ProcessInfo.hash_to_key(@pid, hashed_key) ) == key.to_s )
end
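
The collision check relies on set-if-absent semantics: the first key to claim a hash wins, and a later key with the same hash is a collision only if it differs. A minimal sketch, assuming an in-memory store whose setnx and get behave like the redis commands of the same name (SketchStore and sketch_collision? are hypothetical):

```ruby
# In-memory stand-in for the redis SETNX/GET pair; not the MapRedus FileSystem.
class SketchStore
  def initialize
    @data = {}
  end

  # Stores the value only if the key is absent; returns true when it was stored.
  def setnx(key, value)
    return false if @data.key?(key)
    @data[key] = value.to_s
    true
  end

  def get(key)
    @data[key]
  end
end

def sketch_collision?(store, hashed_key, key)
  !(store.setnx(hashed_key, key) || store.get(hashed_key) == key.to_s)
end

store = SketchStore.new
first = sketch_collision?(store, 42, "apple")   # claims hash 42
again = sketch_collision?(store, 42, "apple")   # same key, same hash: no collision
clash = sketch_collision?(store, 42, "pear")    # different key, same hash: collision
```

Only the third call reports a collision. Values are compared as strings, matching the key.to_s comparison in the source.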

#map_keys ⇒ Object

Keys that the map operation produced

Examples

map_keys
# =>

Returns the keys.



# File 'lib/mapredus/process.rb', line 246

def map_keys
  if( not @ordered )
    FileSystem.smembers( ProcessInfo.keys(@pid) )
  else
    FileSystem.zrange( ProcessInfo.keys(@pid), 0, -1 )
  end
end

#map_values(key) ⇒ Object

Values that the map operation produced for a key

Examples

map_values(key)
# =>

Returns the values.



# File 'lib/mapredus/process.rb', line 269

def map_values(key)
  hashed_key = Helper.key_hash(key)
  FileSystem.lrange( ProcessInfo.map(@pid, hashed_key), 0, -1 )
end

#next_state ⇒ Object

Changes the process state if the process is not running and is not synchronous.

Examples

process.next_state

Returns the state that the process switched to, or nil if the process is running or synchronous.



# File 'lib/mapredus/process.rb', line 161

def next_state
  if((not running?) and (not @synchronous))
    new_state = STATE_MACHINE[self.state]
    update(:state => new_state)
    method = "enslave_#{new_state}".to_sym
    Master.send(method, self) if( Master.respond_to?(method) )
    new_state
  end
end
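
The transition logic can be sketched as a lookup in a hash that maps each state to its successor. The state names and the SketchStateful class below are hypothetical; the real NOT_STARTED and STATE_MACHINE constants are not shown on this page. The sketch also leaves out the Master.enslave_* dispatch.

```ruby
# Hypothetical states; the real STATE_MACHINE contents are not shown on this page.
SKETCH_STATES = {
  "not_started" => "mapping",
  "mapping"     => "reducing",
  "reducing"    => "complete"
}

class SketchStateful
  attr_reader :state

  def initialize(synchronous, running)
    @state       = "not_started"
    @synchronous = synchronous
    @running     = running
  end

  # Same guard as the source, written as an early return.
  def next_state
    return nil if @running || @synchronous
    @state = SKETCH_STATES[@state]
  end
end

process = SketchStateful.new(false, false)
first   = process.next_state
second  = process.next_state
skipped = SketchStateful.new(true, false).next_state
```

A synchronous or running process is left untouched and the call returns nil.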

#num_keys ⇒ Object



# File 'lib/mapredus/process.rb', line 254

def num_keys()
  if( not @ordered )
    FileSystem.scard( ProcessInfo.keys(@pid) )
  else
    FileSystem.zcard( ProcessInfo.keys(@pid) )
  end
end

#num_values(key) ⇒ Object



# File 'lib/mapredus/process.rb', line 274

def num_values(key)
  hashed_key = Helper.key_hash(key)
  FileSystem.llen( ProcessInfo.map(@pid, hashed_key) )
end

#read(json_info) ⇒ Object



# File 'lib/mapredus/process.rb', line 40

def read(json_info)
  @inputter = Helper.class_get(json_helper(json_info, :inputter))
  @mapper = Helper.class_get(json_helper(json_info, :mapper))
  @reducer = Helper.class_get(json_helper(json_info, :reducer))
  @finalizer = Helper.class_get(json_helper(json_info, :finalizer))
  @ordered = json_helper(json_info, :ordered)
  @synchronous = json_helper(json_info, :synchronous)
  @result_timeout = json_helper(json_info, :result_timeout) || DEFAULT_TIME
  @key_args = json_helper(json_info, :key_args) || []
  @state = json_helper(json_info, :state) || NOT_STARTED
  @outputter = json_helper(json_info, :outputter)
  @outputter = @outputter ? Helper.class_get(@outputter) : MapRedus::Outputter
  @type = Helper.class_get(json_helper(json_info, :type) || Process)
end

#reduce_values(key) ⇒ Object

Values that the reduce operation produced for a key

Examples

reduce_values(key)
# =>

Returns the values.



# File 'lib/mapredus/process.rb', line 286

def reduce_values(key)
  hashed_key = Helper.key_hash(key)
  FileSystem.lrange( ProcessInfo.reduce(@pid, hashed_key), 0, -1 )
end

#reload ⇒ Object



# File 'lib/mapredus/process.rb', line 85

def reload
  read(Helper.decode(FileSystem.get(ProcessInfo.pid(@pid))))
  self
end

#result_key(*args) ⇒ Object

Functions to manage the location of the result in the FileSystem.

Examples

process.result_key(extra, arguments)
Process.result_key(all, needed, arguments)
# => "something:that:uses:the:extra:arguments"

SomeProcessSubclass.set_result_key("something:ARG:something:VAR")
# sets the result key for the subclass (CAPITALIZED segments are arguments that must be supplied to fill in the values)


# File 'lib/mapredus/process.rb', line 300

def result_key(*args)
  Helper.class_get(@type).result_key(*[@key_args, args].flatten)
end

#run(data_object, synchronous = false) ⇒ Object



# File 'lib/mapredus/process.rb', line 140

def run( data_object, synchronous = false )
  update(:synchronous => synchronous)
  Master.mapreduce( self, data_object )
  true
end

#running? ⇒ Boolean

TODO: This should also have some notion of whether the process is completed, since the master might not be working while the process is not yet complete, in which case the process is still running.

Returns:

  • (Boolean)


# File 'lib/mapredus/process.rb', line 150

def running?
  Master.working?(@pid)
end

#save ⇒ Object



# File 'lib/mapredus/process.rb', line 72

def save
  FileSystem.sadd( ProcessInfo.processes, @pid ) 
  FileSystem.save( ProcessInfo.pid(@pid), to_json )
  self
end

#to_hash ⇒ Object



# File 'lib/mapredus/process.rb', line 61

def to_hash
  (ATTRS + READERS).inject({}) do |h, attr|
    h[attr] = send(attr)
    h 
  end
end

#to_json ⇒ Object



# File 'lib/mapredus/process.rb', line 68

def to_json
  Helper.encode(to_hash)
end

#to_s ⇒ Object



# File 'lib/mapredus/process.rb', line 59

def to_s; to_json; end

#update(attrs = {}) ⇒ Object



# File 'lib/mapredus/process.rb', line 78

def update(attrs = {})
  attrs.each do |attr, val|
    send("#{attr}=", val)
  end
  save
end