Class: PositionSelectionManager
- Inherits:
-
ScbiMapreduce::WorkManager
- Object
- ScbiMapreduce::WorkManager
- PositionSelectionManager
- Defined in:
- lib/anncrsnp/position_selection_manager.rb
Overview
MyWorkerManager class is used to implement the methods to send and receive the data to or from workers
Class Method Summary collapse
- .create_positions_sets_for_tensorflow(path_folder, scores, tags) ⇒ Object
-
.end_work_manager ⇒ Object
end_work_manager is executed at the end, when all the process is done.
- .get_aucs(tags, scores) ⇒ Object
-
.init_work_manager(options) ⇒ Object
init_work_manager is executed at the start, prior to any processing.
-
.load_selected_positions(file_path) ⇒ Object
CUSTOM ADDITIONAL METHODS.
- .write_set(set, path) ⇒ Object
Instance Method Summary collapse
-
#next_work ⇒ Object
next_work method is called every time a worker needs a new work Here you can read data from disk This method must return the work data or nil if no more data is available.
-
#work_received(results) ⇒ Object
work_received is executed each time a worker has finished a job.
-
#worker_initial_config ⇒ Object
worker_initial_config is used to send initial parameters to workers.
Class Method Details
.create_positions_sets_for_tensorflow(path_folder, scores, tags) ⇒ Object
168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 |
# File 'lib/anncrsnp/position_selection_manager.rb', line 168 def self.create_positions_sets_for_tensorflow(path_folder, scores, ) validation_set_proportion = 0.2 positions_number = .length validation_set_length = (positions_number * validation_set_proportion).to_i training_set_length = positions_number - validation_set_length validation_set_positions = [] # Set which positions will belong to validation set while validation_set_positions.length < validation_set_length position = rand(positions_number - 1) # We need random 0 based positions validation_set_positions << position if !validation_set_positions.include?(position) end .map!{|t| #tensorflow nedd positive integer as tags, we change tag used in AUC operation if t == -1 0 else t end } genomic_features = scores.keys training_set = [] validation_set = [] .each_with_index do |tag, n| record = [] # Create record position genomic_features.each do |gf| record << scores[gf][n] end record << tag if validation_set_positions.include?(n) # Send record to correspondant set validation_set << record else training_set << record end end tag_names = .uniq #TODO: improve to ensure exact correspondance training_set.unshift([training_set.length, genomic_features.length].concat(tag_names)) # set headers validation_set.unshift([validation_set.length, genomic_features.length].concat(tag_names)) # set headers write_set(training_set, File.join(path_folder, 'training_set.csv')) write_set(validation_set, File.join(path_folder, 'validation_set.csv')) end |
.end_work_manager ⇒ Object
end_work_manager is executed at the end, when all the process is done. You can use it to close files opened in init_work_manager
29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 |
# File 'lib/anncrsnp/position_selection_manager.rb', line 29 def self.end_work_manager positions_ids = [] scores = {} # Create genomic features table $LOG.info "Create general scores table" @@all_data.each do |data, positions_info| data_scores = [] positions_info.each do |chr, position_info| position_info.each do |position, score| data_scores << ["#{chr}_#{position.to_s}", score] end end data_scores.sort!{|sc1, sc2| sc1.first <=> sc2.first} scores[data] = data_scores.map{|sc| sc.last} positions_ids = data_scores.map{|sc| sc.first} if positions_ids.empty? end if !@@groups.empty? = positions_ids.map{|id| # Create vector tag group related to scores table tag = @@groups[id] if tag == 0 tag = -1 else tag = 1 end } if !@@options[:no_auc] $LOG.info "Calculating AUC for each genomic feature" aucs = get_aucs(, scores) # GEnerate area under curve by each genomic feature File.open(File.join(@@options[:selected_positions_folder], 'AUCs'), 'w'){ |f| aucs.each do |data_type, auc| f.puts "#{data_type}\t#{auc.join("\t")}" end } end $LOG.info "Creating training files for tensorflow" create_positions_sets_for_tensorflow(@@options[:selected_positions_folder], scores, ) end data_types = scores.keys File.open(File.join(@@options[:selected_positions_folder], 'all_data'), 'w'){ |f| #final genomic feature scores table for goldstandard f.puts ['position'].concat(data_types).join("\t") positions_ids.each_with_index do |id, i| record = [id] data_types.each do |dt| record << scores[dt][i] end f.puts record.join("\t") end } end |
.get_aucs(tags, scores) ⇒ Object
155 156 157 158 159 160 161 162 163 164 165 166 |
# File 'lib/anncrsnp/position_selection_manager.rb', line 155 def self.get_aucs(, scores) aucs = {} scores.each do | data_type, scores| matrix = [] scores.each_with_index do |score, i| matrix << [score, [i]] end pts = ROC.curve_points(matrix) aucs[data_type] = [ROC.auc(matrix), GChart.scatter(:data => [pts.collect { |x| x[0] }, pts.collect { |x| x[1] }]).to_url] end return aucs end |
.init_work_manager(options) ⇒ Object
init_work_manager is executed at the start, prior to any processing. You can use init_work_manager to initialize global variables, open files, etc… Note that an instance of MyWorkerManager will be created for each worker connection, and thus, all global variables here should be class variables (starting with @@)
18 19 20 21 22 23 24 25 |
# File 'lib/anncrsnp/position_selection_manager.rb', line 18 def self.init_work_manager() @@options = @@positions, @@groups = load_selected_positions(@@options[:selected_positions]) @@active_data = File.open(File.join(@@options[:preprocessed_data], 'active_data')).readlines.map {|item| item.chomp} @@used_data = 0 @@used_position = 0 @@all_data = {} end |
.load_selected_positions(file_path) ⇒ Object
CUSTOM ADDITIONAL METHODS
130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 |
# File 'lib/anncrsnp/position_selection_manager.rb', line 130 def self.load_selected_positions(file_path) selected_positions = {} groups = {} File.open(file_path).each do |line| line.chomp! chr, position, group = line.split("\t") record = position.to_i if !group.nil? group = group.to_i groups["#{chr}_#{position}"] = group end query = selected_positions[chr] if query.nil? selected_positions[chr] = [record] else query << record query.uniq! end end selected_positions.each do |chr, positions| positions.sort! end return selected_positions, groups end |
.write_set(set, path) ⇒ Object
207 208 209 210 211 212 213 |
# File 'lib/anncrsnp/position_selection_manager.rb', line 207 def self.write_set(set, path) File.open(path, 'w'){|f| set.each do |record| f.puts record.join(',') end } end |
Instance Method Details
#next_work ⇒ Object
next_work method is called every time a worker needs a new work Here you can read data from disk This method must return the work data or nil if no more data is available
89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 |
# File 'lib/anncrsnp/position_selection_manager.rb', line 89 def next_work begin if @@used_data >= @@active_data.length e = nil # worker signal disconect else chr = @@positions.keys[@@used_position] e = [@@active_data[@@used_data], chr, @@positions[chr]] @@used_position += 1 if @@used_position >= @@positions.length @@used_data +=1 @@used_position = 0 end end rescue Exception => e puts e. puts e.backtrace end return e end |
#work_received(results) ⇒ Object
work_received is executed each time a worker has finished a job. Here you can write results down to disk, perform some aggregated statistics, etc…
115 116 117 118 119 120 121 122 123 124 |
# File 'lib/anncrsnp/position_selection_manager.rb', line 115 def work_received(results) results.each do |data, positions_info| query = @@all_data[data] if query.nil? @@all_data[data] = positions_info else @@all_data[data] = query.merge(positions_info) end end end |
#worker_initial_config ⇒ Object
worker_initial_config is used to send initial parameters to workers. The method is executed once per each worker
82 83 84 |
# File 'lib/anncrsnp/position_selection_manager.rb', line 82 def worker_initial_config return @@options end |