Class: PositionSelectionWorker
- Inherits: ScbiMapreduce::Worker
- Inheritance chain: Object → ScbiMapreduce::Worker → PositionSelectionWorker
- Defined in: lib/anncrsnp/position_selection_worker.rb
Overview
PositionSelectionWorker defines the behaviour of the workers. This is where the real processing takes place.
Instance Method Summary
- #closing_worker ⇒ Object
  Called once, when the worker is about to be closed.
- #get_info_to_search(objs) ⇒ Object
  Custom worker method. Groups the requested positions by chromosome and index pack.
- #get_scores(chr_data, positions) ⇒ Object
  Custom worker method. Pairs each requested position with its score, or with 0 when no score is stored.
- #process_object(objs) ⇒ Object
  Called for each received object.
- #receive_initial_config(parameters) ⇒ Object
  Called only once, just after the first connection, when the initial parameters are received from the manager.
- #starting_worker ⇒ Object
  Called once at initialization; use it to initialize your variables.
Instance Method Details
#closing_worker ⇒ Object
Called once, when the worker is about to be closed.
# File 'lib/anncrsnp/position_selection_worker.rb', line 77
def closing_worker
end
#get_info_to_search(objs) ⇒ Object
Custom worker method. Groups the requested positions by chromosome and by index pack, and collects the distinct data sets that must be loaded.
# File 'lib/anncrsnp/position_selection_worker.rb', line 85
def get_info_to_search(objs)
  packs = {}
  datas = []
  objs.each do |data, chr, positions|
    # Analyse which chromosomes and packs must be loaded
    datas << data unless datas.include?(data)
    positions.each do |position|
      # Round the position down to the start coordinate of its pack
      pack = (position / @options[:index_size]) * @options[:index_size]
      packs[chr] ||= {}
      (packs[chr][pack] ||= []) << position
    end
  end
  return packs, datas
end
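
A minimal sketch of the pack arithmetic above, run outside the worker. The `group_by_pack` helper, the data set name and the index size of 100 are illustrative assumptions, not part of the class. Integer division rounds each position down to the start of its pack.

```ruby
# Hypothetical stand-alone version of the grouping done in get_info_to_search.
# index_size is passed explicitly instead of being read from @options.
def group_by_pack(objs, index_size)
  packs = {}
  datas = []
  objs.each do |data, chr, positions|
    datas << data unless datas.include?(data)
    positions.each do |position|
      pack = (position / index_size) * index_size
      packs[chr] ||= {}
      (packs[chr][pack] ||= []) << position
    end
  end
  [packs, datas]
end

# Illustrative input: [data set, chromosome, positions] triples.
objs = [
  ['example_scores', 'chr1', [5, 120, 199, 250]],
  ['example_scores', 'chr2', [42]]
]
packs, datas = group_by_pack(objs, 100)
puts packs.inspect
puts datas.inspect
```

With an index size of 100, positions 120 and 199 both land in pack 100, so one pack file serves both lookups.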
#get_scores(chr_data, positions) ⇒ Object
# File 'lib/anncrsnp/position_selection_worker.rb', line 110
def get_scores(chr_data, positions)
  positions_scores = []
  # Positions outside the stored coordinate range score 0
  lower_limit = chr_data.first.first
  upper_limit = chr_data.last.first
  positions_scores.concat(positions.select { |pos| pos < lower_limit }.map { |pos| [pos, 0] }) # At the beginning
  filtered_positions = positions.select { |pos| pos >= lower_limit && pos <= upper_limit }
  # Single pass over chr_data; positions and chr_data are expected in ascending order
  current_position = filtered_positions.shift # nil when nothing is left to score
  chr_data.each do |coord, score|
    break if current_position.nil?
    # Positions skipped by this coordinate fall in a gap and score 0
    while !current_position.nil? && coord > current_position
      positions_scores << [current_position, 0]
      current_position = filtered_positions.shift
    end
    if coord == current_position
      positions_scores << [current_position, score]
      current_position = filtered_positions.shift
    end
  end
  positions_scores.concat(positions.select { |pos| pos > upper_limit }.map { |pos| [pos, 0] }) # At the end
  return positions_scores
end
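
To illustrate the result this method is meant to produce, the sketch below computes the same kind of pairs with a hash lookup instead of the single ordered pass. `scores_by_lookup` and the sample data are hypothetical. It assumes `chr_data` is an array of `[coordinate, score]` pairs, as parsed from the pack files.

```ruby
# Hypothetical reference version: look each position up in a hash and
# fall back to 0 when the coordinate has no stored score.
def scores_by_lookup(chr_data, positions)
  score_of = chr_data.to_h
  positions.map { |pos| [pos, score_of.fetch(pos, 0)] }
end

chr_data  = [[100, 0.5], [101, 0.7], [105, 0.2]] # stored coordinates, with a gap at 102..104
positions = [98, 100, 103, 105, 110]             # below range, hit, gap, hit, above range

result = scores_by_lookup(chr_data, positions)
puts result.inspect
```

Positions 98 and 110 lie outside the stored range and 103 falls in a gap, so all three score 0. The ordered pass in the method avoids building a hash for every pack, at the cost of requiring sorted input.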
#process_object(objs) ⇒ Object
process_object is called for each received object. Be aware that objs is always an array; iterate over it if you need to process its elements independently.
The value returned here will be received by the work_received method of your worker_manager subclass.
# File 'lib/anncrsnp/position_selection_worker.rb', line 45
# Relies on the 'benchmark', 'zlib' and 'json' libraries being loaded.
def process_object(objs)
  all_data = nil
  Benchmark.bm do |x|
    x.report('PosS') {
      packs, datas = get_info_to_search(objs)
      all_data = {}
      datas.each do |data|
        selected_scores = {}
        packs.each do |chr, ps|
          scores = []
          ps.each do |pack, positions|
            info_path = File.join(@options[:preprocessed_data], data, "#{chr}_#{pack}.gz")
            # Packs without a preprocessed file contribute no scores
            if File.exist?(info_path)
              chr_data = []
              Zlib::GzipReader.open(info_path) { |gz| chr_data = JSON.parse(gz.read) }
              scores.concat(get_scores(chr_data, positions))
            end
          end
          selected_scores[chr] = scores
        end
        all_data[data] = selected_scores
      end
    }
  end
  # Return the selected scores back to the manager
  return all_data
end
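
The pack files read above are gzip-compressed JSON named `<chr>_<pack>.gz` inside a per-data-set directory. The following sketch writes one such file to a temporary directory and reads it back the same way. The data set name, chromosome, pack number and file contents are made up for illustration, and the `[coordinate, score]` layout is an assumption inferred from how get_scores consumes `chr_data`.

```ruby
require 'fileutils'
require 'json'
require 'tmpdir'
require 'zlib'

loaded = nil
info_path = nil
Dir.mktmpdir do |preprocessed_data|
  # Hypothetical data set, chromosome and pack start coordinate.
  data = 'example_scores'
  chr = 'chr1'
  pack = 100
  dir = File.join(preprocessed_data, data)
  FileUtils.mkdir_p(dir)
  info_path = File.join(dir, "#{chr}_#{pack}.gz")

  # Write a pack: a JSON array of [coordinate, score] pairs, gzip-compressed.
  Zlib::GzipWriter.open(info_path) do |gz|
    gz.write(JSON.generate([[100, 0.5], [101, 0.7]]))
  end

  # Read it back with the same calls process_object uses.
  Zlib::GzipReader.open(info_path) { |gz| loaded = JSON.parse(gz.read) }
end

puts File.basename(info_path)
puts loaded.inspect
```

The value returned by process_object nests these results as data set, then chromosome, then a list of `[position, score]` pairs.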
#receive_initial_config(parameters) ⇒ Object
receive_initial_config is called only once, just after the first connection, when the initial parameters are received from the manager.
# File 'lib/anncrsnp/position_selection_worker.rb', line 27
def receive_initial_config(parameters)
  @options = parameters
  # Reads the parameters
  # You can use worker logs at any time in this way:
  $WORKER_LOG.info "Params received"
  # save received parameters, if any
  # @params = parameters
end
#starting_worker ⇒ Object
starting_worker is called once at initialization and allows you to initialize your variables.
# File 'lib/anncrsnp/position_selection_worker.rb', line 16
def starting_worker
  # You can use worker logs at any time in this way:
  $WORKER_LOG.info "Starting a worker"
end