Class: PositionSelectionManager

Inherits:
ScbiMapreduce::WorkManager
  • Object
show all
Defined in:
lib/anncrsnp/position_selection_manager.rb

Overview

PositionSelectionManager implements the methods used to send work data to the workers and to receive their results.

Class Method Summary collapse

Instance Method Summary collapse

Class Method Details

.create_positions_sets_for_tensorflow(path_folder, scores, tags) ⇒ Object



168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
# File 'lib/anncrsnp/position_selection_manager.rb', line 168

# Splits the scored positions into a training set and a validation set and
# writes both as CSV files for tensorflow.
#
# Each record holds one score per genomic feature (in the order of
# scores.keys) followed by the tag of the position. Tags equal to -1 are
# written as 0; any other tag is written unchanged. Each file starts with a
# header row: number of records, number of genomic features, distinct tags.
#
# @param path_folder [String] folder that receives 'training_set.csv' and
#   'validation_set.csv'
# @param scores [Hash] genomic feature => list of scores, aligned by index
#   with tags
# @param tags [Array] one tag per position; the array is left untouched
# @return [Object] whatever the last write_set call returns
def self.create_positions_sets_for_tensorflow(path_folder, scores, tags)
  validation_set_proportion = 0.2
  positions_number = tags.length
  validation_set_length = (positions_number * validation_set_proportion).to_i
  # Draw distinct 0-based positions from the WHOLE range. The previous
  # rand(positions_number - 1) could never pick the last position. The hash
  # gives constant-time membership checks in the loop below.
  validation_set_positions = {}
  (0...positions_number).to_a.sample(validation_set_length).each do |position|
    validation_set_positions[position] = true
  end
  # tensorflow needs non-negative integer tags, so the -1 tag used by the AUC
  # computation becomes 0. A copy is built so the caller's array is not changed.
  tensorflow_tags = tags.map { |tag| tag == -1 ? 0 : tag }
  genomic_features = scores.keys
  training_set = []
  validation_set = []
  tensorflow_tags.each_with_index do |tag, n|
    record = genomic_features.map { |gf| scores[gf][n] } # one score per feature
    record << tag
    # Send the record to the corresponding set
    target_set = validation_set_positions.key?(n) ? validation_set : training_set
    target_set << record
  end
  tag_names = tensorflow_tags.uniq # TODO: improve to ensure exact correspondence
  # Headers are built before unshift, so the counts exclude the header row
  training_set.unshift([training_set.length, genomic_features.length].concat(tag_names))
  validation_set.unshift([validation_set.length, genomic_features.length].concat(tag_names))
  write_set(training_set, File.join(path_folder, 'training_set.csv'))
  write_set(validation_set, File.join(path_folder, 'validation_set.csv'))
end

.end_work_manager ⇒ Object

end_work_manager is executed at the end, when all processing is done. You can use it to close files opened in init_work_manager.



29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
# File 'lib/anncrsnp/position_selection_manager.rb', line 29

# Runs once all work is done: assembles the general scores table from the
# data collected in @@all_data and writes the output files into
# @@options[:selected_positions_folder].
#
# Position ids have the form "chr_position" and are sorted as strings; the
# id list is taken from the first data type that yields any position.
# When groups were loaded, every position gets tag -1 if its group is 0 and
# tag 1 otherwise; the tags feed the 'AUCs' file (skipped when
# @@options[:no_auc] is set) and the tensorflow training files.
# The 'all_data' table (one row per position, one column per data type) is
# always written.
def self.end_work_manager
  $LOG.info "Create general scores table"
  scores = {} # data type => scores ordered by position id
  positions_ids = []
  @@all_data.each do |data, positions_info|
    id_score_pairs = positions_info.flat_map do |chr, position_info|
      position_info.map { |position, score| ["#{chr}_#{position}", score] }
    end
    id_score_pairs.sort! { |left, right| left.first <=> right.first }
    scores[data] = id_score_pairs.map(&:last)
    positions_ids = id_score_pairs.map(&:first) if positions_ids.empty?
  end

  unless @@groups.empty?
    # Tag vector aligned with the rows of the scores table
    tags = positions_ids.map { |id| @@groups[id] == 0 ? -1 : 1 }
    unless @@options[:no_auc]
      $LOG.info "Calculating AUC for each genomic feature"
      aucs = get_aucs(tags, scores) # area under curve for each genomic feature
      File.open(File.join(@@options[:selected_positions_folder], 'AUCs'), 'w') do |auc_file|
        aucs.each { |data_type, auc| auc_file.puts "#{data_type}\t#{auc.join("\t")}" }
      end
    end
    $LOG.info "Creating training files for tensorflow"
    create_positions_sets_for_tensorflow(@@options[:selected_positions_folder], scores, tags)
  end

  # Final genomic feature scores table
  data_types = scores.keys
  File.open(File.join(@@options[:selected_positions_folder], 'all_data'), 'w') do |table|
    table.puts ['position'].concat(data_types).join("\t")
    positions_ids.each_with_index do |id, row|
      table.puts [id, *data_types.map { |dt| scores[dt][row] }].join("\t")
    end
  end
end

.get_aucs(tags, scores) ⇒ Object



155
156
157
158
159
160
161
162
163
164
165
166
# File 'lib/anncrsnp/position_selection_manager.rb', line 155

# Computes, for every data type, the AUC and a chart URL for its ROC curve.
#
# @param tags [Array] one tag per position, aligned by index with each list
#   of scores
# @param scores [Hash] data type => list of scores
# @return [Hash] data type => [auc, chart_url]
def self.get_aucs(tags, scores)
  scores.each_with_object({}) do |(data_type, feature_scores), aucs|
    # Pair every score with the tag of the same position
    matrix = feature_scores.each_with_index.map { |score, i| [score, tags[i]] }
    curve = ROC.curve_points(matrix)
    xs = curve.map { |point| point[0] }
    ys = curve.map { |point| point[1] }
    aucs[data_type] = [ROC.auc(matrix), GChart.scatter(:data => [xs, ys]).to_url]
  end
end

.init_work_manager(options) ⇒ Object

init_work_manager is executed at the start, prior to any processing. You can use init_work_manager to initialize global variables, open files, etc. Note that an instance of PositionSelectionManager will be created for each worker connection; thus, all global variables here should be class variables (starting with @@).



18
19
20
21
22
23
24
25
# File 'lib/anncrsnp/position_selection_manager.rb', line 18

# Initializes the shared manager state before any work is handed out.
#
# Stores the options, loads the selected positions (and their groups) from
# options[:selected_positions], reads the list of active data names from the
# 'active_data' file inside options[:preprocessed_data] (one name per line)
# and resets the work cursors and the results table.
#
# @param options [Hash] run options; reads :selected_positions and
#   :preprocessed_data
def self.init_work_manager(options)
  @@options = options
  @@positions, @@groups = load_selected_positions(@@options[:selected_positions])
  active_data_path = File.join(@@options[:preprocessed_data], 'active_data')
  # File.readlines closes the file after reading; the previous
  # File.open(...).readlines left the handle open.
  @@active_data = File.readlines(active_data_path).map(&:chomp)
  @@used_data = 0     # index of the active data currently being dispatched
  @@used_position = 0 # index of the next chromosome to dispatch for that data
  @@all_data = {}     # results gathered from the workers
end

.load_selected_positions(file_path) ⇒ Object

CUSTOM ADDITIONAL METHODS



130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
# File 'lib/anncrsnp/position_selection_manager.rb', line 130

# Loads the selected positions file: one tab-separated record per line with
# chromosome, position and an optional group column. Blank lines are ignored.
#
# @param file_path [String] path to the selected positions file
# @return [Array(Hash, Hash)] two hashes:
#   - chromosome => sorted list of unique integer positions
#   - "chr_position" => integer group, only for lines that carry a group
def self.load_selected_positions(file_path)
  selected_positions = {}
  groups = {}
  # File.foreach closes the file when the iteration ends
  File.foreach(file_path) do |raw_line|
    line = raw_line.chomp
    # A blank line would otherwise register a nil chromosome with position 0
    next if line.empty?
    chr, position, group = line.split("\t")
    groups["#{chr}_#{position}"] = group.to_i unless group.nil?
    (selected_positions[chr] ||= []) << position.to_i
  end
  # Deduplicate once per chromosome instead of after every appended line
  selected_positions.each_value do |positions|
    positions.uniq!
    positions.sort!
  end
  [selected_positions, groups]
end

.write_set(set, path) ⇒ Object



207
208
209
210
211
212
213
# File 'lib/anncrsnp/position_selection_manager.rb', line 207

# Writes a set of records to a file, one comma-separated record per line.
#
# @param set [Array<Array>] records to write
# @param path [String] destination file (overwritten if it exists)
def self.write_set(set, path)
  File.open(path, 'w') do |csv|
    set.each { |row| csv.puts row.join(',') }
  end
end

Instance Method Details

#next_work ⇒ Object

The next_work method is called every time a worker needs new work. Here you can read data from disk. This method must return the work data, or nil if no more data is available.



89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
# File 'lib/anncrsnp/position_selection_manager.rb', line 89

# Hands out the next unit of work: [active data name, chromosome, positions
# of that chromosome]. Chromosomes are walked in order for each active data;
# when the last chromosome has been dispatched the cursor moves on to the
# next active data.
#
# @return [Array, nil] the work unit, or nil when every active data has been
#   dispatched (worker disconnect signal) or when building the unit failed
def next_work
  work = nil # nil is the worker disconnect signal
  begin
    if @@used_data < @@active_data.length
      chr = @@positions.keys[@@used_position]
      work = [@@active_data[@@used_data], chr, @@positions[chr]]
      @@used_position += 1
      if @@used_position >= @@positions.length
        @@used_data += 1
        @@used_position = 0
      end
    end
  rescue StandardError => error
    # The error gets its own variable so the exception object is never
    # returned as if it were a work unit. Rescuing StandardError (not
    # Exception) lets interrupts and exit requests propagate.
    puts error.message
    puts error.backtrace
    work = nil
  end
  work
end

#work_received(results) ⇒ Object

work_received is executed each time a worker has finished a job. Here you can write results down to disk, perform some aggregated statistics, etc…



115
116
117
118
119
120
121
122
123
124
# File 'lib/anncrsnp/position_selection_manager.rb', line 115

# Stores the results sent back by a worker in @@all_data.
#
# The first results for a data type are stored as received; later results
# for the same data type are merged into what is already stored, with the
# newly received entries winning on duplicate keys.
#
# @param results [Hash] data type => positions info
def work_received(results)
  results.each do |data, positions_info|
    stored = @@all_data[data]
    @@all_data[data] = stored.nil? ? positions_info : stored.merge(positions_info)
  end
end

#worker_initial_config ⇒ Object

worker_initial_config is used to send initial parameters to the workers. The method is executed once per worker.



82
83
84
# File 'lib/anncrsnp/position_selection_manager.rb', line 82

# Initial configuration sent to a worker: the options stored by
# init_work_manager.
def worker_initial_config
  @@options
end