Class: PreprocessingWorker
- Inherits: ScbiMapreduce::Worker (Object → ScbiMapreduce::Worker → PreprocessingWorker)
- Defined in: lib/anncrsnp/preprocessing_worker.rb
Overview
PreprocessingWorker defines the behaviour of the workers; this is where the actual processing takes place.
Instance Method Summary
- #closing_worker ⇒ Object
  Called once, when the worker is about to be closed.
- #download_data(link, cols, header, format, temp) ⇒ Object
  Downloads link into the temp folder; only http links are currently handled.
- #extract_data(format, temp, folder) ⇒ Object
  Parses a downloaded file (currently only .gz files) with the parser selected for format.
- #get_gz(temp, parser) ⇒ Object
  Streams a gzip file line by line into parser.
- #get_http_data(url, temp) ⇒ Object
  Saves the resource at http://url to the file temp.
- #process_object(objs) ⇒ Object
  Called for each array of objects received from the manager.
- #receive_initial_config(parameters) ⇒ Object
  Called only once, just after the first connection, when the initial parameters are received from the manager.
- #starting_worker ⇒ Object
  Called once at initialization; use it to initialize your variables.
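The descriptions above imply a fixed call order over a worker's life. The sketch below is a hypothetical driver (`RecordingWorker` and `run_worker_lifecycle` are invented for illustration and are not part of scbi_mapreduce) that only shows that order, assuming initialization happens before the first connection: starting_worker once, receive_initial_config once, process_object once per received array, and closing_worker once at shutdown.

```ruby
# Hypothetical stand-in for a worker; it only records which hook ran.
class RecordingWorker
  attr_reader :calls

  def initialize
    @calls = []
  end

  def starting_worker
    @calls << :starting_worker
  end

  def receive_initial_config(parameters)
    @options = parameters
    @calls << :receive_initial_config
  end

  # The return value is what the manager would receive in work_received.
  def process_object(objs)
    @calls << :process_object
    objs.dup
  end

  def closing_worker
    @calls << :closing_worker
  end
end

# Hypothetical driver: calls the hooks in the order the documentation describes.
def run_worker_lifecycle(worker, config, chunks)
  worker.starting_worker
  worker.receive_initial_config(config)
  results = chunks.map { |chunk| worker.process_object(chunk) }
  worker.closing_worker
  results
end

worker = RecordingWorker.new
results = run_worker_lifecycle(worker, { temp: '/tmp' }, [[1, 2], [3]])
p worker.calls
# prints [:starting_worker, :receive_initial_config, :process_object, :process_object, :closing_worker]
```

The real manager decides when chunks arrive; this driver simply iterates a fixed list so the ordering is easy to see.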
Instance Method Details
#closing_worker ⇒ Object
Called once, when the worker is about to be closed.

```ruby
# File 'lib/anncrsnp/preprocessing_worker.rb', line 77
def closing_worker
end
```
#download_data(link, cols, header, format, temp) ⇒ Object
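Downloads link into the temp folder and returns the local file path. Only http links are handled: a file already present from a previous run is reused, an ftp link returns nil without downloading anything, and any other protocol logs a warning and returns nil.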
```ruby
# File 'lib/anncrsnp/preprocessing_worker.rb', line 87
# cols, header and format are accepted but not used here
def download_data(link, cols, header, format, temp)
  protocol, url = link.split('://')
  temp_file = nil
  if protocol == 'http'
    # Name the local copy after the last path segment of the URL
    temp_file = File.join(temp, url.split('/').last)
    if !File.exist?(temp_file)
      $WORKER_LOG.info "Downloading #{link}"
      get_http_data(url, temp_file)
    else
      $WORKER_LOG.info "Link was downloaded in a previous execution. Skipping download #{link}"
    end
  elsif protocol == 'ftp'
    # FTP download is not implemented; temp_file stays nil
  else
    $WORKER_LOG.info "WARNING: protocol: #{protocol} in link: #{link} is not supported"
  end
  return temp_file
end
```
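download_data splits the link on '://' and names the local file after the last path segment. The sketch below isolates just that path logic in a hypothetical helper, `temp_path_for` (not in the source), so it is easy to see which file a link maps to and that anything other than an http link yields nil, which callers must be prepared to handle. The example links are made up.

```ruby
# Hypothetical helper mirroring the path logic of download_data.
# Returns the local temp file path for an http link, or nil otherwise.
def temp_path_for(link, temp_dir)
  # Limit 2 keeps any later '://' inside the url part
  protocol, url = link.split('://', 2)
  return nil unless protocol == 'http' && url

  File.join(temp_dir, url.split('/').last)
end

p temp_path_for('http://example.org/data/snps.txt.gz', '/tmp/dbsnp')
# prints "/tmp/dbsnp/snps.txt.gz"
p temp_path_for('ftp://example.org/data/snps.txt.gz', '/tmp/dbsnp')
# prints nil
```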
#extract_data(format, temp, folder) ⇒ Object
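Parses the downloaded file temp with the parser class that FileParser.select returns for format; the parser is created with folder and @options[:index_size]. Only files whose name contains .gz are read at present, but the parser's buffered data is flushed with write_compressed_data in every case. The returned hash is currently always empty.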
```ruby
# File 'lib/anncrsnp/preprocessing_worker.rb', line 115
def extract_data(format, temp, folder)
  data = {}
  parser_class = FileParser.select(format)
  parser = parser_class.new(folder, @options[:index_size])
  $WORKER_LOG.info "Processing temporal file #{temp}"
  if temp.include?('.gz')
    #data = get_gz(temp, parser)
    get_gz(temp, parser)
  else
    # Files without .gz in their name are not parsed yet
  end
  parser.write_compressed_data # Write remaining buffered data
  $WORKER_LOG.info "End processing temporal file #{temp}"
  return data
end
```
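extract_data streams the file through the parser and then calls write_compressed_data to flush whatever is still buffered. FileParser's internals are not shown on this page, so the sketch below uses a hypothetical `BufferedParser` with an assumed `buffer_size` to illustrate why that final flush matters: lines that never fill a complete buffer would otherwise never be written.

```ruby
# Hypothetical parser: collects parsed lines and writes them out in batches.
# FileParser's real buffering is not documented here; this only illustrates
# why extract_data calls write_compressed_data after the last line.
class BufferedParser
  attr_reader :written

  def initialize(buffer_size)
    @buffer_size = buffer_size
    @buffer = []
    @written = [] # stands in for data written to disk
  end

  def parse(line)
    @buffer << line
    write_compressed_data if @buffer.size >= @buffer_size
  end

  # Flush the current buffer (here: move its lines into @written).
  def write_compressed_data
    @written.concat(@buffer)
    @buffer.clear
  end
end

parser = BufferedParser.new(2)
%w[a b c].each { |line| parser.parse(line) }
p parser.written # prints ["a", "b"] because "c" is still buffered
parser.write_compressed_data
p parser.written # prints ["a", "b", "c"]
```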
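#get_gz(temp, parser) ⇒ Object
Reads the gzip file temp line by line and passes each line, without its trailing newline, to parser.parse.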
```ruby
# File 'lib/anncrsnp/preprocessing_worker.rb', line 131
def get_gz(temp, parser)
  Zlib::GzipReader.open(temp) do |gz|
    gz.each do |line|
      parser.parse(line.chomp)
    end
  end
  #return parser.get_data
end
```
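get_gz never holds the whole decompressed file in memory: Zlib::GzipReader yields one line at a time. A self-contained sketch of the same streaming pattern, with a plain array standing in for the parser (the helper name `each_gz_line` and the sample data are hypothetical):

```ruby
require 'zlib'
require 'tmpdir'

# Yield each line of a gzip file without its trailing newline,
# the same streaming pattern get_gz uses with parser.parse.
def each_gz_line(path)
  Zlib::GzipReader.open(path) do |gz|
    gz.each_line { |line| yield line.chomp }
  end
end

lines = []
Dir.mktmpdir do |dir|
  path = File.join(dir, 'sample.txt.gz')
  Zlib::GzipWriter.open(path) { |gz| gz.write("chr1\t100\nchr2\t200\n") }
  each_gz_line(path) { |line| lines << line }
end
p lines # prints ["chr1\t100", "chr2\t200"]
```

One caveat worth checking for real input files: Zlib::GzipReader reads a single gzip member, so a file made of several concatenated members (as bgzip produces) may appear to end early; the Zlib documentation describes a loop based on `unused` for that case.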
#get_http_data(url, temp) ⇒ Object
Downloads http://url and saves it to the file temp.
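```ruby
# File 'lib/anncrsnp/preprocessing_worker.rb', line 105
require 'open-uri' # normally placed at the top of the file

def get_http_data(url, temp)
  File.open(temp, "wb") do |saved_file|
    # URI.open is required: open-uri's Kernel#open no longer opens URLs in Ruby 3.0+
    URI.open("http://#{url}", "rb") do |read_file|
      # Stream to disk instead of holding the whole body in memory
      IO.copy_stream(read_file, saved_file)
    end
  end
end
```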
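Because get_http_data writes straight to the final path, an interrupted download can leave a truncated file that download_data later treats as already downloaded, since it only checks File.exist?. One common remedy, sketched here as an assumption rather than the project's code, is to write to a sibling `.part` file and rename it only after the copy finishes. The helper `save_atomically` is hypothetical and accepts any readable IO, so it could wrap the stream that URI.open yields.

```ruby
require 'fileutils'
require 'stringio'
require 'tmpdir'

# Hypothetical helper: copy source_io to dest through a temporary ".part" file.
# The rename happens only after the copy succeeds, so dest never exists
# in a half-written state.
def save_atomically(source_io, dest)
  part = "#{dest}.part"
  File.open(part, 'wb') { |out| IO.copy_stream(source_io, out) }
  File.rename(part, dest)
rescue StandardError
  FileUtils.rm_f(part) # discard the partial file, then re-raise
  raise
end

saved = nil
Dir.mktmpdir do |dir|
  dest = File.join(dir, 'snps.txt.gz')
  save_atomically(StringIO.new('payload'), dest)
  saved = [File.read(dest), File.exist?("#{dest}.part")]
end
p saved # prints ["payload", false]
```

With open-uri, a call site could look like `URI.open("http://#{url}", 'rb') { |remote| save_atomically(remote, temp) }`.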
#process_object(objs) ⇒ Object
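process_object is called for each array of objects received from the manager. objs is always an array, so iterate over it when its elements must be processed independently. In this worker each element is a [link, feature, cols, header, format] array: the file behind link is downloaded into a per-feature folder under @options[:temp] and, unless @options[:downloaded_only] is set, parsed into a per-feature folder under @options[:preprocessed_data].
The value returned here is received by the work_received method of your worker_manager subclass; this worker always returns an empty array.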
```ruby
# File 'lib/anncrsnp/preprocessing_worker.rb', line 49
def process_object(objs)
  Benchmark.bm do |x|
    x.report('Prep') {
      FileParser.load
      objs.each do |link, feature, cols, header, format| # iterate over all objects received
        $WORKER_LOG.info "Processing link: #{feature}, #{format}, #{link}"
        ft_folder = File.join(@options[:preprocessed_data], feature)
        ft_temp_folder = File.join(@options[:temp], feature)
        temp_file = download_data(link, cols, header, format, ft_temp_folder)
        if !@options[:downloaded_only]
          # download_data returns nil for non-http links; File.exist?(nil) raises TypeError
          if temp_file && File.exist?(temp_file)
            extract_data(format, temp_file, ft_folder)
          else
            $WORKER_LOG.info "WARNING: Temporal file #{temp_file} have not been downloaded for feature #{feature} so it will be skipped"
          end
        else
          $WORKER_LOG.info "Download only mode, skipping processing temp files"
        end
      end
    }
  end
  # No results are sent back: the manager receives an empty array
  return []
end
```
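Each element of objs is itself an array, which the block in process_object destructures into link, feature, cols, header and format. The sketch below, built around the hypothetical helper `plan_jobs`, reproduces that destructuring and the per-feature folder layout derived from the options hash; the option values, link, cols, header and format shown are made up for illustration.

```ruby
# Hypothetical helper: reproduces how process_object unpacks each received
# object and derives per-feature folders from the options hash.
def plan_jobs(objs, options)
  objs.map do |link, feature, _cols, _header, format|
    {
      link: link,
      format: format,
      output_folder: File.join(options[:preprocessed_data], feature),
      temp_folder: File.join(options[:temp], feature)
    }
  end
end

options = { preprocessed_data: '/data/prep', temp: '/data/tmp' }
objs = [['http://example.org/snps.txt.gz', 'snp', [0, 1], true, 'tabular']]
jobs = plan_jobs(objs, options)
p jobs.first[:temp_folder] # prints "/data/tmp/snp"
```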
#receive_initial_config(parameters) ⇒ Object
receive_initial_config is called only once, just after the first connection, when the initial parameters are received from the manager.
```ruby
# File 'lib/anncrsnp/preprocessing_worker.rb', line 31
def receive_initial_config(parameters)
  @options = parameters # Reads the parameters

  # You can use worker logs at any time in this way:
  $WORKER_LOG.info "Params received"

  # save received parameters, if any
  # @params = parameters
end
```
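receive_initial_config stores the manager's parameters in @options without checking them, yet the other methods read @options[:preprocessed_data], @options[:temp], @options[:index_size] and @options[:downloaded_only]. A minimal sketch of failing early when a key is missing follows; the helper name `validate_options` and the choice of required keys are assumptions (downloaded_only is treated as optional because process_object only tests it for truthiness), and the example values are made up.

```ruby
# Keys read elsewhere in PreprocessingWorker; :downloaded_only is optional
# because process_object only checks it for truthiness.
REQUIRED_OPTION_KEYS = %i[preprocessed_data temp index_size].freeze

# Hypothetical helper: raise early instead of failing later inside File.join.
def validate_options(parameters)
  missing = REQUIRED_OPTION_KEYS.reject { |key| parameters.key?(key) }
  raise ArgumentError, "missing options: #{missing.join(', ')}" unless missing.empty?

  parameters
end

validate_options({ preprocessed_data: '/data/prep', temp: '/data/tmp', index_size: 1000 })
begin
  validate_options({ temp: '/data/tmp' })
rescue ArgumentError => e
  puts e.message # prints "missing options: preprocessed_data, index_size"
end
```

Inside the worker, `@options = validate_options(parameters)` would be one way to apply it.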
#starting_worker ⇒ Object
starting_worker is called once at initialization; use it to initialize your variables.
```ruby
# File 'lib/anncrsnp/preprocessing_worker.rb', line 20
def starting_worker
  # You can use worker logs at any time in this way:
  $WORKER_LOG.info "Starting a worker"
end
```