Class: PositionSelectionWorker

Inherits:
ScbiMapreduce::Worker
  • Object
show all
Defined in:
lib/anncrsnp/position_selection_worker.rb

Overview

PositionSelectionWorker defines the behaviour of the workers: this is where the real processing takes place.

Instance Method Summary collapse

Instance Method Details

#closing_worker ⇒ Object

called once, when the worker is about to be closed



# File 'lib/anncrsnp/position_selection_worker.rb', lines 77-79

# Hook invoked once, just before the worker is closed.
# This worker has no shutdown work to do, so the hook is a no-op (returns nil).
def closing_worker; end

#get_info_to_search(objs) ⇒ Object

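WORKER CUSTOM METHODS. Groups the requested positions by chromosome and by index pack (each position rounded down to a multiple of :index_size), and collects the distinct data sources that must be loaded.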



# File 'lib/anncrsnp/position_selection_worker.rb', lines 85-108

# Works out which chromosome packs and data sources must be loaded.
#
# objs - Array of [data, chr, positions] triples.
#
# Returns a two-element Array [packs, datas]:
#   packs - Hash chr => { pack_start => [positions] }. pack_start is the
#           position rounded down to a multiple of @options[:index_size].
#           A chromosome only appears once at least one position was seen for it.
#   datas - each distinct data source, once, in first-seen order.
def get_info_to_search(objs)
  grouped = {}
  sources = []
  objs.each do |data, chr, positions|
    sources.push(data) unless sources.include?(data)
    positions.each do |position|
      index_size = @options[:index_size]
      pack_start = (position / index_size) * index_size
      chr_packs = (grouped[chr] ||= {})
      (chr_packs[pack_start] ||= []) << position
    end
  end
  [grouped, sources]
end

#get_scores(chr_data, positions) ⇒ Object



# File 'lib/anncrsnp/position_selection_worker.rb', lines 110-139

# Looks up the score of each requested position in one pack of data.
#
# chr_data  - Array of [coord, score, ...] entries. Assumed sorted by ascending
#             coord, as the original merge also assumed (TODO confirm against
#             the preprocessing step).
# positions - Array of positions to score. Sorting and uniqueness are not
#             required.
#
# Returns an Array of [position, score] pairs. A position without an exactly
# matching coord gets score 0, whether it lies before the first coord, after
# the last coord, or inside a gap. Pairs are ordered as: positions below the
# covered range, then positions inside it, then positions above it, each
# group in input order.
def get_scores(chr_data, positions)
  # A pack with no entries covers nothing, so every position scores 0.
  return positions.map { |pos| [pos, 0] } if chr_data.nil? || chr_data.empty?

  lower_limit = chr_data.first.first
  upper_limit = chr_data.last.first
  below  = positions.select { |pos| pos < lower_limit }
  inside = positions.select { |pos| pos >= lower_limit && pos <= upper_limit }
  above  = positions.select { |pos| pos > upper_limit }

  positions_scores = below.map { |pos| [pos, 0] }
  inside.each do |pos|
    # Find the first entry whose coord is >= pos. It is a hit only on an exact
    # coord match; otherwise pos falls in a gap of the data. With duplicate
    # coords, the first entry wins, as in the original merge.
    entry = chr_data.bsearch { |coord, _score| coord >= pos }
    score = (entry && entry.first == pos) ? entry[1] : 0
    positions_scores << [pos, score]
  end
  positions_scores.concat(above.map { |pos| [pos, 0] })
end

#process_object(objs) ⇒ Object

The process_object method is called for each received object. Be aware that objs is always an array; iterate over it if you need to process its elements independently.

The value returned here will be received by the work_received method at your worker_manager subclass.



# File 'lib/anncrsnp/position_selection_worker.rb', lines 45-74

# Scores every requested position for every requested data source.
#
# objs - Array of [data, chr, positions] triples. It is always an Array, even
#        when a single object is received.
#
# Returns a Hash data => { chr => [[position, score], ...] }. The manager
# receives this value in work_received.
#
# For each data source and chromosome, every pack file
# "<@options[:preprocessed_data]>/<data>/<chr>_<pack>.gz" is gunzipped,
# parsed as JSON, and queried with get_scores.
def process_object(objs)
  all_data = nil
  # Benchmark.bm prints a 'PosS' timing line to stdout on every call.
  Benchmark.bm do |x|
    x.report('PosS') do
      packs, datas = get_info_to_search(objs)
      all_data = {}
      datas.each do |data|
        selected_scores = {}
        packs.each do |chr, ps|
          scores = []
          ps.each do |pack, positions|
            info_path = File.join(@options[:preprocessed_data], data, "#{chr}_#{pack}.gz")
            # File.exists? was removed in Ruby 3.2; File.exist? works on every version.
            # NOTE(review): positions whose pack file is missing are left out of
            # the result rather than scored 0. Confirm the manager expects this.
            next unless File.exist?(info_path)

            chr_data = Zlib::GzipReader.open(info_path) { |gz| JSON.parse(gz.read) }
            scores.concat(get_scores(chr_data, positions))
          end
          selected_scores[chr] = scores
        end
        all_data[data] = selected_scores
      end
    end
  end
  all_data
end

#receive_initial_config(parameters) ⇒ Object

receive_initial_config is called only once just after the first connection, when initial parameters are received from manager



# File 'lib/anncrsnp/position_selection_worker.rb', lines 27-36

# Stores the parameters the manager sends right after the first connection.
# Called only once.
#
# parameters - options Hash kept in @options. The other worker methods read
#              keys such as :index_size and :preprocessed_data from it.
# Returns the result of the log call.
def receive_initial_config(parameters)
  @options = parameters
  $WORKER_LOG.info "Params received"
end

#starting_worker ⇒ Object

starting_worker method is called one time at initialization and allows you to initialize your variables



# File 'lib/anncrsnp/position_selection_worker.rb', lines 16-21

# Initialization hook, invoked once when the worker starts.
# It only records a start-up message in the worker log.
def starting_worker
  $WORKER_LOG.info("Starting a worker")
end