26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
|
# File 'lib/mapredus/support.rb', line 26
def mapreduce_process( process_name, mapredus_process_class, result_store )
runner_self = Runner
class_name = self.to_s.gsub(/\W/,"_")
global_process_name = "#{class_name}_#{process_name.to_s}"
if runner_self.methods.include?(global_process_name)
raise DuplicateProcessDefintionError
end
mapredus_process_class.set_result_key( result_store )
runner_self.send( :define_method, global_process_name ) do |data, key_arguments|
process = mapredus_process_class.create
process.update(:key_args => key_arguments)
process.run(data)
process
end
runner_self.send( :define_method, "#{global_process_name}_result" ) do |key_arguments, *outputter_args|
key = mapredus_process_class.result_key( *key_arguments )
mapredus_process_class.outputter.decode( key, *outputter_args)
end
end
|