Class: AtlasEngine::AddressImporter::OpenAddress::GeoJsonImportJob

Inherits:
ResumableImportJob
Extended by:
T::Sig
Includes:
PreparesGeoJsonFile, HandlesInterruption
Defined in:
app/jobs/atlas_engine/address_importer/open_address/geo_json_import_job.rb

Constant Summary

CHUNK_SIZE =
10_000
REPORT_STEP =
5
StringProps =
T.type_alias { T::Hash[String, T.untyped] }
BatchOfRows =
T.type_alias { T::Array[StringProps] }
FilterType =
T.type_alias { T.proc.params(arg0: StringProps).returns(T::Boolean) }
Corrector =
AddressImporter::Corrections::Corrector

Constants included from PreparesGeoJsonFile

PreparesGeoJsonFile::ROOT_FOLDER

Constants included from ImportLogHelper

ImportLogHelper::TEST_TIMESTAMP

Instance Attribute Summary

Instance Method Summary

Methods included from PreparesGeoJsonFile

#download_geojson

Methods included from LogHelper

#log_error, #log_info, #log_warn

Methods inherited from ResumableImportJob

#log_final_stats

Methods included from ImportLogHelper

#import_log_error, #import_log_info

Methods inherited from AtlasEngine::ApplicationJob

#argument

Instance Attribute Details

#country_code ⇒ Object (readonly)

Returns the value of attribute country_code.



27
28
29
# File 'app/jobs/atlas_engine/address_importer/open_address/geo_json_import_job.rb', line 27

# Reader for the country code taken from the :country_code job argument
# in setup_and_download.
def country_code = @country_code

#country_import ⇒ Object (readonly)

Returns the value of attribute country_import.



27
28
29
# File 'app/jobs/atlas_engine/address_importer/open_address/geo_json_import_job.rb', line 27

# Reader for the CountryImport record looked up by the :country_import_id
# job argument in setup_and_download.
def country_import = @country_import

#geojson_path ⇒ Object (readonly)

Returns the value of attribute geojson_path.



27
28
29
# File 'app/jobs/atlas_engine/address_importer/open_address/geo_json_import_job.rb', line 27

# Reader for the Pathname built from the :geojson_file_path job argument
# in setup_and_download.
def geojson_path = @geojson_path

#loader ⇒ Object (readonly)

Returns the value of attribute loader.



27
28
29
# File 'app/jobs/atlas_engine/address_importer/open_address/geo_json_import_job.rb', line 27

# Reader for the Loader instance created in setup_and_download; each_iteration
# hands condensed address rows to its #load.
def loader = @loader

#province_code ⇒ Object (readonly)

Returns the value of attribute province_code.



27
28
29
# File 'app/jobs/atlas_engine/address_importer/open_address/geo_json_import_job.rb', line 27

# Reader for the value of the :province_code job argument captured in
# setup_and_download.
def province_code = @province_code

#transformer ⇒ Object (readonly)

Returns the value of attribute transformer.



27
28
29
# File 'app/jobs/atlas_engine/address_importer/open_address/geo_json_import_job.rb', line 27

# Reader for the Transformer built in setup_and_download; attributes_from_batch
# calls its #transform on every feature.
def transformer = @transformer

Instance Method Details

#attributes_from_batch(batch) ⇒ Object



125
126
127
128
129
130
131
132
133
134
135
136
# File 'app/jobs/atlas_engine/address_importer/open_address/geo_json_import_job.rb', line 125

# Turns one batch of features into address attribute rows.
#
# Every feature is run through +transformer.transform+.
# - A nil result means the transformer rejected the feature. It is dropped and
#   counted via #incr_invalid_lines.
# - Any other falsy result is dropped without being counted (filter_map semantics).
#
# @param batch [Array<Hash>] features for one chunk
# @return [Array] the truthy transform results, in input order
def attributes_from_batch(batch)
  batch.filter_map do |feature|
    transformer.transform(feature).tap do |attrs|
      incr_invalid_lines if attrs.nil?
    end
  end
end

#build_enumerator(params, cursor:) ⇒ Object



66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
# File 'app/jobs/atlas_engine/address_importer/open_address/geo_json_import_job.rb', line 66

# Builds the lazy enumerator that JobIteration walks, one chunk at a time.
#
# The gzipped file from #io is read line by line and sliced into CHUNK_SIZE-line
# chunks. For each chunk, in order:
# 1. progress is reported via #track_progress;
# 2. every line is JSON-parsed;
# 3. the parsed features are filtered through #row_filter.
#
# @param params unused here; part of the JobIteration enumerator signature
# @param cursor [nil, #to_i] the chunk number to resume from, or nil for a fresh run
# @return [Enumerator::Lazy] yields [filtered_features, chunk_num] pairs
def build_enumerator(params, cursor:)
  first_chunk =
    if cursor.nil?
      import_log_info(country_import: country_import, message: "Importing whole file")
      0
    else
      import_log_info(country_import: country_import, message: "Starting import at chunk #{cursor}")
      cursor.to_i
    end

  # Bigger chunks mean fewer round trips to MySQL, and therefore a faster import.
  chunks = io.each.each_slice(CHUNK_SIZE).lazy

  # The cursor is a chunk number: skip that many chunks when resuming, but keep
  # the skipped chunks in the numbering so chunk_num stays absolute.
  chunks
    .drop(first_chunk)
    .with_index(first_chunk)
    .map do |raw_lines, chunk_num|
      track_progress(chunk_num)
      features = raw_lines.map { |line| JSON.parse(line) }
      [features.select(&row_filter), chunk_num]
    end
end

#condense_addresses(addresses) ⇒ Object



108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
# File 'app/jobs/atlas_engine/address_importer/open_address/geo_json_import_job.rb', line 108

# Collapses address rows that share the same province, locale, city, street and zip.
#
# Rows in a group are merged left to right, so later rows win on scalar keys.
# The :building_and_unit_ranges hashes are themselves merged, which accumulates
# the buildings from every row in the group.
#
# @param addresses [Array<Hash>] attribute hashes with symbol keys
# @return [Array<Hash>] one merged hash per distinct grouping key, in first-seen order
def condense_addresses(addresses)
  grouped = addresses.group_by do |attrs|
    attrs.values_at(:province_code, :locale, :city, :street, :zip)
  end

  grouped.values.map do |matched_addresses|
    matched_addresses.inject do |combined, address|
      combined.merge(address) do |key, existing, incoming|
        key == :building_and_unit_ranges ? existing.merge(incoming) : incoming
      end
    end
  end
end

#corrector ⇒ Object



179
180
181
182
183
184
185
186
187
188
# File 'app/jobs/atlas_engine/address_importer/open_address/geo_json_import_job.rb', line 179

# Returns a Corrector for this country's "open_address" source, or nil.
#
# The result is nil when the country profile lists no ingestion correctors for
# that source. Only a built Corrector is memoized; a nil result is recomputed
# on the next call (||= semantics).
def corrector
  @corrector ||= begin
    configured = country_profile.ingestion.correctors(source: "open_address")
    Corrector.new(country_code: country_code, source: "open_address") unless configured.empty?
  end
end

#country_profile ⇒ Object



158
159
160
# File 'app/jobs/atlas_engine/address_importer/open_address/geo_json_import_job.rb', line 158

# Returns the CountryProfile for #country_code, memoized after the first truthy lookup.
def country_profile
  return @country_profile if @country_profile

  @country_profile = CountryProfile.for(country_code)
end

#each_iteration(batch, element_id) ⇒ Object



93
94
95
96
97
98
99
100
101
102
# File 'app/jobs/atlas_engine/address_importer/open_address/geo_json_import_job.rb', line 93

# Processes one chunk yielded by build_enumerator.
#
# Steps:
# 1. Check for interruption first.
# 2. Transform the batch into address rows.
# 3. If any rows survive, condense them and hand the result to the loader.
#
# @param batch [Array<Hash>] filtered features for the chunk
# @param element_id the chunk number (not used here)
# @return nil when no rows survived, otherwise whatever loader.load returns
def each_iteration(batch, element_id)
  exit_if_interrupted!(country_import)

  addresses = attributes_from_batch(batch)
  loader.load(condense_addresses(addresses)) if addresses.present?
end

#incr_invalid_lines ⇒ Object



198
199
200
201
202
203
204
# File 'app/jobs/atlas_engine/address_importer/open_address/geo_json_import_job.rb', line 198

# Increments the count of features the transformer rejected (see #invalid_lines).
#
# An unset counter is treated as zero, so the very first invalid line is counted.
# The earlier version only initialized the counter to 0 on its first call, which
# under-reported discarded lines by one.
#
# @return [Integer] the updated count
def incr_invalid_lines
  @invalid_lines = (@invalid_lines || 0) + 1
end

#invalid_lines ⇒ Object



207
208
209
# File 'app/jobs/atlas_engine/address_importer/open_address/geo_json_import_job.rb', line 207

# Number of features discarded so far in this run; 0 before anything is counted.
def invalid_lines
  return 0 unless @invalid_lines

  @invalid_lines
end

#io ⇒ Object



193
194
195
# File 'app/jobs/atlas_engine/address_importer/open_address/geo_json_import_job.rb', line 193

# Opens #geojson_path in binary mode and wraps it in a gzip decompressor.
#
# A new reader is returned on every call.
# NOTE(review): neither the reader nor the underlying File is closed in this method.
# Confirm the caller closes it; otherwise the descriptor stays open until GC.
def io
  compressed_file = geojson_path.open("rb")
  Zlib::GzipReader.new(compressed_file)
end

#row_filter ⇒ Object



165
166
167
168
169
170
171
172
173
174
# File 'app/jobs/atlas_engine/address_importer/open_address/geo_json_import_job.rb', line 165

def row_filter
  @row_filter ||= case country_profile.open_address[:filter]
  in nil # Undefined: let everything through
    ->(_row) { true }
  in /\w+(::\w+)+/ => sym # Class name
    cls = sym.constantize
    inst = cls.new(country_import: country_import)
    inst.method(:filter).to_proc
  end
end

#setup_and_download(&block) ⇒ Object

Setup boilerplate: JobIteration doesn’t let us override #perform, so this setup runs from the around_perform callback instead.



36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
# File 'app/jobs/atlas_engine/address_importer/open_address/geo_json_import_job.rb', line 36

# Initializes job state from the job arguments, then downloads the geojson file.
#
# JobIteration does not let the job override #perform, so this setup is meant to
# run from the around_perform callback.
#
# The locale argument is downcased when present. +block+ is forwarded unchanged
# to download_geojson.
#
# @return whatever download_geojson returns
def setup_and_download(&block)
  @loader = Loader.new
  @country_code = argument(:country_code)
  @province_code = argument(:province_code)
  @geojson_path = Pathname.new(argument(:geojson_file_path))
  requested_locale = argument(:locale)
  @locale = requested_locale&.downcase
  @country_import = CountryImport.find(argument(:country_import_id))
  @transformer = Transformer.new(
    country_import: country_import,
    province_code: @province_code,
    locale: @locale,
  )

  log_context = { file_path: geojson_path.to_s }
  import_log_info(country_import: country_import, message: "Downloading geojson file", additional_params: log_context)

  download_geojson(&block)
end

#track_progress(chunk_num) ⇒ Object



139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
# File 'app/jobs/atlas_engine/address_importer/open_address/geo_json_import_job.rb', line 139

# Logs import progress every REPORT_STEP chunks.
#
# +lines_parsed+ is chunk_num * CHUNK_SIZE, i.e. the lines in the chunks before
# +chunk_num+. build_enumerator calls this before yielding the chunk, so
# #invalid_lines presumably covers those earlier chunks too.
#
# NOTE(review): #invalid_lines is an in-memory counter. After a resume it only
# reflects chunks processed by the current run, while lines_parsed also counts
# the chunks that were skipped.
#
# @param chunk_num [Integer] absolute zero-based chunk number (skipped chunks included)
def track_progress(chunk_num)
  return unless (chunk_num % REPORT_STEP).zero?

  lines_parsed = chunk_num * CHUNK_SIZE
  import_log_info(
    country_import: country_import,
    message: "Processing chunk #{chunk_num}, lines parsed so far: #{lines_parsed}",
  )

  # Report discards only when there are some. The previous guard
  # (lines_parsed != invalid_lines) had two problems:
  # - it logged "Lines discarded: 0" on every report;
  # - it stayed silent when every parsed line had been discarded.
  return unless invalid_lines.positive?

  import_log_info(
    country_import: country_import,
    message: "Lines discarded: #{invalid_lines}",
    category: :invalid_address,
  )
end