Class: PreprocessingManager

Inherits:
ScbiMapreduce::WorkManager
  • Object
Defined in:
lib/anncrsnp/preprocessing_manager.rb

Overview

PreprocessingManager implements the methods used to send data to workers and to receive results from them.


Class Method Details

.end_work_manager ⇒ Object

end_work_manager is executed at the end, once all processing is done. Use it to close files opened in init_work_manager.



# File 'lib/anncrsnp/preprocessing_manager.rb', line 34

def self.end_work_manager
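  # Write the unique feature names, one per line, to the 'active_data' file.
  active_data_path = File.join(@@options[:preprocessed_data], 'active_data')
  File.open(active_data_path, 'w') { |file| file.puts @@features.map { |feature| feature[1] }.uniq.join("\n") }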
end
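
As a rough illustration of what ends up in the active_data file, the sketch below rebuilds its content from a hypothetical feature list. The links and feature names are made up for the example; only the [link, feature, cols, header, format] layout and the map/uniq/join chain are taken from the method above.

```ruby
# Hypothetical feature entries in the layout produced by load_links:
# [link, feature, cols, header, format]
features = [
  ['http://example.org/a.bed.gz', 'enhancers', [0, 1, 2], 1, 'bed'],
  ['http://example.org/b.bed.gz', 'enhancers', [0, 1, 2], 0, 'bed'],
  ['http://example.org/c.txt.gz', 'promoters', [1, 2, 3], 1, 'txt']
]

# Same chain as end_work_manager: take the feature name, drop duplicates,
# and put one name per line.
active_data = features.map { |feature| feature[1] }.uniq.join("\n")
puts active_data
```

Two links that share a feature name contribute a single line, so the file lists each feature once.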

.init_work_manager(options) ⇒ Object

init_work_manager is executed at the start, before any processing. Use it to initialize global variables, open files, etc. Because an instance of PreprocessingManager is created for each worker connection, any state shared across connections must be held in class variables (prefixed with @@).



# File 'lib/anncrsnp/preprocessing_manager.rb', line 16

def self.init_work_manager(options)
  @@options = options
  $LOG.info 'Load genomic features links'
  @@features = load_links(options[:file])
  $LOG.info "Loaded #{@@features.length} genomic features links"

  # FEATURE DIRECTORIES
  @@features.each do |feature|
    ft_folder = File.join(@@options[:preprocessed_data], feature[1]) #feature name
    ft_temp_folder = File.join(@@options[:temp], feature[1])
    Dir.mkdir(ft_folder) unless Dir.exist?(ft_folder)
    Dir.mkdir(ft_temp_folder) unless Dir.exist?(ft_temp_folder)
  end
  @@processed_features = 0
end
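
The directory setup can be tried in isolation with the sketch below. It assumes an options hash that holds only the :preprocessed_data and :temp keys read by init_work_manager, points them at throwaway temporary folders, and uses hypothetical feature names in place of the loaded links.

```ruby
require 'tmpdir'

# Hypothetical feature names; in the real manager they come from load_links.
feature_names = %w[enhancers promoters]

created = Dir.mktmpdir do |root|
  # Assumed layout of the options hash: only the keys used by
  # init_work_manager are shown, and the paths are throwaway temp folders.
  options = {
    preprocessed_data: File.join(root, 'preprocessed'),
    temp: File.join(root, 'temp')
  }
  # Dir.mkdir does not create missing parents, so the base folders must exist first.
  Dir.mkdir(options[:preprocessed_data])
  Dir.mkdir(options[:temp])

  feature_names.each do |name|
    [options[:preprocessed_data], options[:temp]].each do |base|
      folder = File.join(base, name)
      Dir.mkdir(folder) unless Dir.exist?(folder)
    end
  end

  # Collect the created folders, relative to root, for inspection.
  Dir.glob(File.join(root, '*', '*')).map { |path| path.sub("#{root}/", '') }.sort
end
p created
```

Since Dir.mkdir raises when the parent folder is missing, the base folders named in the options presumably have to exist before the manager starts.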

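.load_links(file_path) ⇒ Object

CUSTOM ADDITIONAL METHODS

load_links reads a tab-separated links file and returns one entry per line in the form [link, feature, cols, header, format], where cols is a comma-separated list converted to integers and header is converted to an integer.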



# File 'lib/anncrsnp/preprocessing_manager.rb', line 77

def self.load_links(file_path)
  features = []
  # File.foreach closes the file once every line has been read.
  File.foreach(file_path) do |line|
    link, feature, cols, header, format = line.chomp.split("\t")
    features << [link, feature, cols.split(',').map(&:to_i), header.to_i, format]
  end
  features
end
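
To make the expected input concrete, here is a minimal sketch that parses one line of a links file with the same split logic. The sample line (URL, feature name, column indices, header value, and format) is invented for illustration; the real links file may hold different values.

```ruby
require 'tempfile'

# Hypothetical links file: link, feature, cols, header, format (tab-separated).
sample = "http://example.org/a.bed.gz\tenhancers\t0,1,2\t1\tbed\n"

parsed = Tempfile.create('links') do |file|
  file.write(sample)
  file.flush
  # Same parsing steps as load_links, applied to the temporary file.
  File.foreach(file.path).map do |line|
    link, feature, cols, header, format = line.chomp.split("\t")
    [link, feature, cols.split(',').map(&:to_i), header.to_i, format]
  end
end
p parsed
```

Each parsed entry keeps the link, feature name, and format as strings, while the column list and header value become integers.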

Instance Method Details

#next_work ⇒ Object

next_work is called every time a worker needs new work. Here you can read data from disk. The method must return the work data, or nil if no more data is available.



# File 'lib/anncrsnp/preprocessing_manager.rb', line 47

def next_work
  work = nil # nil tells the worker there is no more work, so it disconnects
  begin
    work = @@features[@@processed_features] if @@processed_features < @@features.length
    @@processed_features += 1
  rescue StandardError => e
    puts e.message
    puts e.backtrace
  end
  work
end
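
The hand-out pattern behind next_work can be sketched without the framework. FeatureQueue below is a hypothetical stand-in that does not depend on ScbiMapreduce; it only mimics the counter logic, and the loop plays the role of workers that keep asking for work until they receive nil.

```ruby
# Stand-in for the manager: hands out one feature per call, then nil.
# FeatureQueue is a hypothetical name and does not depend on ScbiMapreduce.
class FeatureQueue
  def initialize(features)
    @features = features
    @processed = 0
  end

  def next_work
    work = @processed < @features.length ? @features[@processed] : nil
    @processed += 1
    work
  end
end

queue = FeatureQueue.new(%w[enhancers promoters])
handed_out = []
# A nil result means there is no work left, so the loop stops.
while (work = queue.next_work)
  handed_out << work
end
p handed_out
```

Every feature is handed out exactly once, and any later call keeps returning nil.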

#work_received(results) ⇒ Object

work_received is executed each time a worker has finished a job. Here you can write results down to disk, perform some aggregated statistics, etc…



# File 'lib/anncrsnp/preprocessing_manager.rb', line 68

def work_received(results)

  # write_data_to_disk(results)
end

#worker_initial_config ⇒ Object

worker_initial_config sends the initial parameters to the workers. The method is executed once per worker.



# File 'lib/anncrsnp/preprocessing_manager.rb', line 40

def worker_initial_config
  return @@options
end