Class: PreprocessingWorker

Inherits:
ScbiMapreduce::Worker
  • Object
show all
Defined in:
lib/anncrsnp/preprocessing_worker.rb

Overview

PreprocessingWorker defines the behaviour of the preprocessing workers. This is where the real processing takes place.

Instance Method Summary collapse

Instance Method Details

#closing_workerObject

called once, when the worker is about to be closed



77
78
79
# File 'lib/anncrsnp/preprocessing_worker.rb', line 77

# Hook invoked once, right before the worker is closed.
# This worker has no teardown work, so the hook is a deliberate no-op (returns nil).
def closing_worker; end

#download_data(link, cols, header, format, temp) ⇒ Object

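Downloads the file behind a link into the temporary folder, according to the link's protocol (only HTTP is currently implemented).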




87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
# File 'lib/anncrsnp/preprocessing_worker.rb', line 87

# Downloads the data behind +link+ into the +temp+ folder.
#
# The link is split on '://' into a protocol and a location. Only 'http'
# links are downloaded (through get_http_data). A file already present in
# +temp+ from a previous execution is reused instead of being downloaded
# again. Any other protocol is logged as unsupported, including 'ftp', which
# is not implemented yet.
#
# @param link [String] source URL, e.g. "http://host/path/file.gz"
# @param cols [Object] unused here; kept for the caller's interface
# @param header [Object] unused here; kept for the caller's interface
# @param format [Object] unused here; kept for the caller's interface
# @param temp [String] folder where the downloaded file is stored
# @return [String, nil] path of the local copy, or nil when the protocol is
#   not supported (callers must be prepared for nil)
def download_data(link, cols, header, format, temp)
  protocol, url = link.split('://')
  unless protocol == 'http'
    # 'ftp' used to fall through silently; every unsupported protocol now
    # leaves a trace in the log before returning nil.
    $WORKER_LOG.info "WARNING: protocol: #{protocol} in link: #{link} is not supported"
    return nil
  end

  temp_file = File.join(temp, url.split('/').last)
  if File.exist?(temp_file)
    $WORKER_LOG.info "Link was downloaded in a previous execution. Skipping download #{link}"
  else
    # Log before downloading so that a failing download can be traced to its link.
    $WORKER_LOG.info "Downloading #{link}"
    get_http_data(url, temp_file)
  end
  temp_file
end

#extract_data(format, temp, folder) ⇒ Object

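Parses a downloaded temporary file with the parser selected for its format, decompressing it on the fly when its name contains '.gz'.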




115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
# File 'lib/anncrsnp/preprocessing_worker.rb', line 115

# Parses the downloaded file +temp+ into +folder+ using the parser class that
# FileParser.select returns for +format+.
#
# Files whose name contains '.gz' are streamed line by line through get_gz.
# No other file type is handled yet. Such files are now reported with a
# warning instead of being ignored silently. Buffered parser output is
# always flushed with write_compressed_data at the end.
#
# @param format [String] key used to select the parser via FileParser.select
# @param temp [String] path of the downloaded file
# @param folder [String] output folder handed to the parser
# @return [Hash] always an empty hash; parsed data is not collected here
def extract_data(format, temp, folder)
  parser = FileParser.select(format).new(folder, @options[:index_size])
  $WORKER_LOG.info "Processing temporal file #{temp}"
  if temp.include?('.gz')
    get_gz(temp, parser)
  else
    $WORKER_LOG.info "WARNING: #{temp} is not a .gz file; only gzip-compressed files are parsed, so its content is skipped"
  end
  parser.write_compressed_data # Write remaining buffered data
  $WORKER_LOG.info "End processing temporal file #{temp}"
  {}
end

#get_gz(temp, parser) ⇒ Object



131
132
133
134
135
136
137
138
# File 'lib/anncrsnp/preprocessing_worker.rb', line 131

# Streams the gzip file at +temp+ and hands every decompressed line, with its
# trailing newline removed, to +parser+.parse.
#
# NOTE(review): Zlib::GzipReader reads a single gzip member. If inputs can be
# multi-member files (e.g. bgzip output), later members would be ignored;
# confirm the downloaded sources are plain single-member gzip.
def get_gz(temp, parser)
  Zlib::GzipReader.open(temp) do |gzip|
    gzip.each_line { |raw_line| parser.parse(raw_line.chomp) }
  end
end

#get_http_data(url, temp) ⇒ Object



105
106
107
108
109
110
111
# File 'lib/anncrsnp/preprocessing_worker.rb', line 105

# Downloads "http://<url>" into the local file +temp+.
#
# The response is streamed into "<temp>.part" and renamed to +temp+ only once
# the transfer has completed. This matters because download_data treats an
# existing +temp+ as a finished download from a previous execution, so a
# failed transfer must never leave a file at that path. On any error the
# partial file is removed and the exception is re-raised.
#
# URI.open (open-uri, Ruby >= 2.5) is used when available, because since
# Ruby 3.0 Kernel#open no longer opens URLs. Older Rubies fall back to
# Kernel#open, which open-uri patches there.
#
# @param url [String] location without the scheme, e.g. "host/path/file.gz"
# @param temp [String] destination path
# @raise re-raises any StandardError from the transfer, after cleanup
def get_http_data(url, temp)
  partial = "#{temp}.part"
  require 'open-uri' # no-op when the file already loaded it
  opener = URI.respond_to?(:open) ? URI.method(:open) : method(:open)
  opener.call("http://#{url}", 'rb') do |remote|
    # copy_stream writes in chunks instead of loading the whole body into memory
    File.open(partial, 'wb') { |local| IO.copy_stream(remote, local) }
  end
  File.rename(partial, temp)
rescue StandardError
  File.delete(partial) if partial && File.exist?(partial)
  raise
end

#process_object(objs) ⇒ Object

The process_object method is called for each received object. Note that objs is always an array; iterate over it if its elements must be processed independently.

The value returned here will be received by the work_received method at your worker_manager subclass.



49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
# File 'lib/anncrsnp/preprocessing_worker.rb', line 49

# Handles one batch of work received from the manager, timing it with
# Benchmark under the label 'Prep'.
#
# Each element of +objs+ is destructured as [link, feature, cols, header, format].
# The linked file is downloaded into "<@options[:temp]>/<feature>". Unless
# @options[:downloaded_only] is set, the file is then parsed into
# "<@options[:preprocessed_data]>/<feature>".
#
# download_data returns nil for unsupported protocols. That case is now
# treated like a missing file (warn and skip). Previously File.exist?(nil)
# raised TypeError and aborted the whole batch.
#
# @param objs [Array<Array>] work units sent by the manager
# @return [Array] always empty; it is what the manager's work_received receives
def process_object(objs)
  Benchmark.bm do |x|
    x.report('Prep') do
      FileParser.load
      objs.each do |link, feature, cols, header, format| # iterate over all objects received
        $WORKER_LOG.info "Processing link: #{feature}, #{format}, #{link}"
        ft_folder = File.join(@options[:preprocessed_data], feature)
        ft_temp_folder = File.join(@options[:temp], feature)
        temp_file = download_data(link, cols, header, format, ft_temp_folder)
        if @options[:downloaded_only]
          $WORKER_LOG.info "Download only mode, skipping processing temp files"
        elsif temp_file && File.exist?(temp_file)
          extract_data(format, temp_file, ft_folder)
        else
          $WORKER_LOG.info "WARNING: Temporal file #{temp_file} have not been downloaded for feature #{feature} so it will be skipped"
        end
      end
    end
  end
  # return objs back to manager
  []
end

#receive_initial_config(parameters) ⇒ Object

receive_initial_config is called only once just after the first connection, when initial parameters are received from manager



31
32
33
34
35
36
37
38
39
40
# File 'lib/anncrsnp/preprocessing_worker.rb', line 31

# Hook invoked only once, right after the first connection, with the initial
# parameters sent by the manager.
#
# The parameters are stored unchanged in @options, which the rest of the
# worker reads (e.g. :temp, :preprocessed_data, :downloaded_only, :index_size).
def receive_initial_config(parameters)
  @options = parameters
  $WORKER_LOG.info("Params received")
end

#starting_workerObject

starting_worker method is called one time at initialization and allows you to initialize your variables



20
21
22
23
24
25
# File 'lib/anncrsnp/preprocessing_worker.rb', line 20

# Hook invoked once at worker initialization; this worker only logs its start.
def starting_worker
  $WORKER_LOG.info("Starting a worker")
end