Class: Myreplicator::Transporter
- Inherits:
-
Object
- Object
- Myreplicator::Transporter
- Defined in:
- lib/transporter/transporter.rb
Class Method Summary collapse
- .backup_files(location, metadata_path, dump_path) ⇒ Object
-
.completed_files(export) ⇒ Object
Gets all files ready to be exported from server.
-
.download(export) ⇒ Object
Connects to the server via SSH and kicks off the parallel download.
-
.download_file ⇒ Object
Code block that each thread calls; instance_exec is used to execute it under the Transporter class.
-
.get_done_files(export) ⇒ Object
Command for the list of done files; grep -s is used to suppress error messages.
-
.get_dump_path(json_path, metadata = nil) ⇒ Object
Reads metadata file for the export path.
-
.junk_file?(metadata) ⇒ Boolean
Returns true if the file should be deleted.
- .metadata_obj(json_path) ⇒ Object
-
.parallel_download(files) ⇒ Object
Gathers all files that need to be downloaded Gives the queue to parallelizer library to download in parallel.
-
.perform ⇒ Object
Main method provided for Resque; reconnection is provided for Resque workers.
-
.remote_path(export, filename) ⇒ Object
Returns the path of the dump files on the remote server.
- .remove!(export, json_file, dump_file) ⇒ Object
-
.transfer ⇒ Object
Connects to all unique database servers and downloads export files concurrently from multiple sources.
Instance Method Summary collapse
-
#initialize(*args) ⇒ Transporter
constructor
A new instance of Transporter.
-
#schedule(cron) ⇒ Object
Schedules the transport job in Resque.
- #tmp_dir ⇒ Object
Constructor Details
#initialize(*args) ⇒ Transporter
Returns a new instance of Transporter.
8 9 10 |
# File 'lib/transporter/transporter.rb', line 8

# Returns a new instance of Transporter.
def initialize *args
  # NOTE(review): the assignment target and method name were lost in the doc
  # extraction ("= args."). Presumably the Rails idiom below — TODO confirm
  # against the original source file.
  @options = args.extract_options!
end
Class Method Details
.backup_files(location, metadata_path, dump_path) ⇒ Object
160 161 162 163 |
# File 'lib/transporter/transporter.rb', line 160

# Copies the metadata file and the dump file into the given backup location.
#
# @param location [String] directory the backups are copied into
# @param metadata_path [String] local path of the metadata JSON file
# @param dump_path [String] local path of the dump file
def self.backup_files location, metadata_path, dump_path
  FileUtils.cp(metadata_path, location)
  FileUtils.cp(dump_path, location)
end
.completed_files(export) ⇒ Object
Gets all files ready to be exported from server
187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 |
# File 'lib/transporter/transporter.rb', line 187

# Gets all files ready to be exported from the server.
# Lists the "done" metadata files on the remote host and matches each one to
# its active Export job by table name.
#
# @return [Array<Hash>] entries of the form {:file => filename, :export => job}
def self.completed_files export
  ssh = export.ssh_to_source
  done_files = ssh.exec!(get_done_files(export))

  if done_files.blank?
    return []
  end

  files = done_files.split("\n")
  jobs = Export.where("active = 1 and source_schema = '#{export.source_schema}'")
  #jobs.each do |j|
  #  j.update_attributes!({:state => "transporting"})
  #end

  result = []
  files.each do |file|
    # A file belongs to the job whose table name appears in its filename;
    # the last matching job wins.
    flag = nil
    jobs.each do |job|
      if file.include?(job.table_name)
        flag = job
        #job.update_attributes!({:state => 'transporting'})
      end
    end
    if flag
      result << {:file => file, :export => flag}
    end
  end

  Kernel.p "===== done_files ====="
  Kernel.p result
  return result
end
.download(export) ⇒ Object
Connects to the server via SSH and kicks off the parallel download.
55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 |
# File 'lib/transporter/transporter.rb', line 55

# Connects to the server via SSH/SFTP and downloads every completed export:
# the metadata JSON first, then the dump file it points to. Remote files are
# removed after a successful download, and a backup copy is stored when the
# metadata asks for one.
def self.download export
  #Kernel.p "===== 1 ====="
  #parallel_download(completed_files(export))
  tmp_dir ||= Myreplicator.tmp_path
  Dir.mkdir(tmp_dir) unless File.directory?(tmp_dir)

  files = completed_files(export)
  files.each do |f|
    export = f[:export]
    filename = f[:file]

    # Re-establish the DB connection; worker processes may hold stale ones.
    ActiveRecord::Base.verify_active_connections!
    ActiveRecord::Base.connection.reconnect!

    Log.run(:job_type => "transporter", :name => "metadata_file",
            :file => filename, :export_id => export.id) do |log|
      sftp = export.sftp_to_source
      json_file = Transporter.remote_path(export, filename)
      json_local_path = File.join(tmp_dir, filename)

      puts "Downloading #{json_file}"
      sftp.download!(json_file, json_local_path)

      metadata = Transporter.metadata_obj(json_local_path)
      dump_file = metadata.export_path
      puts metadata.state

      if metadata.state == "export_completed"
        Log.run(:job_type => "transporter", :name => "export_file",
                :file => dump_file, :export_id => export.id) do |log|
          puts "Downloading #{dump_file}"
          local_dump_file = File.join(tmp_dir, dump_file.split("/").last)
          sftp.download!(dump_file, local_dump_file)
          Transporter.remove!(export, json_file, dump_file)
          #export.update_attributes!({:state => 'transport_completed'})

          # store back up as well
          unless metadata.store_in.blank?
            Transporter.backup_files(metadata.backup_path, json_local_path, local_dump_file)
          end
        end
      else
        # TO DO: remove metadata file of failed export
        Transporter.remove!(export, json_file, "")
      end #if
    end
  end
end
.download_file ⇒ Object
Code block that each thread calls; instance_exec is used to execute it under the Transporter class.
-
Connects via SFTP
-
Downloads metadata file first
-
Gets dump file location from metadata
-
Downloads dump file
122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 |
# File 'lib/transporter/transporter.rb', line 122

# Code block that each worker thread calls; instance_exec is used to execute
# it under the Transporter class.
# 1. Connects via SFTP
# 2. Downloads the metadata file first
# 3. Gets the dump file location from the metadata
# 4. Downloads the dump file
#
# @return [Proc] a proc taking [export, filename] as its params array
def self.download_file
  proc = Proc.new { |params|
    export = params[0]
    filename = params[1]

    # Re-establish the DB connection; worker threads may hold stale ones.
    ActiveRecord::Base.verify_active_connections!
    ActiveRecord::Base.connection.reconnect!

    Log.run(:job_type => "transporter", :name => "metadata_file",
            :file => filename, :export_id => export.id) do |log|
      sftp = export.sftp_to_source
      json_file = Transporter.remote_path(export, filename)
      json_local_path = File.join(tmp_dir, filename)

      puts "Downloading #{json_file}"
      sftp.download!(json_file, json_local_path)

      metadata = Transporter.metadata_obj(json_local_path)
      dump_file = metadata.export_path
      puts metadata.state

      if metadata.state == "export_completed"
        Log.run(:job_type => "transporter", :name => "export_file",
                :file => dump_file, :export_id => export.id) do |log|
          puts "Downloading #{dump_file}"
          local_dump_file = File.join(tmp_dir, dump_file.split("/").last)
          sftp.download!(dump_file, local_dump_file)
          Transporter.remove!(export, json_file, dump_file)
          #export.update_attributes!({:state => 'transport_completed'})

          # store back up as well
          unless metadata.store_in.blank?
            Transporter.backup_files(metadata.backup_path, json_local_path, local_dump_file)
          end
        end
      end #if
      #puts "#{Thread.current.to_s}___Exiting download..."
    end
  }
end
.get_done_files(export) ⇒ Object
Command for the list of done files; grep -s is used to suppress error messages.
245 246 247 248 249 |
# File 'lib/transporter/transporter.rb', line 245

# Command for the list of done files.
# grep -ls prints only the names of *.json files containing
# "export_completed"; -s suppresses error messages for unreadable files.
#
# @return [String] the shell command to run on the remote host
def self.get_done_files export
  Kernel.p "===== export ====="
  Kernel.p export
  cmd = "cd #{Myreplicator.configs[export.source_schema]["ssh_tmp_dir"]}; grep -ls export_completed *.json"
end
.get_dump_path(json_path, metadata = nil) ⇒ Object
Reads metadata file for the export path
229 230 231 232 |
# File 'lib/transporter/transporter.rb', line 229

# Reads the metadata file for the export (dump) path.
# An already-parsed metadata object may be passed to avoid re-reading the file.
#
# @param json_path [String] path to the metadata JSON file
# @param metadata [ExportMetadata, nil] optional pre-loaded metadata
# @return [String] the export dump path recorded in the metadata
def self.get_dump_path json_path, metadata = nil
  metadata = Transporter.metadata_obj(json_path) if metadata.nil?
  return metadata.export_path
end
.junk_file?(metadata) ⇒ Boolean
Returns true if the file should be deleted
168 169 170 171 172 173 174 175 176 |
# File 'lib/transporter/transporter.rb', line 168

# Returns true if the file should be deleted, i.e. its export either
# failed or was ignored.
#
# @param metadata [ExportMetadata] parsed metadata for the file
# @return [Boolean]
def self.junk_file? metadata
  case metadata.state
  when "failed"
    return true
  when "ignored"
    return true
  end
  return false
end
.metadata_obj(json_path) ⇒ Object
221 222 223 224 |
# File 'lib/transporter/transporter.rb', line 221

# Builds an ExportMetadata object backed by the metadata JSON file.
#
# @param json_path [String] path to the metadata JSON file
# @return [ExportMetadata]
def self.metadata_obj json_path
  metadata = ExportMetadata.new(:metadata_path => json_path)
  return metadata
end
.parallel_download(files) ⇒ Object
Gathers all files that need to be downloaded Gives the queue to parallelizer library to download in parallel
103 104 105 106 107 108 109 110 111 112 |
# File 'lib/transporter/transporter.rb', line 103

# Gathers all files that need to be downloaded and hands the queue to the
# parallelizer library to download them in parallel.
#
# @param files [Array<Hash>] entries of the form {:file => name, :export => job}
def self.parallel_download files
  p = Parallelizer.new(:klass => "Myreplicator::Transporter")
  files.each do |f|
    puts f[:file]
    p.queue << {:params => [f[:export], f[:file]], :block => download_file}
  end
  p.run
end
.perform ⇒ Object
Main method provided for Resque; reconnection is provided for Resque workers.
23 24 25 |
# File 'lib/transporter/transporter.rb', line 23

# Main method provided for Resque; simply kicks off the transfer.
def self.perform
  transfer # Kick off the load process
end
.remote_path(export, filename) ⇒ Object
Returns the path of the dump files on the remote server.
237 238 239 |
# File 'lib/transporter/transporter.rb', line 237

# Returns the path of a dump/metadata file on the remote server, built from
# the source schema's configured ssh_tmp_dir.
#
# @return [String]
def self.remote_path export, filename
  File.join(Myreplicator.configs[export.source_schema]["ssh_tmp_dir"], filename)
end
.remove!(export, json_file, dump_file) ⇒ Object
178 179 180 181 182 |
# File 'lib/transporter/transporter.rb', line 178

# Removes the metadata file and dump file from the remote server over SSH.
#
# @param json_file [String] remote metadata path
# @param dump_file [String] remote dump path (may be "" to remove only metadata)
def self.remove! export, json_file, dump_file
  ssh = export.ssh_to_source
  puts "rm #{json_file} #{dump_file}"
  ssh.exec!("rm #{json_file} #{dump_file}")
end
.transfer ⇒ Object
Connects to all unique database servers and downloads export files concurrently from multiple sources.
42 43 44 45 46 47 48 49 |
# File 'lib/transporter/transporter.rb', line 42

# Connects to all unique database servers and downloads export files from
# multiple sources. Grouping by source_schema visits each server once.
def self.transfer
  unique_jobs = Export.where("active = 1").group("source_schema")
  Kernel.p "===== unique_jobs ====="
  Kernel.p unique_jobs
  unique_jobs.each do |export|
    download export
  end
end
Instance Method Details
#schedule(cron) ⇒ Object
Schedules the transport job in Resque
30 31 32 33 34 35 36 |
# File 'lib/transporter/transporter.rb', line 30

# Schedules the transport job in Resque (via resque-scheduler).
#
# @param cron [String] cron expression for the schedule
def schedule cron
  Resque.set_schedule("myreplicator_transporter", {
    :cron => cron,
    :class => "Myreplicator::Transporter",
    :queue => "myreplicator_transporter"
  })
end
#tmp_dir ⇒ Object
12 13 14 15 16 17 |
# File 'lib/transporter/transporter.rb', line 12

# Lazily resolves, memoizes, and creates the local temp directory used for
# downloaded files.
def tmp_dir
  #@tmp_dir ||= File.join(Myreplicator.app_root,"tmp", "myreplicator")
  @tmp_dir ||= Myreplicator.tmp_path
  Dir.mkdir(@tmp_dir) unless File.directory?(@tmp_dir)
  @tmp_dir
end