Class: AtlasEngine::AddressImporter::OpenAddress::GeoJsonImportJob
- Inherits: ResumableImportJob
- Inheritance chain: Object → ActiveJob::Base → AtlasEngine::ApplicationJob → ResumableImportJob → AtlasEngine::AddressImporter::OpenAddress::GeoJsonImportJob
- Extended by:
- T::Sig
- Includes:
- PreparesGeoJsonFile, HandlesInterruption
- Defined in:
- app/jobs/atlas_engine/address_importer/open_address/geo_json_import_job.rb
Constant Summary
- CHUNK_SIZE =
10_000
- REPORT_STEP =
5
- StringProps =
T.type_alias { T::Hash[String, T.untyped] }
- BatchOfRows =
T.type_alias { T::Array[StringProps] }
- FilterType =
T.type_alias { T.proc.params(arg0: StringProps).returns(T::Boolean) }
- Corrector =
AddressImporter::Corrections::Corrector
Constants included from PreparesGeoJsonFile
PreparesGeoJsonFile::ROOT_FOLDER
Constants included from ImportLogHelper
ImportLogHelper::TEST_TIMESTAMP
Instance Attribute Summary
-
#country_code ⇒ Object
readonly
Returns the value of attribute country_code.
-
#country_import ⇒ Object
readonly
Returns the value of attribute country_import.
-
#geojson_path ⇒ Object
readonly
Returns the value of attribute geojson_path.
-
#loader ⇒ Object
readonly
Returns the value of attribute loader.
-
#province_code ⇒ Object
readonly
Returns the value of attribute province_code.
-
#transformer ⇒ Object
readonly
Returns the value of attribute transformer.
Instance Method Summary
- #attributes_from_batch(batch) ⇒ Object
- #build_enumerator(params, cursor:) ⇒ Object
- #condense_addresses(addresses) ⇒ Object
- #corrector ⇒ Object
- #country_profile ⇒ Object
- #each_iteration(batch, element_id) ⇒ Object
- #incr_invalid_lines ⇒ Object
- #invalid_lines ⇒ Object
- #io ⇒ Object
- #row_filter ⇒ Object
-
#setup_and_download(&block) ⇒ Object
Setup boilerplate: JobIteration doesn’t let us override #perform.
- #track_progress(chunk_num) ⇒ Object
Methods included from PreparesGeoJsonFile
Methods included from LogHelper
#log_error, #log_info, #log_warn
Methods inherited from ResumableImportJob
Methods included from ImportLogHelper
#import_log_error, #import_log_info
Methods inherited from AtlasEngine::ApplicationJob
Instance Attribute Details
#country_code ⇒ Object (readonly)
Returns the value of attribute country_code.
27 28 29 |
# File 'app/jobs/atlas_engine/address_importer/open_address/geo_json_import_job.rb', line 27
# Country code of this import, set from the :country_code job argument in #setup_and_download.
def country_code = @country_code
#country_import ⇒ Object (readonly)
Returns the value of attribute country_import.
27 28 29 |
# File 'app/jobs/atlas_engine/address_importer/open_address/geo_json_import_job.rb', line 27
# CountryImport record for this run, looked up from the :country_import_id job argument in #setup_and_download.
def country_import = @country_import
#geojson_path ⇒ Object (readonly)
Returns the value of attribute geojson_path.
27 28 29 |
# File 'app/jobs/atlas_engine/address_importer/open_address/geo_json_import_job.rb', line 27
# Pathname of the gzipped GeoJSON file (built from the :geojson_file_path job argument).
def geojson_path = @geojson_path
#loader ⇒ Object (readonly)
Returns the value of attribute loader.
27 28 29 |
# File 'app/jobs/atlas_engine/address_importer/open_address/geo_json_import_job.rb', line 27
# Loader that receives each condensed batch of addresses in #each_iteration.
def loader = @loader
#province_code ⇒ Object (readonly)
Returns the value of attribute province_code.
27 28 29 |
# File 'app/jobs/atlas_engine/address_importer/open_address/geo_json_import_job.rb', line 27
# Province code from the :province_code job argument; also handed to the Transformer.
def province_code = @province_code
#transformer ⇒ Object (readonly)
Returns the value of attribute transformer.
27 28 29 |
# File 'app/jobs/atlas_engine/address_importer/open_address/geo_json_import_job.rb', line 27
# Transformer that turns parsed features into address attributes in #attributes_from_batch.
def transformer = @transformer
Instance Method Details
#attributes_from_batch(batch) ⇒ Object
125 126 127 128 129 130 131 132 133 134 135 136 |
# File 'app/jobs/atlas_engine/address_importer/open_address/geo_json_import_job.rb', line 125
# Converts every feature of a chunk into address attributes via #transformer.
#
# A feature the transformer returns nil for is counted with #incr_invalid_lines
# and dropped; any other falsy result is dropped as well (filter_map semantics).
#
# @param batch [Array] parsed features for one chunk (presumably String-keyed
#   hashes from JSON.parse — confirm against the caller)
# @return [Array] the truthy transformer results, in input order
def attributes_from_batch(batch)
  batch.filter_map do |feature|
    transformer.transform(feature).tap do |attributes|
      incr_invalid_lines if attributes.nil?
    end
  end
end
#build_enumerator(params, cursor:) ⇒ Object
66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 |
# File 'app/jobs/atlas_engine/address_importer/open_address/geo_json_import_job.rb', line 66
# Builds the JobIteration enumerator over the gzipped GeoJSON file.
#
# Lines read from #io are grouped into chunks of CHUNK_SIZE lines. Each chunk
# is parsed line by line with JSON.parse and then narrowed with #row_filter.
# Every element is a +[features, chunk_num]+ pair, and +chunk_num+ is the
# cursor JobIteration persists between runs.
#
# @param params [Object] unused; required by the JobIteration interface
# @param cursor [Integer, String, nil] chunk number of the last chunk whose
#   #each_iteration completed, or nil on a fresh run
# @return [Enumerator::Lazy] yields [features, chunk_num] pairs
def build_enumerator(params, cursor:)
  start_at =
    if cursor.nil?
      import_log_info(country_import: country_import, message: "Importing whole file")
      0
    else
      # JobIteration stores an element's cursor only after its #each_iteration
      # finished, so chunk +cursor+ is already loaded. Resume with the next one
      # (the same cursor + 1 convention JobIteration's array enumerator uses).
      next_chunk = cursor.to_i + 1
      import_log_info(country_import: country_import, message: "Starting import at chunk #{next_chunk}")
      next_chunk
    end

  # NOTE: The bigger the chunk size, the fewer roundtrips to MySQL, and therefore faster.
  io.each
    .each_slice(CHUNK_SIZE)
    .lazy
    .drop(start_at) # Skip every chunk before the one we resume at.
    .with_index(start_at) # Keep numbering absolute so skipped chunks still count.
    .map do |lines, chunk_num|
      track_progress(chunk_num)
      features = lines.map { |line| JSON.parse(line) }
      [features.select(&row_filter), chunk_num]
    end
end
#condense_addresses(addresses) ⇒ Object
108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 |
# File 'app/jobs/atlas_engine/address_importer/open_address/geo_json_import_job.rb', line 108
# Collapses attribute hashes that describe the same street address into one.
#
# Two entries are the same address when their :province_code, :locale, :city,
# :street and :zip values match. Duplicates are merged left to right. For
# :building_and_unit_ranges the two hashes are merged together; for every
# other key the later value wins.
#
# @param addresses [Array<Hash>] attribute hashes (Symbol keys)
# @return [Array<Hash>] one merged hash per distinct address, in first-seen order
def condense_addresses(addresses)
  identity = %i[province_code locale city street zip]

  addresses
    .group_by { |attrs| attrs.values_at(*identity) }
    .values
    .map do |duplicates|
      duplicates.reduce do |merged, attrs|
        merged.merge(attrs) do |key, existing, incoming|
          key == :building_and_unit_ranges ? existing.merge(incoming) : incoming
        end
      end
    end
end
#corrector ⇒ Object
179 180 181 182 183 184 185 186 187 188 |
# File 'app/jobs/atlas_engine/address_importer/open_address/geo_json_import_job.rb', line 179
# Returns the "open_address" Corrector for this country. Returns nil when the
# country profile lists no "open_address" correctors.
#
# Memoized with defined? instead of ||= so that a nil result is cached too.
# With ||=, countries without correctors repeated the profile lookup on every call.
#
# @return [Corrector, nil]
def corrector
  return @corrector if defined?(@corrector)

  @corrector =
    if country_profile.ingestion.correctors(source: "open_address").empty?
      nil
    else
      Corrector.new(country_code: country_code, source: "open_address")
    end
end
#country_profile ⇒ Object
158 159 160 |
# File 'app/jobs/atlas_engine/address_importer/open_address/geo_json_import_job.rb', line 158
# CountryProfile for #country_code. It is looked up once and reused while the
# stored value is truthy.
def country_profile
  return @country_profile if @country_profile

  @country_profile = CountryProfile.for(country_code)
end
#each_iteration(batch, element_id) ⇒ Object
93 94 95 96 97 98 99 100 101 102 |
# File 'app/jobs/atlas_engine/address_importer/open_address/geo_json_import_job.rb', line 93
# Processes one chunk produced by #build_enumerator.
#
# First checks for interruption, then transforms the features. If any
# addresses remain, they are condensed and passed to #loader.
#
# @param batch [Array] filtered features for the chunk
# @param element_id [Integer] the chunk number (unused here)
# @return the result of loader.load, or nil when nothing was loaded
def each_iteration(batch, element_id)
  exit_if_interrupted!(country_import)

  addresses = attributes_from_batch(batch)
  loader.load(condense_addresses(addresses)) unless addresses.blank?
end
#incr_invalid_lines ⇒ Object
198 199 200 201 202 203 204 |
# File 'app/jobs/atlas_engine/address_importer/open_address/geo_json_import_job.rb', line 198
# Adds one to the count of features the transformer rejected.
#
# The old version initialized the counter to 0 on its first call, so the
# first invalid line was never counted. Building on #invalid_lines (which
# treats an unset counter as 0) counts every call.
#
# @return [Integer] the updated count
def incr_invalid_lines
  @invalid_lines = invalid_lines + 1
end
#invalid_lines ⇒ Object
207 208 209 |
# File 'app/jobs/atlas_engine/address_importer/open_address/geo_json_import_job.rb', line 207
# Number of invalid lines recorded so far; 0 before anything has been counted.
def invalid_lines
  return 0 unless @invalid_lines

  @invalid_lines
end
#io ⇒ Object
193 194 195 |
# File 'app/jobs/atlas_engine/address_importer/open_address/geo_json_import_job.rb', line 193
# Opens #geojson_path in binary mode and wraps it in a gzip decompressor.
#
# Each call opens a new file handle. Nothing in this method closes it.
# NOTE(review): confirm the reader is closed (or collected) after iteration.
#
# @return [Zlib::GzipReader]
def io
  gz_file = geojson_path.open("rb")
  Zlib::GzipReader.new(gz_file)
end
#row_filter ⇒ Object
165 166 167 168 169 170 171 172 173 174 |
# File 'app/jobs/atlas_engine/address_importer/open_address/geo_json_import_job.rb', line 165
# Predicate deciding which parsed rows are imported. It is built from the
# country profile's open_address[:filter] setting and memoized.
#
# - nil: every row is kept.
# - a namespaced class name (matching /\w+(::\w+)+/): the class is
#   instantiated with +country_import:+ and its #filter method is used.
# - any other value: case/in has no matching branch, so NoMatchingPatternError
#   is raised.
def row_filter
  return @row_filter if @row_filter

  @row_filter =
    case country_profile.open_address[:filter]
    in nil
      # Undefined: let everything through.
      ->(_row) { true }
    in /\w+(::\w+)+/ => class_name
      class_name.constantize.new(country_import: country_import).method(:filter).to_proc
    end
end
#setup_and_download(&block) ⇒ Object
Setup boilerplate: JobIteration doesn’t let us override #perform, so the around_perform callback is used instead.
36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 |
# File 'app/jobs/atlas_engine/address_importer/open_address/geo_json_import_job.rb', line 36
# around_perform hook: sets up the job's state from its arguments, then
# downloads the GeoJSON file.
#
# Creates the Loader and reads the job arguments into instance variables.
# It downcases the locale (nil stays nil), loads the CountryImport record and
# builds the Transformer. It then logs the download and hands +block+ (the
# rest of the job) to download_geojson.
#
# @return whatever download_geojson returns
def setup_and_download(&block)
  @loader = Loader.new

  @country_code = argument(:country_code)
  @province_code = argument(:province_code)
  @geojson_path = Pathname.new(argument(:geojson_file_path))
  requested_locale = argument(:locale)
  @locale = requested_locale&.downcase
  @country_import = CountryImport.find(argument(:country_import_id))

  @transformer = Transformer.new(
    country_import: @country_import,
    province_code: @province_code,
    locale: @locale,
  )

  log_details = { file_path: @geojson_path.to_s }
  import_log_info(
    country_import: @country_import,
    message: "Downloading geojson file",
    additional_params: log_details,
  )

  download_geojson(&block)
end
#track_progress(chunk_num) ⇒ Object
139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 |
# File 'app/jobs/atlas_engine/address_importer/open_address/geo_json_import_job.rb', line 139
# Logs import progress every REPORT_STEP chunks.
#
# +chunk_num+ is the chunk about to be processed, so +chunk_num * CHUNK_SIZE+
# counts the lines read in the chunks before it. The discarded-line count is
# logged only when at least one line has actually been discarded.
#
# @param chunk_num [Integer] absolute chunk number
# @return [void]
def track_progress(chunk_num)
  return unless (chunk_num % REPORT_STEP).zero?

  lines_parsed = chunk_num * CHUNK_SIZE
  import_log_info(
    country_import: country_import,
    message: "Processing chunk #{chunk_num}, lines parsed so far: #{lines_parsed}",
  )

  # The old check (lines_parsed != invalid_lines) was true at almost every
  # report and logged "Lines discarded: 0"; only report real discards.
  return unless invalid_lines.positive?

  import_log_info(
    country_import: country_import,
    message: "Lines discarded: #{invalid_lines}",
    category: :invalid_address,
  )
end