Class: Myreplicator::Transporter

Inherits:
Object
  • Object
show all
Defined in:
lib/transporter/transporter.rb

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(*args) ⇒ Transporter

Returns a new instance of Transporter.



8
9
10
# File 'lib/transporter/transporter.rb', line 8

# Returns a new instance of Transporter.
#
# Accepts arbitrary arguments. A trailing options hash is stripped off with
# ActiveSupport's Array#extract_options!, but it is not currently stored or
# used, so it is not bound to a local (avoids an unused-variable warning).
def initialize *args
  args.extract_options!
end

Class Method Details

.backup_files(location, metadata_path, dump_path) ⇒ Object



160
161
162
163
# File 'lib/transporter/transporter.rb', line 160

# Copies the local metadata file and the local dump file to a backup location.
#
# @param location [String] destination passed to FileUtils.cp for both files
# @param metadata_path [String] local path of the JSON metadata file
# @param dump_path [String] local path of the downloaded dump file
def self.backup_files location, metadata_path, dump_path
  FileUtils.cp(metadata_path, location)
  FileUtils.cp(dump_path, location)
end

.completed_files(export) ⇒ Object

Gets all files ready to be exported from server



187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
# File 'lib/transporter/transporter.rb', line 187

# Gets all files on the source server that are ready to be transported.
#
# Runs the get_done_files command over SSH and pairs each listed file with
# the active export job (on the same source schema) whose table name appears
# in the file name.
#
# @param export [Export] export whose source server and source_schema are used
# @return [Array<Hash>] one {:file => name, :export => job} entry per matched
#   file; files matching no active job are skipped. Returns [] when the
#   command produced no output.
def self.completed_files export
  ssh = export.ssh_to_source
  done_files = ssh.exec!(get_done_files(export))
  return [] if done_files.blank?

  files = done_files.split("\n")

  # Bind the schema name as a query parameter instead of interpolating it
  # into the SQL string. Load the jobs once; they are scanned per file.
  jobs = Export.where("active = 1 and source_schema = ?", export.source_schema).to_a

  result = []
  files.each do |file|
    # When several table names occur in the file name (e.g. "users" and
    # "users_archive"), the last matching job wins.
    job = jobs.select { |candidate| file.include?(candidate.table_name) }.last
    result << {:file => file, :export => job} if job
  end

  Kernel.p "===== done_files ====="
  Kernel.p result
  result
end

.download(export) ⇒ Object

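Connects to the source server via SSH and downloads its completed export files. (The parallel download call is currently commented out, so files are fetched one at a time.)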



55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
# File 'lib/transporter/transporter.rb', line 55

# Connects to the export's source server over SSH/SFTP and downloads every
# completed export found there, one file at a time (the parallel_download
# path is currently not used here).
#
# For each {:file, :export} entry returned by completed_files:
#   1. downloads the JSON metadata file into the local tmp directory;
#   2. if the metadata state is "export_completed", downloads the dump file,
#      deletes both files on the remote host and, when the metadata has a
#      store_in value, copies both local files to its backup_path;
#   3. otherwise deletes only the remote metadata file.
#
# @param export [Export] export whose source server is scanned for files
def self.download export
  local_dir = Myreplicator.tmp_path
  Dir.mkdir(local_dir) unless File.directory?(local_dir)

  completed_files(export).each do |entry|
    # Use a separate name so the method parameter is not overwritten; each
    # file may belong to a different job on the same source schema.
    job = entry[:export]
    filename = entry[:file]

    ActiveRecord::Base.verify_active_connections!
    ActiveRecord::Base.connection.reconnect!

    Log.run(:job_type => "transporter", :name => "metadata_file",
            :file => filename, :export_id => job.id) do |_log|
      sftp = job.sftp_to_source
      json_file = Transporter.remote_path(job, filename)
      json_local_path = File.join(local_dir, filename)
      puts "Downloading #{json_file}"
      sftp.download!(json_file, json_local_path)
      metadata = Transporter.metadata_obj(json_local_path)
      dump_file = metadata.export_path
      puts metadata.state

      if metadata.state == "export_completed"
        Log.run(:job_type => "transporter", :name => "export_file",
                :file => dump_file, :export_id => job.id) do |_log|
          puts "Downloading #{dump_file}"
          local_dump_file = File.join(local_dir, dump_file.split("/").last)
          sftp.download!(dump_file, local_dump_file)
          Transporter.remove!(job, json_file, dump_file)
          # Keep a backup copy of both files when the metadata asks for one.
          unless metadata.store_in.blank?
            Transporter.backup_files(metadata.backup_path, json_local_path, local_dump_file)
          end
        end
      else
        # The export did not complete: delete only its remote metadata file.
        Transporter.remove!(job, json_file, "")
      end
    end
  end
end

.download_file ⇒ Object

Returns the code block that each download thread calls; instance_exec is used to execute it under the Transporter class. The block:

  1. Connects via SFTP

  2. Downloads metadata file first

  3. Gets dump file location from metadata

  4. Downloads dump file



122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
# File 'lib/transporter/transporter.rb', line 122

# Returns the code block that each download thread calls.
#
# The block is meant to be instance_exec'd by the Parallelizer under the
# Transporter class, so the bare +tmp_dir+ reference is presumably resolved
# against a Transporter instance. NOTE(review): confirm in Parallelizer.
#
# The block receives +params+ (indexed as [export, filename]) and:
#   1. connects via SFTP;
#   2. downloads the metadata file first;
#   3. reads the dump file location from the metadata;
#   4. if the export completed, downloads the dump file, deletes both remote
#      files and backs up the local copies when the metadata has store_in.
#
# @return [Proc]
def self.download_file
  Proc.new do |params|
    export = params[0]
    filename = params[1]

    ActiveRecord::Base.verify_active_connections!
    ActiveRecord::Base.connection.reconnect!

    Log.run(:job_type => "transporter", :name => "metadata_file",
            :file => filename, :export_id => export.id) do |_log|
      sftp = export.sftp_to_source
      json_file = Transporter.remote_path(export, filename)
      json_local_path = File.join(tmp_dir, filename)
      puts "Downloading #{json_file}"
      sftp.download!(json_file, json_local_path)
      metadata = Transporter.metadata_obj(json_local_path)
      dump_file = metadata.export_path
      puts metadata.state
      if metadata.state == "export_completed"
        Log.run(:job_type => "transporter", :name => "export_file",
                :file => dump_file, :export_id => export.id) do |_log|
          puts "Downloading #{dump_file}"
          local_dump_file = File.join(tmp_dir, dump_file.split("/").last)
          sftp.download!(dump_file, local_dump_file)
          Transporter.remove!(export, json_file, dump_file)
          # Keep a backup copy of both files when the metadata asks for one.
          unless metadata.store_in.blank?
            Transporter.backup_files(metadata.backup_path, json_local_path, local_dump_file)
          end
        end
      end
    end
  end
end

.get_done_files(export) ⇒ Object

Builds the command that lists the completed ("done") metadata files. `grep -s` is used to suppress error messages.



245
246
247
248
249
# File 'lib/transporter/transporter.rb', line 245

# Builds the shell command that lists completed export metadata files in the
# source schema's remote ssh_tmp_dir (from Myreplicator.configs).
#
# `grep -l` prints only the names of *.json files containing
# "export_completed"; `-s` suppresses error messages.
#
# @param export [Export] export whose source_schema selects the config entry
# @return [String] command to run on the source server over SSH
def self.get_done_files export
  Kernel.p "===== export ====="
  Kernel.p export
  remote_dir = Myreplicator.configs[export.source_schema]["ssh_tmp_dir"]
  # Chain with && so grep never runs in the login directory if cd fails.
  "cd #{remote_dir} && grep -ls export_completed *.json"
end

.get_dump_path(json_path, metadata = nil) ⇒ Object

Reads metadata file for the export path



229
230
231
232
# File 'lib/transporter/transporter.rb', line 229

# Reads the metadata file to get the export (dump) path.
#
# @param json_path [String] local path of the JSON metadata file; only read
#   when +metadata+ is not supplied
# @param metadata [ExportMetadata, nil] already-loaded metadata to reuse
# @return the metadata's export_path
def self.get_dump_path json_path, metadata = nil
  metadata ||= Transporter.metadata_obj(json_path)
  metadata.export_path
end

.junk_file?(metadata) ⇒ Boolean

Returns true if the file should be deleted

Returns:

  • (Boolean)


168
169
170
171
172
173
174
175
176
# File 'lib/transporter/transporter.rb', line 168

# Returns true if the file should be deleted, i.e. when the export described
# by +metadata+ ended in the "failed" or "ignored" state.
#
# @param metadata [#state] export metadata object
# @return [Boolean]
def self.junk_file? metadata
  %w[failed ignored].include?(metadata.state)
end

.metadata_obj(json_path) ⇒ Object



221
222
223
224
# File 'lib/transporter/transporter.rb', line 221

# Wraps a local JSON metadata file in an ExportMetadata object.
#
# @param json_path [String] local path of the metadata file
# @return [ExportMetadata]
def self.metadata_obj json_path
  ExportMetadata.new(:metadata_path => json_path)
end

.parallel_download(files) ⇒ Object

Gathers all files that need to be downloaded and gives the queue to the Parallelizer library, which downloads them in parallel.



103
104
105
106
107
108
109
110
111
112
# File 'lib/transporter/transporter.rb', line 103

# Queues one download job per file and hands the queue to the Parallelizer,
# which runs the download_file block for each job.
#
# @param files [Array<Hash>] entries with :file and :export keys
# @return whatever Parallelizer#run returns
def self.parallel_download files
  parallelizer = Parallelizer.new(:klass => "Myreplicator::Transporter")

  files.each do |entry|
    puts entry[:file]
    job = {:params => [entry[:export], entry[:file]], :block => download_file}
    parallelizer.queue << job
  end

  parallelizer.run
end

.perform ⇒ Object

Main method provided for Resque; database reconnection is provided for Resque workers.



23
24
25
# File 'lib/transporter/transporter.rb', line 23

# Resque entry point: kicks off the transfer of completed export files.
def self.perform
  self.transfer
end

.remote_path(export, filename) ⇒ Object

Returns the path of a file in the dump directory on the remote server.



237
238
239
# File 'lib/transporter/transporter.rb', line 237

# Returns the full path of +filename+ inside the source schema's remote
# ssh_tmp_dir, as configured in Myreplicator.configs.
def self.remote_path export, filename
  schema_config = Myreplicator.configs[export.source_schema]
  File.join(schema_config["ssh_tmp_dir"], filename)
end

.remove!(export, json_file, dump_file) ⇒ Object



178
179
180
181
182
# File 'lib/transporter/transporter.rb', line 178

# Deletes the given files on the export's source server with `rm` over SSH,
# echoing the command first.
#
# @return the result of the SSH exec! call
def self.remove! export, json_file, dump_file
  connection = export.ssh_to_source
  command = "rm #{json_file} #{dump_file}"
  puts command
  connection.exec!(command)
end

.transfer ⇒ Object

Connects to each unique database server and downloads the export files from each of these sources.



42
43
44
45
46
47
48
49
# File 'lib/transporter/transporter.rb', line 42

# Groups the active exports by source_schema (presumably yielding one row per
# source server) and downloads completed files from each of them in turn.
def self.transfer
  exports_by_schema = Export.where("active = 1").group("source_schema")
  Kernel.p "===== unique_jobs ====="
  Kernel.p exports_by_schema
  exports_by_schema.each { |source_export| download(source_export) }
end

Instance Method Details

#schedule(cron) ⇒ Object

Schedules the transport job in Resque



30
31
32
33
34
35
36
# File 'lib/transporter/transporter.rb', line 30

# Schedules the recurring transport job in Resque.
#
# @param cron [String] cron expression for the job
def schedule cron
  job_config = {
    :cron  => cron,
    :class => "Myreplicator::Transporter",
    :queue => "myreplicator_transporter"
  }
  Resque.set_schedule("myreplicator_transporter", job_config)
end

#tmp_dir ⇒ Object



12
13
14
15
16
17
# File 'lib/transporter/transporter.rb', line 12

# Local working directory for downloaded files (Myreplicator.tmp_path).
# Memoized per instance; the directory is created if it does not exist.
def tmp_dir
  path = (@tmp_dir ||= Myreplicator.tmp_path)
  File.directory?(path) || Dir.mkdir(path)
  path
end