Class: Myreplicator::Loader

Inherits: Object
Defined in: lib/loader/loader.rb
Class Method Summary

- .cleanup(metadata) ⇒ Object
  Deletes the metadata file and extract.
- .clear_older_files(metadata) ⇒ Object
  Clears files that are older than the passed metadata file.
- .group_incrementals(incrementals) ⇒ Object
  Groups all incremental files for the same table together. Returns an array of arrays. NOTE: Each array should be processed in the same thread to avoid collisions.
- .incremental_load(metadata) ⇒ Object
  Loads data incrementally, using the values specified in the metadata object.
- .incremental_loads(incrementals) ⇒ Object
  Loads all incremental files, ensuring that multiple loads to the same table happen sequentially.
- .initial_load(metadata) ⇒ Object
  Creates the table and loads data.
- .initial_loads(initials) ⇒ Object
  Loads all new tables concurrently, across multiple files.
- .load ⇒ Object
  Kicks off all initial loads first, then all incrementals, based on metadata files stored locally. Note: Initials are loaded sequentially.
- .load_to_vertica(options, metadata, exp) ⇒ Object
  Loads to Vertica.
- .metadata_files ⇒ Object
- .mysql_table_definition(options) ⇒ Object
- .parallel_load(procs) ⇒ Object
- .perform ⇒ Object
  Main method, provided for resque; reconnection is provided for resque workers.
- .tmp_dir ⇒ Object
- .transfer_completed?(metadata) ⇒ Boolean
  Returns true if the transfer of the file being loaded is completed.
- .unzip(filename) ⇒ Object
  Unzips the file; checks whether the file exists or is already unzipped.
Instance Method Summary

- #initialize(*args) ⇒ Loader (constructor)
  A new instance of Loader.
Constructor Details

#initialize(*args) ⇒ Loader

Returns a new instance of Loader.

  # File 'lib/loader/loader.rb', line 8

  def initialize *args
    @options = args.extract_options!
  end
Class Method Details
.cleanup(metadata) ⇒ Object

Deletes the metadata file and extract.

  # File 'lib/loader/loader.rb', line 281

  def self.cleanup metadata
    puts "Cleaning up..."
    FileUtils.rm metadata.metadata_path(tmp_dir) # json file
    FileUtils.rm metadata.destination_filepath(tmp_dir) # dump file
  end
.clear_older_files(metadata) ⇒ Object

Clears files that are older than the passed metadata file. Note: This method is provided to ensure no old incremental files ever get loaded after the schema-change algorithm has been applied.

  # File 'lib/loader/loader.rb', line 333

  def self.clear_older_files metadata
    files = Loader.metadata_files
    #Kernel.p "===== clear old files ====="
    #Kernel.p metadata
    #Kernel.p files
    max_date = DateTime.strptime metadata.export_time

    files.each do |m|
      if metadata.export_id == m.export_id
        if max_date > DateTime.strptime(m.export_time)
          Loader.cleanup m if metadata.filepath != m.filepath
        end
      end
    end
  end
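The age comparison relies on `DateTime.strptime` with no explicit format string, which parses ISO-8601 timestamps; `export_time` is assumed here to be serialized in that shape:

```ruby
require "date"

# With no format argument, DateTime.strptime expects ISO-8601
# ("%FT%T%z"), which export_time is assumed to use.
newer = DateTime.strptime("2013-05-02T10:00:00+00:00")
older = DateTime.strptime("2013-05-01T10:00:00+00:00")

newer > older # older metadata files compare below the newest one
```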
.group_incrementals(incrementals) ⇒ Object

Groups all incremental files for the same table together. Returns an array of arrays. NOTE: Each array should be processed in the same thread to avoid collisions.

  # File 'lib/loader/loader.rb', line 170

  def self.group_incrementals incrementals
    groups = [] # array of all grouped incrementals

    incrementals.each do |metadata|
      group = [metadata]
      incrementals.delete(metadata)

      # look for same loads
      incrementals.each do |md|
        if metadata.equals(md)
          group << md
          incrementals.delete(md) # remove from main array
        end
      end

      groups << group
    end

    return groups
  end
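As a mental model, the grouping can be sketched in plain Ruby with a hypothetical `Meta` struct standing in for `Myreplicator::ExportMetadata` (whose `equals` presumably matches on schema and table). This sketch uses `shift`/`partition` rather than deleting from the array while iterating over it:

```ruby
# Hypothetical stand-in for Myreplicator::ExportMetadata: two entries
# are "equal" when they load into the same schema and table.
Meta = Struct.new(:schema, :table, :file) do
  def equals(other)
    schema == other.schema && table == other.table
  end
end

# Same grouping idea: take the next ungrouped file, then sweep the
# remainder for files that load into the same table.
def group_incrementals(incrementals)
  groups = []
  until incrementals.empty?
    metadata = incrementals.shift
    same, incrementals = incrementals.partition { |md| metadata.equals(md) }
    groups << [metadata, *same] # same-table loads stay together
  end
  groups
end

files = [
  Meta.new("app", "users",  "users_1.tsv"),
  Meta.new("app", "orders", "orders_1.tsv"),
  Meta.new("app", "users",  "users_2.tsv"),
]
groups = group_incrementals(files)
# users_1.tsv and users_2.tsv share a group; orders_1.tsv is alone
```

Keeping same-table files in one group (and one thread) is what prevents two loads from racing on the same destination table.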
.incremental_load(metadata) ⇒ Object

Loads data incrementally, using the values specified in the metadata object.

  # File 'lib/loader/loader.rb', line 215

  def self.incremental_load metadata
    exp = Export.find(metadata.export_id)
    #Loader.unzip(metadata.filename)
    #metadata.zipped = false

    options = {:table_name => exp.table_name,
               :db => exp.destination_schema,
               :filepath => metadata.destination_filepath(tmp_dir),
               :source_schema => exp.source_schema,
               :fields_terminated_by => "\\0",
               :lines_terminated_by => "\\n"}

    case metadata.export_to
    when "vertica"
      Loader.load_to_vertica options, metadata, exp
    when "mysql"
      cmd = ImportSql.load_data_infile(options)
      puts cmd
      result = `#{cmd}` # execute
      unless result.nil?
        if result.size > 0
          raise Exceptions::LoaderError.new("Incremental Load #{metadata.filename} Failed!\n#{result}")
        end
      end
    end #case
  end
.incremental_loads(incrementals) ⇒ Object

Loads all incremental files, ensuring that multiple loads to the same table happen sequentially.

  # File 'lib/loader/loader.rb', line 139

  def self.incremental_loads incrementals
    groups = Loader.group_incrementals incrementals
    procs = []

    groups.each do |group|
      procs << Proc.new {
        group.each do |metadata|
          Myreplicator::Log.run(:job_type => "loader",
                                :name => "incremental_import",
                                :file => metadata.filename,
                                :export_id => metadata.export_id) do |log|
            if Myreplicator::Loader.transfer_completed? metadata
              Myreplicator::Loader.incremental_load metadata
              Myreplicator::Loader.cleanup metadata
            end
          end
        end # group
      }
    end # groups

    return procs
  end
.initial_load(metadata) ⇒ Object

Creates the table and loads data.

  # File 'lib/loader/loader.rb', line 193

  def self.initial_load metadata
    exp = Export.find(metadata.export_id)
    #Kernel.p "===== unzip ====="
    #Loader.unzip(metadata.filename)
    #metadata.zipped = false

    cmd = ImportSql.initial_load(:db => exp.destination_schema,
                                 :filepath => metadata.destination_filepath(tmp_dir))
    puts cmd
    result = `#{cmd}` # execute
    unless result.nil?
      if result.size > 0
        raise Exceptions::LoaderError.new("Initial Load #{metadata.filename} Failed!\n#{result}")
      end
    end
  end
.initial_loads(initials) ⇒ Object

Loads all new tables concurrently, across multiple files.

  # File 'lib/loader/loader.rb', line 108

  def self.initial_loads initials
    procs = []

    initials.each do |metadata|
      procs << Proc.new {
        Myreplicator::Log.run(:job_type => "loader",
                              :name => "#{metadata.export_type}_import",
                              :file => metadata.filename,
                              :export_id => metadata.export_id) do |log|
          if Myreplicator::Loader.transfer_completed? metadata
            if metadata.export_to == "vertica"
              Myreplicator::Loader.incremental_load metadata
            else
              Myreplicator::Loader.initial_load metadata
            end
            Myreplicator::Loader.cleanup metadata
          end
        end
      }
    end

    return procs
  end
.load ⇒ Object

Kicks off all initial loads first, then all incrementals. Looks at metadata files stored locally. Note: Initials are loaded sequentially.

  # File 'lib/loader/loader.rb', line 32

  def self.load
    initials = []
    incrementals = []
    all_files = Myreplicator::Loader.metadata_files
    #Kernel.p "===== all_files ====="
    #Kernel.p all_files

    all_files.each do |m|
      #Kernel.p m
      if m.export_type == "initial"
        initials << m # Add initial to the list
        all_files.delete(m) # Delete obj from mixed list
        all_files.each do |md|
          if m.equals(md) && md.export_type == "incremental"
            initials << md # incremental should happen after the initial load
            all_files.delete(md) # remove from current list of files
          end
        end
      end
    end

    incrementals = all_files # Remaining are all incrementals

    #initial_procs = Loader.initial_loads initials
    #parallel_load initial_procs

    initials.each do |metadata|
      Myreplicator::Log.run(:job_type => "loader",
                            :name => "#{metadata.export_type}_import",
                            :file => metadata.filename,
                            :export_id => metadata.export_id) do |log|
        if Myreplicator::Loader.transfer_completed? metadata
          if metadata.export_to == "vertica"
            Myreplicator::Loader.incremental_load metadata
          else
            Myreplicator::Loader.initial_load metadata
          end
          Myreplicator::Loader.cleanup metadata
        end
      end
    end

    #incremental_procs = Loader.incremental_loads incrementals
    #parallel_load incremental_procs

    #groups = Myreplicator::Loader.group_incrementals incrementals
    #groups.each do |group|
    incrementals.each do |metadata|
      Myreplicator::Log.run(:job_type => "loader",
                            :name => "incremental_import",
                            :file => metadata.filename,
                            :export_id => metadata.export_id) do |log|
        if Myreplicator::Loader.transfer_completed? metadata
          Myreplicator::Loader.incremental_load metadata
          Myreplicator::Loader.cleanup metadata
        end
      end
    end
    # end # group
    #end # groups
  end
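The ordering rule this method implements can be sketched as a standalone function, with a hypothetical `ExportInfo` struct standing in for the metadata objects: initials go first, incrementals for the same table ride directly behind their initial, and the remaining incrementals go last.

```ruby
# Hypothetical stand-in for the metadata objects; equals matches on table.
ExportInfo = Struct.new(:table, :export_type) do
  def equals(other)
    table == other.table
  end
end

def order_for_load(all_files)
  initials, rest = all_files.partition { |m| m.export_type == "initial" }
  ordered = []
  initials.each do |m|
    ordered << m
    # incrementals for this table queue up right behind its initial
    same, rest = rest.partition { |md| m.equals(md) && md.export_type == "incremental" }
    ordered.concat(same)
  end
  ordered + rest # remaining incrementals go last
end

files = [
  ExportInfo.new("orders", "incremental"),
  ExportInfo.new("users",  "initial"),
  ExportInfo.new("users",  "incremental"),
]
ordered = order_for_load(files)
# => users initial, users incremental, orders incremental
```

This also shows why the real code deletes matched incrementals from the mixed list: anything left over after pairing is, by construction, an incremental with no pending initial.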
.load_to_vertica(options, metadata, exp) ⇒ Object

Loads to Vertica.

  # File 'lib/loader/loader.rb', line 245

  def self.load_to_vertica options, metadata, exp
    options = {:table_name => exp.table_name,
               :db => ActiveRecord::Base.configurations["vertica"]["database"],
               :filepath => metadata.destination_filepath(tmp_dir),
               :source_schema => exp.source_schema,
               :export_id => metadata.export_id,
               :metadata => metadata}

    options[:destination_schema] = exp.destination_schema
    result = Myreplicator::VerticaLoader.load options

    ##TO DO: Handle unsuccessful vertica loads here
  end
.metadata_files ⇒ Object

  # File 'lib/loader/loader.rb', line 312

  def self.metadata_files
    files = []
    Dir.glob(File.join(tmp_dir, "*.json")).each do |json_file|
      files << Myreplicator::ExportMetadata.new(:metadata_path => json_file)
    end
    result = []
    #Kernel.p files
    files.each do |file|
      job = Export.where("id = #{file.export_id}").first
      #if job.state == "transport_completed"
      result << file
      #end
    end
    return result
  end
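Each metadata file is a small JSON document dropped in tmp_dir; the glob-and-parse pattern can be sketched with the stdlib JSON library (returning plain hashes here, where the real code wraps each path in a `Myreplicator::ExportMetadata`):

```ruby
require "json"

# Collect and parse every *.json metadata file under dir.
# Sorting makes the result deterministic for this illustration.
def read_metadata_files(dir)
  Dir.glob(File.join(dir, "*.json")).sort.map do |path|
    JSON.parse(File.read(path))
  end
end
```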
.mysql_table_definition(options) ⇒ Object

  # File 'lib/loader/loader.rb', line 348

  def self.mysql_table_definition options
    sql = "SELECT table_schema, table_name, column_name, is_nullable, data_type, column_type, column_key "
    sql += "FROM INFORMATION_SCHEMA.COLUMNS where table_name = '#{options[:table]}' "
    sql += "and table_schema = '#{options[:source_schema]}';"
    puts sql

    desc = DB.exec_sql(options[:source_schema], sql)
    puts desc
    return desc
  end
.parallel_load(procs) ⇒ Object

  # File 'lib/loader/loader.rb', line 95

  def self.parallel_load procs
    p = Parallelizer.new(:klass => "Myreplicator::Loader")
    procs.each do |proc|
      p.queue << {:params => [], :block => proc}
    end
    p.run
  end
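Parallelizer is internal to Myreplicator; as a rough mental model (not its actual implementation), fanning an array of procs out to threads and waiting for all of them looks like:

```ruby
# Run each proc on its own thread and block until all have finished.
def run_in_parallel(procs)
  procs.map { |blk| Thread.new { blk.call } }.each(&:join)
end

results = Queue.new # thread-safe collector
jobs = [proc { results << :a }, proc { results << :b }]
run_in_parallel(jobs)
# both jobs have completed once run_in_parallel returns
```

This is why incremental_loads hands back one proc per *group*: each proc runs on one thread, so same-table loads inside a group stay sequential while different tables load in parallel.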
.perform ⇒ Object

Main method, provided for resque; reconnection is provided for resque workers.

  # File 'lib/loader/loader.rb', line 21

  def self.perform
    ActiveRecord::Base.verify_active_connections!
    ActiveRecord::Base.connection.reconnect!
    load # Kick off the load process
  end
.tmp_dir ⇒ Object

  # File 'lib/loader/loader.rb', line 12

  def self.tmp_dir
    #@tmp_dir ||= File.join(Myreplicator.app_root,"tmp", "myreplicator")
    @tmp_dir ||= Myreplicator.tmp_path
  end
.transfer_completed?(metadata) ⇒ Boolean

Returns true if the transfer of the file being loaded is completed.

  # File 'lib/loader/loader.rb', line 265

  def self.transfer_completed? metadata
    #Kernel.p "===== transfer_completed? metadata ====="
    #Kernel.p ({:export_id => metadata.export_id,
    #           :file => metadata.export_path,
    #           :job_type => "transporter"})
    if Log.completed?(:export_id => metadata.export_id,
                      :file => metadata.export_path,
                      :job_type => "transporter")
      return true
    end
    return false
  end
.unzip(filename) ⇒ Object

Unzips the file; checks whether the file exists or is already unzipped.

  # File 'lib/loader/loader.rb', line 291

  def self.unzip filename
    cmd = "cd #{tmp_dir}; gunzip #{filename}"
    passed = false

    if File.exist?(File.join(tmp_dir, filename))
      result = `#{cmd}`
      unless result.nil?
        puts result
        unless result.length > 0
          passed = true
        end
      else
        passed = true
      end
    elsif File.exist?(File.join(tmp_dir, filename.gsub(".gz", "")))
      puts "File already unzipped"
      passed = true
    end

    raise Exceptions::LoaderError.new("Unzipping #{filename} Failed!") unless passed
  end
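The method shells out to gunzip; the same check-then-decompress flow can be done in-process with the stdlib Zlib, as a sketch (`unzip_file` is a hypothetical helper, not part of the gem):

```ruby
require "zlib"
require "fileutils"

# Decompress dir/filename if the .gz exists; treat an already-unzipped
# file as success, mirroring the gunzip-based logic above.
def unzip_file(dir, filename)
  gz_path    = File.join(dir, filename)
  plain_path = File.join(dir, filename.sub(/\.gz\z/, ""))

  if File.exist?(gz_path)
    Zlib::GzipReader.open(gz_path) do |gz|
      File.binwrite(plain_path, gz.read) # write the decompressed dump
    end
    FileUtils.rm(gz_path)                # mirror gunzip: remove the .gz
    true
  elsif File.exist?(plain_path)
    true                                 # already unzipped
  else
    false
  end
end
```

Unlike shelling out, failures surface as Ruby exceptions (e.g. `Zlib::GzipFile::Error`) rather than text on stderr that has to be parsed.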