Module: MiGA::Cli::Action::Doctor::Distances

Included in:
MiGA::Cli::Action::Doctor
Defined in:
lib/miga/cli/action/doctor/distances.rb

Instance Method Summary collapse

Instance Method Details

#check_bidir(cli) ⇒ Object

Check that reference distances are bidirectional, and write any missing values, using MiGA::Cli cli



37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
# File 'lib/miga/cli/action/doctor/distances.rb', line 37

##
# Check that the distances between reference datasets are bidirectional,
# saving the values reported as missing, with MiGA::Cli +cli+
def check_bidir(cli)
  cli.say 'Checking if reference distances are bidirectional'
  project = cli.load_project
  references = project.each_dataset.select { |dataset| dataset.ref? }

  # Gather the per-thread reports, merge them, and discard the scratch folder
  scratch = partial_bidir_tmp(project, references)
  missing = merge_bidir_tmp(scratch)
  FileUtils.rm_rf(scratch)

  # Save the missing values (threaded); only thread 0 reports progress
  total = references.size
  MiGA::Parallel.distribute(references, cli[:threads]) do |dataset, index, thread|
    if thread == 0
      cli.advance('Datasets:', index + 1, total, false)
    end
    save_bidirectional(dataset, missing)
  end
  cli.say
end

#check_db(cli) ⇒ Object

Check the integrity of the databases of all datasets, removing malformed ones, using MiGA::Cli cli



5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
# File 'lib/miga/cli/action/doctor/distances.rb', line 5

##
# Check the integrity of the databases of every dataset in the project loaded
# by MiGA::Cli +cli+. The work is split across +cli[:threads]+ forked
# processes, and this method waits for all of them before returning
def check_db(cli)
  cli.say 'Checking integrity of databases'
  project = cli.load_project
  total = project.dataset_names.size
  cli[:threads].times do |worker|
    Process.fork do
      seen = 0
      project.each_dataset do |dataset|
        seen += 1
        # Only the first worker reports progress
        cli.advance('Datasets:', seen, total, false) if worker == 0
        # Each worker only handles its own share of the datasets
        next if seen % cli[:threads] != worker

        each_database_file(dataset) do |db_file, metric, result, _rank|
          # The block removes the database file and the files of the result
          # that owns it (presumably called only for malformed databases)
          check_sqlite3_database(db_file, metric) do
            cli.say(
              "  > Removing malformed database from #{dataset.name}:#{result}   "
            )
            File.unlink(db_file)
            stale = dataset.result(result)
            next unless stale

            [stale.path(:done), stale.path].each do |path|
              File.unlink(path) if File.exist?(path)
            end
          end
        end
      end
    end
  end
  Process.waitall
  cli.say
end

#check_dist(cli) ⇒ Object

Check the ANI and AAI distance tables for consistent datasets using MiGA::Cli cli



57
58
59
60
61
62
63
64
65
66
67
68
# File 'lib/miga/cli/action/doctor/distances.rb', line 57

##
# Check the ANI and AAI distance tables of the project loaded by MiGA::Cli
# +cli+ for consistent datasets. Metrics without a distances result are
# skipped
def check_dist(cli)
  project = cli.load_project
  %i[ani aai].each do |metric|
    table = project.result("#{metric}_distances")
    unless table.nil?
      cli.say "Checking #{metric} table for consistent datasets"
      not_ok, to_fix = check_dist_eval(cli, project, table)
      check_dist_fix(cli, project, to_fix)
      check_dist_recompute(cli, table, not_ok)
    end
  end
end

#merge_bidir_tmp(tmp) ⇒ Object

Read the partial temporary reports of bidirectionality (located in tmp), and return a two-deep hash with the final missingness report by metric (first key) and dataset name (second key). Used by check_bidir.



112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
# File 'lib/miga/cli/action/doctor/distances.rb', line 112

##
# Read the partial reports of bidirectionality located in +tmp+ (one file per
# thread, named by thread index), and return a two-deep hash with the merged
# report keyed by metric (+:aai+ or +:ani+) and then by dataset name. Raises
# a RuntimeError on malformed lines, unrecognized metrics, or files that do
# not finish with the +end+ marker. Used by +check_bidir+
def merge_bidir_tmp(tmp)
  merged = { aai: {}, ani: {} }
  threads = cli[:threads]
  threads.times do |thread|
    cli.advance('Merging:', thread + 1, threads, false)
    path = File.join(tmp, "#{thread}.json")
    File.open(path, 'r') do |io|
      section = nil
      io.each_line do |line|
        query, payload = line.chomp.split(' ', 2)
        raise "Unexpected format in #{path}:#{io.lineno}" unless payload

        # Lines starting with '#' switch the current metric (or mark the end)
        if query == '#'
          section = payload.to_sym
          next
        end

        table = merged[section]
        raise "Unrecognized metric: #{section}" unless table

        JSON.parse(payload).each do |subject, value|
          table[query] ||= {}
          reverse = table[subject]
          if reverse&.include?(query)
            # The opposite direction was already seen: drop it instead
            reverse.delete(query)
          else
            table[query][subject] = value
          end
        end
      end
      raise "Incomplete thread dump: #{path}" unless section == :end
    end
  end
  cli.say

  merged
end

#partial_bidir_tmp(project, ref_ds) ⇒ Object

Make a temporary directory holding partial bidirectionality reports (one per thread) in a custom multi-JSON format. Requires a MiGA::Project project and the collection of reference datasets ref_ds. Returns the path to the temporary directory created. Used by check_bidir.



77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
# File 'lib/miga/cli/action/doctor/distances.rb', line 77

##
# Make a temporary directory (+doctor-bidirectional.tmp+ inside the project
# path) holding partial bidirectionality reports, one file per thread named
# by thread index. Each file has one <tt># metric</tt> header per metric
# followed by one line per dataset with its name and a JSON row (datasets
# with empty rows are omitted), and finishes with a <tt># end</tt> marker.
#
# Requires a MiGA::Project +project+ and the collection of reference datasets
# +ref_ds+. Returns the path to the directory created. Used by +check_bidir+
def partial_bidir_tmp(project, ref_ds)
  n = ref_ds.size
  threads = cli[:threads]

  # Read data first (threaded)
  tmp = File.join(project.path, 'doctor-bidirectional.tmp')
  FileUtils.mkdir_p(tmp)
  MiGA::Parallel.process(threads) do |thr|
    # Block form ensures the handle is closed even if reading a dataset fails
    File.open(File.join(tmp, "#{thr}.json"), 'w') do |fh|
      %i[aai ani].each do |metric|
        fh.puts "# #{metric}"
        ref_ds.each_with_index do |ds, idx|
          # Each thread only reads its own share of the datasets
          next unless idx % threads == thr

          cli.advance('Reading:', idx + 1, n, false) if thr == 0
          row = read_bidirectional(ds, metric)
          fh.puts "#{ds.name} #{JSON.fast_generate(row)}" unless row.empty?
        end
      end
      fh.puts '# end'
      fh.flush # necessary for large threaded runs
    end
    if thr == 0
      cli.advance('Reading:', n, n, false)
      cli.say
    end
  end

  tmp
end