Class: PreprocessingManager
- Inherits: ScbiMapreduce::WorkManager
  - Object
  - ScbiMapreduce::WorkManager
  - PreprocessingManager
- Defined in: lib/anncrsnp/preprocessing_manager.rb
Overview
The PreprocessingManager class implements the methods that send data to workers and receive results from them.
Class Method Summary
- .end_work_manager ⇒ Object
  end_work_manager is executed at the end, when all processing is done.
- .init_work_manager(options) ⇒ Object
  init_work_manager is executed at the start, prior to any processing.
- .load_links(file_path) ⇒ Object
  Custom additional method. Reads the tab-separated feature links file and returns one entry per line.
Instance Method Summary
- #next_work ⇒ Object
  next_work is called every time a worker needs new work. Here you can read data from disk. This method must return the work data, or nil if no more data is available.
- #work_received(results) ⇒ Object
  work_received is executed each time a worker has finished a job.
- #worker_initial_config ⇒ Object
  worker_initial_config is used to send initial parameters to workers.
Class Method Details
.end_work_manager ⇒ Object
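end_work_manager is executed at the end, when all processing is done. You can use it to close files opened in init_work_manager. In this class it writes the unique feature names, one per line, to the active_data file in the preprocessed data folder.

    # File 'lib/anncrsnp/preprocessing_manager.rb', line 34
    def self.end_work_manager
      # Write each distinct feature name on its own line
      File.open(File.join(@@options[:preprocessed_data], 'active_data'), 'w') do |f|
        f.puts @@features.map { |feature| feature[1] }.uniq.join("\n")
      end
    end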
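As a minimal sketch of what end_work_manager leaves on disk, the snippet below writes the unique feature names from a small feature list to an active_data file in a temporary directory. The links and feature names are hypothetical, and the five-element entry layout is assumed to match what load_links returns.

```ruby
require 'tmpdir'

# Hypothetical entries: [link, feature name, columns, header, format]
features = [
  ['http://example.org/a.bed.gz', 'dnase', [0, 1, 2], 0, 'bed'],
  ['http://example.org/b.bed.gz', 'dnase', [0, 1, 2], 0, 'bed'],
  ['http://example.org/c.txt.gz', 'tfbs',  [1, 2, 3], 1, 'txt']
]

active_names = nil
Dir.mktmpdir do |preprocessed_data|
  path = File.join(preprocessed_data, 'active_data')
  # Same expression as end_work_manager: feature name is element 1 of each entry
  File.open(path, 'w') do |file|
    file.puts features.map { |feature| feature[1] }.uniq.join("\n")
  end
  # Read the file back to see one distinct name per line
  active_names = File.read(path).split("\n")
end
```

Two links that share a feature name produce a single line in the file, because uniq is applied before joining.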
.init_work_manager(options) ⇒ Object
init_work_manager is executed at the start, prior to any processing. You can use it to initialize shared state, open files, etc. Note that an instance of the manager is created for each worker connection, so any state shared between instances should be stored in class variables (prefixed with @@).

    # File 'lib/anncrsnp/preprocessing_manager.rb', line 16
    def self.init_work_manager(options)
      @@options = options
      $LOG.info 'Load genomic features links'
      @@features = load_links(options[:file])
      $LOG.info "Loaded #{@@features.length} genomic features links"
      # FEATURE DIRECTORIES
      @@features.each do |feature|
        ft_folder = File.join(@@options[:preprocessed_data], feature[1]) # feature name
        ft_temp_folder = File.join(@@options[:temp], feature[1])
        Dir.mkdir(ft_folder) unless Dir.exist?(ft_folder)
        Dir.mkdir(ft_temp_folder) unless Dir.exist?(ft_temp_folder)
      end
      @@processed_features = 0
    end
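The directory setup above can be sketched in isolation as follows. Only the option keys (:preprocessed_data, :temp) come from the method; the feature names and paths are hypothetical. Dir.mkdir does not create missing parent directories, so this sketch assumes the two base folders have to exist beforehand and creates them with FileUtils.mkdir_p.

```ruby
require 'tmpdir'
require 'fileutils'

feature_names = %w[dnase tfbs] # hypothetical feature names
folders = []

Dir.mktmpdir do |root|
  # Hypothetical options hash; only the keys mirror init_work_manager
  options = {
    preprocessed_data: File.join(root, 'preprocessed'),
    temp: File.join(root, 'tmp')
  }
  # Base folders must exist first, since Dir.mkdir creates one level only
  FileUtils.mkdir_p(options.values)

  feature_names.each do |name|
    [options[:preprocessed_data], options[:temp]].each do |base|
      folder = File.join(base, name)
      Dir.mkdir(folder) unless Dir.exist?(folder)
      folders << folder.sub("#{root}/", '') # record the path relative to root
    end
  end
end
```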
.load_links(file_path) ⇒ Object
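Custom additional method. Reads the tab-separated links file at file_path and returns an array with one entry per line: link, feature name, column list (as integers), header (as an integer) and format.

    # File 'lib/anncrsnp/preprocessing_manager.rb', line 77
    def self.load_links(file_path)
      features = []
      # File.foreach closes the file once every line has been read
      File.foreach(file_path) do |line|
        line.chomp!
        link, feature, cols, header, format = line.split("\t")
        features << [link, feature, cols.split(',').map { |col| col.to_i }, header.to_i, format]
      end
      return features
    end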
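To make the expected input format concrete, here is a small sketch that parses one record with the same logic. parse_links is a stand-alone copy written for illustration, not part of the class, and the record values are invented. Reading cols as column indexes and header as a count is inferred from the variable names.

```ruby
require 'tempfile'

# Stand-alone copy of the parsing logic, for illustration only
def parse_links(file_path)
  File.foreach(file_path).map do |line|
    link, feature, cols, header, format = line.chomp.split("\t")
    [link, feature, cols.split(',').map(&:to_i), header.to_i, format]
  end
end

parsed = nil
Tempfile.create('links') do |file|
  # Hypothetical record with five tab-separated fields
  file.puts ['http://example.org/data.bed.gz', 'dnase', '0,1,2', '1', 'bed'].join("\t")
  file.flush
  parsed = parse_links(file.path)
end
```

Each line must carry all five fields. A line with fewer fields leaves cols or header as nil, and the split or to_i call on it would then raise or silently yield 0.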
Instance Method Details
#next_work ⇒ Object
next_work is called every time a worker needs new work. Here you can read data from disk. This method must return the work data, or nil if no more data is available. In this class each unit of work is one entry of the feature list.

    # File 'lib/anncrsnp/preprocessing_manager.rb', line 47
    def next_work
      begin
        if @@processed_features >= @@features.length
          e = nil # worker signal disconnect
        else
          e = @@features[@@processed_features]
        end
        @@processed_features += 1
      rescue Exception => e
        puts e.message # 'message' is assumed; the call is truncated in the source
        puts e.backtrace
      end
      return e
    end
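The hand-out logic can be sketched without the framework. FeatureQueue below is a hypothetical stand-in, not part of ScbiMapreduce, and it uses instance variables instead of class variables to keep the example self-contained. It shows the contract described above: one feature per call, then nil.

```ruby
# Minimal stand-in for the manager's hand-out logic; not the ScbiMapreduce API
class FeatureQueue
  def initialize(features)
    @features = features
    @processed = 0
  end

  # Returns the next feature, or nil once every feature has been handed out
  def next_work
    work = @processed < @features.length ? @features[@processed] : nil
    @processed += 1
    work
  end
end

queue = FeatureQueue.new(%w[dnase tfbs]) # hypothetical feature names
handed_out = []
# Keep requesting work until the queue signals completion with nil
while (work = queue.next_work)
  handed_out << work
end
```

Keeping the work item in its own variable, rather than reusing the name bound by rescue, avoids returning an exception object as if it were work data.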
#work_received(results) ⇒ Object
work_received is executed each time a worker has finished a job. Here you can write results to disk, compute aggregate statistics, etc. In this class the method is left empty.

    # File 'lib/anncrsnp/preprocessing_manager.rb', line 68
    def work_received(results)
      # write_data_to_disk(results)
    end
#worker_initial_config ⇒ Object
worker_initial_config is used to send initial parameters to workers. The method is executed once per worker. Here it returns the options received by init_work_manager.

    # File 'lib/anncrsnp/preprocessing_manager.rb', line 40
    def worker_initial_config
      return @@options
    end