Class: MapRedus::Process
- Inherits: Object
  - Object
    - MapRedus::Process
- Defined in: lib/mapredus/process.rb
Overview
This class keeps track of our map-reduce processes.
Each process is identified by a Redis key built from its process id; the value stored at that key is a JSON object which contains:
{
inputter : inputstreamclass,
mapper : mapclass,
reducer : reduceclass,
finalizer : finalizerclass,
outputter : outputterclass,
partitioner : <not supported>,
combiner : <not supported>,
ordered : true_or_false ## when true, map output is emitted as [ order, key, value ] and keys are kept in that order
synchronous : true_or_false ## runs the process synchronously or not (generally used for testing)
result_timeout : length of time a result is saved ## defaults to DEFAULT_TIME (3600 * 24)
key_args : arguments added to the key location where the result is saved (cache location)
state : the current state of the process (not set by the process itself; starts off as nil)
type : the original process class (currently needed so the result_cache keys can be namespaced)
}
Users can subclass this class to add extra features if needed.
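The stored value can be pictured with a minimal Ruby sketch. It assumes the standard `json` library for encoding (this page only says `Helper.encode` and `Helper.decode` are used), a plain Hash standing in for Redis, and a hypothetical pid key of the form `mapredus:process:17`; the class names are hypothetical and are stored as strings.

```ruby
require 'json'

# Hypothetical specification for one process; class names are stored as strings.
spec = {
  inputter:       'WordInputter',       # hypothetical class names
  mapper:         'WordMapper',
  reducer:        'WordReducer',
  finalizer:      'ToHash',
  outputter:      'MapRedus::Outputter',
  ordered:        false,
  synchronous:    false,
  result_timeout: 3600 * 24,            # same value as DEFAULT_TIME
  key_args:       [],
  state:          nil,                  # not set by the process itself
  type:           'MapRedus::Process'
}

# A plain Hash stands in for Redis: one JSON string per pid key (assumed format).
store = {}
store['mapredus:process:17'] = JSON.generate(spec)

# Decoding gives back a Hash with string keys, not symbols.
decoded = JSON.parse(store['mapredus:process:17'])
puts decoded['mapper']          # prints WordMapper
puts decoded['result_timeout']  # prints 86400
```

Decoding returns string keys even though the spec was written with symbols, so any reader of this value has to cope with both forms.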
Constant Summary
- READERS = [:pid]
  Public: Keeps track of information that may show up in the Redis JSON value, so we know exactly what might appear in the JSON hash.
- ATTRS = [:inputter, :mapper, :reducer, :finalizer, :outputter, :ordered, :synchronous, :result_timeout, :key_args, :state, :type]
- DEFAULT_TIME = 3600 * 24
Class Method Summary
- .create ⇒ Object
  Sets up a process to be run with the given specification.
- .delete_saved_result(*key_args) ⇒ Object
  Given the arguments for a result key, deletes the result from the filesystem.
- .get_available_pid ⇒ Object
  Returns an available process id.
- .info(pid) ⇒ Object
  Returns all the Redis keys associated with a process’s process id.
- .kill(pid) ⇒ Object
  Removes the Redis keys associated with this process if the Master isn’t working.
- .kill_all ⇒ Object
- .open(pid) ⇒ Object
  Returns an instance of the process class given the process id.
- .ps ⇒ Object
  Finds out what map-reduce processes are out there.
- .result_key(*args) ⇒ Object
- .set_result_key(key_struct) ⇒ Object
Instance Method Summary
- #delete(safe = true) ⇒ Object
  Deletes this process’s data. This will not delete if the master is working, because it can’t get hold of the files to shred while the master is working.
- #each_key_nonreduced_value ⇒ Object
  Iterates through each key and its map (non-reduced) values.
- #each_key_reduced_value ⇒ Object
  Iterates through each key and its reduced values.
- #emit(key, reduce_val) ⇒ Object
  The emission associated with a reduce.
- #emit_intermediate(*key_value) ⇒ Object
  Emissions: when we get map/reduce results back, we emit these to be stored in our file system (Redis).
- #initialize(pid, json_info) ⇒ Process (constructor)
  Returns a new instance of Process.
- #json_helper(json_info, key) ⇒ Object
- #key_collision?(hashed_key, key) ⇒ Boolean
- #map_keys ⇒ Object
  Keys that the map operation produced.
- #map_values(key) ⇒ Object
  Values that the map operation produced, for a key.
- #next_state ⇒ Object
  Changes the process state if the process is not running and is not synchronous.
- #num_keys ⇒ Object
- #num_values(key) ⇒ Object
- #read(json_info) ⇒ Object
- #reduce_values(key) ⇒ Object
  Values that the reduce operation produced, for a key.
- #reload ⇒ Object
- #result_key(*args) ⇒ Object
  Functions to manage the location of the result in the FileSystem.
- #run(data_object, synchronous = false) ⇒ Object
- #running? ⇒ Boolean
  TODO: Should also have some notion of whether the process is complete, since the master might not be working while the process is not yet complete and therefore still running.
- #save ⇒ Object
- #to_hash ⇒ Object
- #to_json ⇒ Object
- #to_s ⇒ Object
- #update(attrs = {}) ⇒ Object
Constructor Details
#initialize(pid, json_info) ⇒ Process
Returns a new instance of Process.
    # File 'lib/mapredus/process.rb', line 35
    def initialize(pid, json_info)
      @pid = pid
      read(json_info)
    end
Class Method Details
.create ⇒ Object
    # File 'lib/mapredus/process.rb', line 324
    def self.create
      new_pid = get_available_pid
      specification = ATTRS.inject({}) do |ret, attr|
        ret[attr] = send(attr)
        ret
      end
      specification[:type] = self
      self.new(new_pid, specification).save
    end
.delete_saved_result(*key_args) ⇒ Object
Given the arguments for a result key, deletes the result from the filesystem.
Examples
Process.delete_saved_result(key)
    # File 'lib/mapredus/process.rb', line 428
    def self.delete_saved_result(*key_args)
      FileSystem.del( result_key(*key_args) )
    end
.get_available_pid ⇒ Object
Finds an available process id by incrementing the process counter.
Examples
Process.get_available_pid
Returns an available pid.
    # File 'lib/mapredus/process.rb', line 419
    def self.get_available_pid
      FileSystem.incrby(ProcessInfo.processes_count, 1 + rand(20))
    end
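A minimal sketch of the allocation pattern, using a hypothetical in-memory `FakeCounterStore` in place of `FileSystem` and a made-up `processes_count` key. Redis `INCRBY` is atomic and the counter only grows, so concurrent callers receive distinct, increasing pids; the random step of `1 + rand(20)` is taken from the source, which does not say why it is used.

```ruby
# Hypothetical in-memory stand-in for the Redis INCRBY command.
class FakeCounterStore
  def initialize
    @values = Hash.new(0)
  end

  # Add amount to the integer at key and return the new value, like INCRBY.
  def incrby(key, amount)
    @values[key] += amount
  end
end

COUNT_KEY = 'processes_count' # hypothetical key name

# Same shape as get_available_pid: advance the counter by a random step of 1..20.
def available_pid(store)
  store.incrby(COUNT_KEY, 1 + rand(20))
end

store = FakeCounterStore.new
pids = Array.new(5) { available_pid(store) }
p pids # five strictly increasing pids; the exact values depend on rand
```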
.info(pid) ⇒ Object
Returns all the Redis keys associated with a process’s process id.
Example
Process.info(17)
Returns an array of keys associated with the process id.
    # File 'lib/mapredus/process.rb', line 389
    def self.info(pid)
      FileSystem.keys(ProcessInfo.pid(pid) + "*")
    end
.kill(pid) ⇒ Object
Removes the Redis keys associated with this process if the Master isn’t working. This is potentially very expensive.
Example
Process.kill(pid)
Returns num_killed, the value returned by Master.emancipate(pid).
    # File 'lib/mapredus/process.rb', line 441
    def self.kill(pid)
      num_killed = Master.emancipate(pid)
      proc = Process.open(pid)
      proc.delete if proc
      num_killed
    end
.kill_all ⇒ Object
    # File 'lib/mapredus/process.rb', line 448
    def self.kill_all
      ps.each do |pid|
        kill(pid)
      end
      FileSystem.del(ProcessInfo.processes)
      FileSystem.del(ProcessInfo.processes_count)
    end
.open(pid) ⇒ Object
Returns an instance of the process class given the process id. If no such process id exists returns nil.
Example
process = Process.open(17)
    # File 'lib/mapredus/process.rb', line 398
    def self.open(pid)
      spec = Helper.decode( FileSystem.get(ProcessInfo.pid(pid)) )
      spec && self.new( pid, spec )
    end
.ps ⇒ Object
Finds out what map-reduce processes are out there.
Examples
Process.ps
Returns a list of the map-reduce process ids.
    # File 'lib/mapredus/process.rb', line 409
    def self.ps
      FileSystem.smembers(ProcessInfo.processes)
    end
.result_key(*args) ⇒ Object
    # File 'lib/mapredus/process.rb', line 304
    def self.result_key(*args)
      key_maker = "#{self.to_s.gsub(/\W/,"_")}_result_cache"
      key_maker = ProcessInfo.respond_to?(key_maker) ? key_maker : "#{MapRedus::Process.to_s.gsub(/\W/,"_")}_result_cache"
      ProcessInfo.send( key_maker, *args )
    end
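The key-maker lookup in `.result_key` can be sketched in isolation. `FakeProcessInfo` and its `mapredus:result:...` layout are hypothetical stand-ins for `ProcessInfo`; the name mangling (`gsub(/\W/, "_")` plus `_result_cache`) and the fallback to the base class's key maker follow the code above.

```ruby
# Hypothetical stand-in for ProcessInfo: only the base class has a key maker.
module FakeProcessInfo
  def self.MapRedus__Process_result_cache(*args)
    ['mapredus', 'result', *args].join(':')
  end
end

# Same mangling as .result_key: non-word characters (such as "::") become "_".
def key_maker_name(class_name)
  "#{class_name.gsub(/\W/, '_')}_result_cache"
end

# Use the class's own key maker if one exists, else fall back to the base one.
def result_key_for(class_name, *args)
  maker = key_maker_name(class_name)
  maker = key_maker_name('MapRedus::Process') unless FakeProcessInfo.respond_to?(maker)
  FakeProcessInfo.send(maker, *args)
end

puts key_maker_name('MapRedus::Process')       # prints MapRedus__Process_result_cache
puts result_key_for('WordCount::Job', 'a', 1)  # prints mapredus:result:a:1
```

A subclass without its own key maker (presumably one registered through `.set_result_key`) therefore shares the base `MapRedus::Process` result location.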
.set_result_key(key_struct) ⇒ Object
    # File 'lib/mapredus/process.rb', line 310
    def self.set_result_key(key_struct)
      MapRedus.redefine_redis_key( "#{self.to_s.gsub(/\W/,"_")}_result_cache", key_struct )
    end
Instance Method Details
#delete(safe = true) ⇒ Object
This will not delete if the master is working, because it can’t get hold of the files to shred while the master is working.
If safe is set to false, this deletes all the Redis stores associated with this process, but does not remove the process from the queue if it is on the queue. The process operations will fail once its data is deleted.
Examples
process.delete(safe)
# => true or false
Returns false if safe is true and the master is working; otherwise returns true.
    # File 'lib/mapredus/process.rb', line 102
    def delete(safe = true)
      return false if (safe && Master.working?(@pid))
      FileSystem.keys("mapredus:process:#{@pid}*").each do |k|
        FileSystem.del(k)
      end
      FileSystem.srem(ProcessInfo.processes, @pid)
      FileSystem.set(ProcessInfo.processes_count, 0) if( 0 == FileSystem.scard(ProcessInfo.processes) )
      true
    end
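The control flow of `#delete` can be sketched against an in-memory store. `FakeRedis`, the `processes`/`processes_count` key names and the `master_working:` flag (standing in for `Master.working?`) are hypothetical; the `mapredus:process:<pid>*` pattern is the one the method uses.

```ruby
require 'set'

# Minimal in-memory stand-in for the Redis calls that #delete uses.
class FakeRedis
  attr_reader :data

  def initialize
    @data = {}
  end

  # KEYS with a trailing "*": every key that starts with the prefix.
  def keys(pattern)
    prefix = pattern.chomp('*')
    @data.keys.select { |k| k.start_with?(prefix) }
  end

  def del(key)
    @data.delete(key)
  end

  def set(key, value)
    @data[key] = value
  end

  def sadd(key, member)
    (@data[key] ||= Set.new) << member
  end

  def srem(key, member)
    (@data[key] ||= Set.new).delete(member)
  end

  def scard(key)
    (@data[key] || Set.new).size
  end
end

PROCESSES = 'processes'             # hypothetical key names
PROCESSES_COUNT = 'processes_count'

# Mirrors #delete: refuse while the master works (when safe), else remove the
# process's keys, drop it from the process set, and reset the counter if empty.
def delete_process(redis, pid, master_working:, safe: true)
  return false if safe && master_working
  redis.keys("mapredus:process:#{pid}*").each { |k| redis.del(k) }
  redis.srem(PROCESSES, pid)
  redis.set(PROCESSES_COUNT, 0) if redis.scard(PROCESSES).zero?
  true
end

redis = FakeRedis.new
redis.set('mapredus:process:5', '{}')
redis.set('mapredus:process:5:keys', 'a')
redis.sadd(PROCESSES, 5)
redis.set(PROCESSES_COUNT, 5)

p delete_process(redis, 5, master_working: true)              # prints false
p delete_process(redis, 5, master_working: true, safe: false) # prints true
```

One consequence of the bare trailing `*`: the pattern for pid 1 (`mapredus:process:1*`) also matches keys beginning `mapredus:process:17`, so deleting one process could touch another whose pid shares the same leading digits.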
#each_key_nonreduced_value ⇒ Object
Iterates through each map key and its non-reduced (map) values, yielding key, value.
Example
process.each_key_nonreduced_value { |key, value| ... }
Returns nothing.
    # File 'lib/mapredus/process.rb', line 132
    def each_key_nonreduced_value
      map_keys.each do |key|
        map_values(key).each do |value|
          yield key, value
        end
      end
    end
#each_key_reduced_value ⇒ Object
Iterates through each map key and its reduced values, yielding key, value.
Example
process.each_key_reduced_value { |key, value| ... }
Returns nothing.
    # File 'lib/mapredus/process.rb', line 118
    def each_key_reduced_value
      map_keys.each do |key|
        reduce_values(key).each do |value|
          yield key, value
        end
      end
    end
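Both iterators walk `map_keys` and then every value stored for each key; they differ only in which list they read (`map_values` versus `reduce_values`). A sketch with hypothetical in-memory data standing in for the Redis lists:

```ruby
# Hypothetical in-memory data: the keys the map produced, plus one list of
# values per key for the map output and for the reduce output.
map_keys     = ['apple', 'pear']
map_lists    = { 'apple' => ['1', '1', '1'], 'pear' => ['1'] }
reduce_lists = { 'apple' => ['3'], 'pear' => ['1'] }

# Same nesting as each_key_nonreduced_value / each_key_reduced_value:
# for every map key, yield the key with each value stored for it.
def each_key_value(keys, lists)
  keys.each do |key|
    lists.fetch(key, []).each { |value| yield key, value }
  end
end

nonreduced = []
each_key_value(map_keys, map_lists) { |key, value| nonreduced << [key, value] }

reduced = []
each_key_value(map_keys, reduce_lists) { |key, value| reduced << [key, value] }

p nonreduced.length  # prints 4
p reduced            # prints [["apple", "3"], ["pear", "1"]]
```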
#emit(key, reduce_val) ⇒ Object
The emission associated with a reduce. Currently all reduced values are pushed onto a Redis list. We may want to use a different Redis type directly, depending on the kind of reduce being done: a reduce often returns only one value, so instead of an rpush we should do a set.
Examples
emit(key, reduced_value)
Returns “OK” on success.
    # File 'lib/mapredus/process.rb', line 214
    def emit(key, reduce_val)
      hashed_key = Helper.key_hash(key)
      FileSystem.rpush( ProcessInfo.reduce(@pid, hashed_key), reduce_val )
    end
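The list-versus-set trade-off described above can be illustrated with a hypothetical in-memory store: appending (as `RPUSH` does) keeps every emitted value, while overwriting (as `SET` does) keeps only the latest. The `reduce:` and `reduce_single:` key prefixes are made up for the example.

```ruby
# Hypothetical in-memory store; missing keys start out as empty lists.
store = Hash.new { |hash, key| hash[key] = [] }

# Current behaviour: append every reduced value to a list, as RPUSH does.
def emit_to_list(store, key, value)
  store["reduce:#{key}"] << value
end

# Alternative for single-valued reduces: overwrite one value, as SET does.
def emit_single(store, key, value)
  store["reduce_single:#{key}"] = value
end

# Emitting the same reduced value twice with each approach:
2.times { emit_to_list(store, 'apple', '3') }
2.times { emit_single(store, 'apple', '3') }

p store['reduce:apple']         # prints ["3", "3"]
p store['reduce_single:apple']  # prints "3"
```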
#emit_intermediate(*key_value) ⇒ Object
Emissions: when we get map/reduce results back, we emit these to be stored in our file system (Redis).
key_value - The key and value, or the rank, key and value when the process is ordered
Examples
emit_intermediate(key, value)
# if an ordering is required:
emit_intermediate(rank, key, value)
Returns true on success.
    # File 'lib/mapredus/process.rb', line 185
    def emit_intermediate(*key_value)
      if( not @ordered )
        key, value = key_value
        FileSystem.sadd( ProcessInfo.keys(@pid), key )
        hashed_key = Helper.key_hash(key)
        FileSystem.rpush( ProcessInfo.map(@pid, hashed_key), value )
      else
        # if there's an order for the process then we should use a zset above
        # ordered process's map emits [rank, key, value]
        #
        rank, key, value = key_value
        FileSystem.zadd( ProcessInfo.keys(@pid), rank, key )
        hashed_key = Helper.key_hash(key)
        FileSystem.rpush( ProcessInfo.map(@pid, hashed_key), value )
      end
      raise "Key Collision: key:#{key}, #{key.class} => hashed key:#{hashed_key}" if key_collision?(hashed_key, key)
      true
    end
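A simplified in-memory model of what `#emit_intermediate` and `#map_keys` do for ordered and unordered processes. `MapOutput` is hypothetical: a Ruby `Set` stands in for the Redis set, a Hash of key to rank stands in for the sorted set, and the key hashing and collision check are left out.

```ruby
require 'set'

# Hypothetical in-memory model of the structures emit_intermediate writes to.
class MapOutput
  attr_reader :lists

  def initialize(ordered)
    @ordered = ordered
    @key_set = Set.new                        # stands in for the Redis set (SADD)
    @ranks   = {}                             # stands in for the sorted set (ZADD): key => rank
    @lists   = Hash.new { |h, k| h[k] = [] }  # one value list per key (RPUSH)
  end

  # Unordered processes emit (key, value); ordered ones emit (rank, key, value).
  def emit_intermediate(*key_value)
    if @ordered
      rank, key, value = key_value
      @ranks[key] = rank                      # re-adding a member updates its score, like ZADD
    else
      key, value = key_value
      @key_set << key
    end
    @lists[key] << value
    true
  end

  # Like map_keys: set members, or sorted-set members by ascending rank.
  def map_keys
    @ordered ? @ranks.sort_by { |_key, rank| rank }.map(&:first) : @key_set.to_a
  end
end

ordered = MapOutput.new(true)
ordered.emit_intermediate(2, 'pear', 1)
ordered.emit_intermediate(1, 'apple', 1)
ordered.emit_intermediate(1, 'apple', 1)
p ordered.map_keys        # prints ["apple", "pear"]
p ordered.lists['apple']  # prints [1, 1]

plain = MapOutput.new(false)
plain.emit_intermediate('pear', 1)
plain.emit_intermediate('apple', 1)
```

Redis `SMEMBERS` makes no ordering promise (a Ruby `Set` merely happens to keep insertion order), so only the ordered variant should be relied on for key order.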
#json_helper(json_info, key) ⇒ Object
    # File 'lib/mapredus/process.rb', line 55
    def json_helper(json_info, key)
      json_info[key.to_s] || json_info[key.to_sym]
    end
#key_collision?(hashed_key, key) ⇒ Boolean
    # File 'lib/mapredus/process.rb', line 219
    def key_collision?(hashed_key, key)
      not ( FileSystem.setnx( ProcessInfo.hash_to_key(@pid, hashed_key), key ) ||
            FileSystem.get( ProcessInfo.hash_to_key(@pid, hashed_key) ) == key.to_s )
    end
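The collision check relies on `SETNX` semantics: the first key to reach a hash slot claims it, and a later key collides only if the slot already holds a different original key. The sketch uses a hypothetical `FakeKV` for `SETNX`/`GET` and a deliberately weak, hypothetical `weak_hash` (not `Helper.key_hash`) so that "ab" and "ba" land in the same slot.

```ruby
# Stand-in for Redis SETNX/GET on a plain Hash.
class FakeKV
  def initialize
    @h = {}
  end

  # SETNX: store only if absent; true when this call stored the value.
  def setnx(key, value)
    return false if @h.key?(key)
    @h[key] = value.to_s
    true
  end

  def get(key)
    @h[key]
  end
end

# Deliberately weak, hypothetical hash: byte checksum modulo 7.
def weak_hash(key)
  (key.to_s.sum % 7).to_s
end

# Same logic as key_collision?: collision only if the slot is taken by another key.
def key_collision?(kv, hashed_key, key)
  slot = "hash_to_key:#{hashed_key}"
  !(kv.setnx(slot, key) || kv.get(slot) == key.to_s)
end

kv = FakeKV.new
p key_collision?(kv, weak_hash('ab'), 'ab')  # prints false (slot claimed)
p key_collision?(kv, weak_hash('ab'), 'ab')  # prints false (same key again)
p key_collision?(kv, weak_hash('ba'), 'ba')  # prints true (same checksum, different key)
```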
#map_keys ⇒ Object
Keys that the map operation produced
Examples
map_keys
# =>
Returns the Keys.
    # File 'lib/mapredus/process.rb', line 246
    def map_keys
      if( not @ordered )
        FileSystem.smembers( ProcessInfo.keys(@pid) )
      else
        FileSystem.zrange( ProcessInfo.keys(@pid), 0, -1 )
      end
    end
#map_values(key) ⇒ Object
Values that the map operation produced, for a key.
Examples
map_values(key)
# =>
Returns the values.
    # File 'lib/mapredus/process.rb', line 269
    def map_values(key)
      hashed_key = Helper.key_hash(key)
      FileSystem.lrange( ProcessInfo.map(@pid, hashed_key), 0, -1 )
    end
#next_state ⇒ Object
Changes the process state if the process is not running and is not synchronous.
Examples
process.next_state
Returns the state that the process switched to, or nil if the process is running or synchronous (in which case the state stays the same).
    # File 'lib/mapredus/process.rb', line 161
    def next_state
      if((not running?) and (not @synchronous))
        new_state = STATE_MACHINE[self.state]
        update(:state => new_state)
        method = "enslave_#{new_state}".to_sym
        Master.send(method, self) if( Master.respond_to?(method) )
        new_state
      end
    end
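A sketch of the transition logic in `#next_state`. The real `STATE_MACHINE` contents are not shown on this page, so the `STATES` table below is hypothetical; instead of saving via `update` and calling `Master.enslave_<state>`, the sketch only changes an instance variable and records the method name it would look for.

```ruby
# Hypothetical state table; the real STATE_MACHINE is not shown on this page.
STATES = {
  'not_started' => 'mapping',
  'mapping'     => 'reducing',
  'reducing'    => 'finalizing',
  'finalizing'  => 'complete'
}.freeze

class TinyProcess
  attr_reader :state, :dispatched

  def initialize(state:, running: false, synchronous: false)
    @state = state
    @running = running
    @synchronous = synchronous
    @dispatched = []
  end

  def running?
    @running
  end

  # Mirrors next_state: advance only when idle and asynchronous, else return nil.
  def next_state
    return nil if running? || @synchronous
    @state = STATES[@state]
    # The real method calls Master.enslave_<state> if Master responds to it;
    # here we only record the method name it would look for.
    @dispatched << :"enslave_#{@state}"
    @state
  end
end

idle = TinyProcess.new(state: 'mapping')
p idle.next_state   # prints "reducing"
p idle.dispatched   # prints [:enslave_reducing]

busy = TinyProcess.new(state: 'mapping', running: true)
p busy.next_state   # prints nil
p busy.state        # prints "mapping"
```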
#num_keys ⇒ Object
    # File 'lib/mapredus/process.rb', line 254
    def num_keys()
      if( not @ordered )
        FileSystem.scard( ProcessInfo.keys(@pid) )
      else
        FileSystem.zcard( ProcessInfo.keys(@pid) )
      end
    end
#num_values(key) ⇒ Object
    # File 'lib/mapredus/process.rb', line 274
    def num_values(key)
      hashed_key = Helper.key_hash(key)
      FileSystem.llen( ProcessInfo.map(@pid, hashed_key) )
    end
#read(json_info) ⇒ Object
    # File 'lib/mapredus/process.rb', line 40
    def read(json_info)
      @inputter = Helper.class_get(json_helper(json_info, :inputter))
      @mapper = Helper.class_get(json_helper(json_info, :mapper))
      @reducer = Helper.class_get(json_helper(json_info, :reducer))
      @finalizer = Helper.class_get(json_helper(json_info, :finalizer))
      @ordered = json_helper(json_info, :ordered)
      @synchronous = json_helper(json_info, :synchronous)
      @result_timeout = json_helper(json_info, :result_timeout) || DEFAULT_TIME
      @key_args = json_helper(json_info, :key_args) || []
      @state = json_helper(json_info, :state) || NOT_STARTED
      @outputter = json_helper(json_info, :outputter)
      @outputter = @outputter ? Helper.class_get(@outputter) : MapRedus::Outputter
      @type = Helper.class_get(json_helper(json_info, :type) || Process)
    end
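A cut-down version of `#read` showing only the scalar fields and their defaults, together with `#json_helper`'s string-then-symbol lookup. It assumes the standard `json` library for decoding, uses the string `'not_started'` as a hypothetical stand-in for `NOT_STARTED`, and skips the `Helper.class_get` class lookups.

```ruby
require 'json'

DEFAULT_TIME = 3600 * 24

# Same lookup as json_helper: try the string key, then the symbol key.
def json_helper(json_info, key)
  json_info[key.to_s] || json_info[key.to_sym]
end

# Scalar fields of read, with the same defaults.
def read_defaults(json_info)
  {
    ordered:        json_helper(json_info, :ordered),
    result_timeout: json_helper(json_info, :result_timeout) || DEFAULT_TIME,
    key_args:       json_helper(json_info, :key_args) || [],
    state:          json_helper(json_info, :state) || 'not_started' # stands in for NOT_STARTED
  }
end

from_redis = JSON.parse('{"ordered": true, "key_args": ["en"]}')  # string keys
in_memory  = { ordered: false, result_timeout: 60 }                 # symbol keys

p read_defaults(from_redis)
p read_defaults(in_memory)
```

Because `json_helper` uses `||`, a `false` stored under a string key falls through to the symbol lookup and may come back as `nil`; both are falsy, so flags such as `ordered` behave the same either way.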
#reduce_values(key) ⇒ Object
Values that the reduce operation produced, for a key.
Examples
reduce_values(key)
# =>
Returns the values.
    # File 'lib/mapredus/process.rb', line 286
    def reduce_values(key)
      hashed_key = Helper.key_hash(key)
      FileSystem.lrange( ProcessInfo.reduce(@pid, hashed_key), 0, -1 )
    end
#reload ⇒ Object
    # File 'lib/mapredus/process.rb', line 85
    def reload
      read(Helper.decode(FileSystem.get(ProcessInfo.pid(@pid))))
      self
    end
#result_key(*args) ⇒ Object
Functions to manage the location of the result in the FileSystem.
Examples
process.result_key(extra, arguments)
Process.result_key(all, needed, arguments)
# => "something:that:uses:the:extra:arguments"
SomeProcessSubclass.set_result_key("something:ARG:something:VAR")
# sets the result key template for SomeProcessSubclass (CAPITALIZED segments are placeholders that require arguments to fill in the values)
    # File 'lib/mapredus/process.rb', line 300
    def result_key(*args)
      Helper.class_get(@type).result_key(*[@key_args, args].flatten)
    end
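The argument handling of `#result_key` can be sketched on its own: the stored `key_args` come first, then the call's own arguments, flattened into one list. The template filler here is hypothetical; it simply replaces each all-capitals segment in order, since how `MapRedus.redefine_redis_key` actually fills templates is not documented on this page.

```ruby
# Same argument assembly as #result_key: stored key_args first, then the
# call's arguments, flattened into a single list.
def result_key_args(key_args, *args)
  [key_args, args].flatten
end

# Hypothetical filler: replace each all-capitals segment with the next argument.
def fill_template(template, *args)
  values = args.dup
  template.split(':').map { |seg| seg.match?(/\A[A-Z]+\z/) ? values.shift.to_s : seg }.join(':')
end

args = result_key_args(['en'], 'weekly')
p args                                                    # prints ["en", "weekly"]
puts fill_template('something:ARG:something:VAR', *args)  # prints something:en:something:weekly
```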
#run(data_object, synchronous = false) ⇒ Object
    # File 'lib/mapredus/process.rb', line 140
    def run( data_object, synchronous = false )
      update(:synchronous => synchronous)
      Master.mapreduce( self, data_object )
      true
    end
#running? ⇒ Boolean
TODO: Should also have some notion of whether the process is complete, since the master might not be working while the process is not yet complete and therefore still running.
    # File 'lib/mapredus/process.rb', line 150
    def running?
      Master.working?(@pid)
    end
#save ⇒ Object
    # File 'lib/mapredus/process.rb', line 72
    def save
      FileSystem.sadd( ProcessInfo.processes, @pid )
      FileSystem.save( ProcessInfo.pid(@pid), to_json )
      self
    end
#to_hash ⇒ Object
    # File 'lib/mapredus/process.rb', line 61
    def to_hash
      (ATTRS + READERS).inject({}) do |h, attr|
        h[attr] = send(attr)
        h
      end
    end
#to_json ⇒ Object
    # File 'lib/mapredus/process.rb', line 68
    def to_json
      Helper.encode(to_hash)
    end
#to_s ⇒ Object
    # File 'lib/mapredus/process.rb', line 59
    def to_s; to_json; end
#update(attrs = {}) ⇒ Object
    # File 'lib/mapredus/process.rb', line 78
    def update(attrs = {})
      attrs.each do |attr, val|
        send("#{attr}=", val)
      end
      save
    end
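A minimal sketch of how `#update`, `#save` and `#to_hash` fit together, using a hypothetical `MiniProcess` with three attributes and a `saved_json` field in place of writing to Redis; it assumes the standard `json` library for encoding. As in `#update` above, each key is sent to its `attr=` writer and the process is saved once at the end.

```ruby
require 'json'

# Hypothetical cut-down process: three attributes plus the :pid reader.
class MiniProcess
  ATTRS   = [:mapper, :state, :synchronous].freeze
  READERS = [:pid].freeze

  attr_accessor(*ATTRS)
  attr_reader :pid, :saved_json

  def initialize(pid)
    @pid = pid
  end

  # Like to_hash: every ATTR and READER, read through its accessor.
  def to_hash
    (ATTRS + READERS).each_with_object({}) { |attr, h| h[attr] = send(attr) }
  end

  # Stand-in for save: keep the JSON string instead of writing it to Redis.
  def save
    @saved_json = JSON.generate(to_hash)
    self
  end

  # Like update: call each "attr=" writer, then persist once.
  def update(attrs = {})
    attrs.each { |attr, val| send("#{attr}=", val) }
    save
  end
end

process = MiniProcess.new(42)
process.update(state: 'mapping', synchronous: true)
puts process.saved_json
# prints {"mapper":null,"state":"mapping","synchronous":true,"pid":42}
```

In this sketch an unknown attribute, or the read-only `:pid`, has no writer, so `send` raises `NoMethodError`.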