Class: SimpleMapReduce::Worker::RunMapTaskWorker
- Inherits: Object (hierarchy: Object → SimpleMapReduce::Worker::RunMapTaskWorker)
- Defined in:
- lib/simple_map_reduce/worker/run_map_task_worker.rb
Defined Under Namespace
Classes: InvalidMapTaskError
Instance Method Summary
Instance Method Details
#perform(job, map_worker) ⇒ Object
Source: lines 8–84 of lib/simple_map_reduce/worker/run_map_task_worker.rb
# File 'lib/simple_map_reduce/worker/run_map_task_worker.rb', line 8

# Runs one map task for +job+ on this worker.
#
# 1. Defines a throwaway wrapper class (TaskWrapper<job id without dashes>)
#    under this class and evaluates the job's map script inside it, so the
#    script's classes can be removed again in the ensure clause.
# 2. Downloads the job input object (via s3_client) into a tempfile and feeds
#    it to the map task's +map(line, output)+ one chomped line at a time.
# 3. Asks the job tracker to reserve workers (worker_size: 2), falls back to
#    +map_worker+ when none are reserved, and shuffles the map output to them.
# 4. Marks +map_worker+ ready again when it is not one of the shuffle targets.
#
# Any StandardError (including InvalidMapTaskError when the map class has no
# +map+ method) is logged and the job is marked failed instead of being
# propagated. Temp files and the wrapper class are cleaned up in all cases.
#
# @param job [Object] must respond to +id+, +map_script+, +map_class_name+,
#   +job_input_bucket_name+, +job_input_directory_path+ and +failed!+
# @param map_worker [Object] the worker running this task; must respond to
#   +id+ and +ready!+
def perform(job, map_worker)
  task_wrapper_class_name = "TaskWrapper#{job.id.delete('-')}"
  # SECURITY: job.map_script is arbitrary Ruby shipped with the job and is
  # executed with this process's privileges. Only accept trusted jobs.
  self.class.class_eval("class #{task_wrapper_class_name}; end", 'Task Wrapper Class')
  task_wrapper_class = self.class.const_get(task_wrapper_class_name)
  task_wrapper_class.class_eval(job.map_script, 'Map task script')
  map_task = task_wrapper_class.const_get(job.map_class_name, false).new
  raise InvalidMapTaskError, 'no map method' unless map_task.respond_to?(:map)

  logger.info('map task start')

  local_input_cache = Tempfile.new
  s3_client.get_object(
    response_target: local_input_cache.path,
    bucket: job.job_input_bucket_name,
    key: job.job_input_directory_path
  )
  local_input_cache.rewind

  local_output_cache = Tempfile.new
  # The separator must be positional: IO#each_line has no `rs:` keyword.
  local_input_cache.each_line("\n", chomp: true) do |line|
    map_task.map(line, local_output_cache)
  end

  local_output_cache.rewind
  logger.debug("output data size: #{local_output_cache.size}")
  logger.debug('---map output digest---')
  local_output_cache.take(5).each { |line| logger.debug(line) }
  logger.debug('---map output digest---')
  # take(5) advanced the read position; rewind so shuffle sees every line.
  local_output_cache.rewind

  response = http_client(SimpleMapReduce.job_tracker_url).post do |request|
    request.url('/workers/reserve')
    # TODO: providing a way to specify worker_size
    request.body = { worker_size: 2 }.to_json
  end
  logger.debug(response.body)

  # Expected response shape (only `reserved_workers` is read here):
  # {"succeeded":true,"reserved_workers":[{"id":70157882164440,"url":"http://localhost:4569","state":"reserved"}]}
  reserved_workers = JSON.parse(response.body, symbolize_names: true).fetch(:reserved_workers).map do |worker|
    SimpleMapReduce::Server::Worker.new(
      id: worker[:id],
      url: worker[:url],
      state: worker[:state].to_sym,
      data_store_type: 'remote'
    )
  end
  # No worker could be reserved: keep working with the same worker.
  reserved_workers << map_worker if reserved_workers.empty?

  shuffle(job, reserved_workers, local_output_cache)

  # Release map_worker only when it is not one of the shuffle targets.
  if reserved_workers.none? { |w| w.id == map_worker.id }
    begin
      map_worker.ready!
    rescue => notify_error
      logger.fatal(notify_error.inspect)
      logger.fatal(notify_error.backtrace.take(50))
    end
  end
rescue => e
  logger.error(e.inspect)
  logger.error(e.backtrace.take(50))
  job.failed!
  # TODO: notifying to job_tracker that this task have failed
ensure
  # close! closes the file descriptor and unlinks; delete/unlink alone leaks the fd.
  local_input_cache&.close!
  local_output_cache&.close!
  # The name is nil when building it raised (e.g. nil job.id); don't mask that error.
  if task_wrapper_class_name && self.class.const_defined?(task_wrapper_class_name.to_sym)
    self.class.send(:remove_const, task_wrapper_class_name.to_sym)
  end
  logger.info('map task end')
end