Class: Myreplicator::Loader

Inherits:
Object
  • Object
show all
Defined in:
lib/loader/loader.rb

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(*args) ⇒ Loader

Returns a new instance of Loader.



8
9
10
# File 'lib/loader/loader.rb', line 8

# Builds a Loader instance.
#
# Any trailing options hash is popped off +args+ with Array#extract_options!
# (an ActiveSupport core extension); the extracted hash is not used afterwards.
def initialize(*args)
  _opts = args.extract_options!
end

Class Method Details

.cleanup(metadata) ⇒ Object

Deletes the metadata (JSON) file and the extracted dump file.



281
282
283
284
285
# File 'lib/loader/loader.rb', line 281

# Removes the two on-disk artifacts of a load: the metadata JSON file and the
# dump file, both resolved relative to +tmp_dir+.
#
# NOTE(review): this listing had lost the +metadata+ identifiers. The parameter
# name comes from the documented signature `.cleanup(metadata)`; the method on
# the first `rm` is reconstructed as `metadata_filepath` from the "json file"
# comment and the sibling `destination_filepath` call -- confirm against
# ExportMetadata before relying on it.
#
# @param metadata [Object] descriptor of the file that was loaded; must respond
#   to +metadata_filepath(dir)+ and +destination_filepath(dir)+
# @raise [Errno::ENOENT] (from FileUtils.rm) when either file is missing
def self.cleanup metadata
  puts "Cleaning up..."
  FileUtils.rm metadata.metadata_filepath(tmp_dir) # json file
  FileUtils.rm metadata.destination_filepath(tmp_dir) # dump file
end

.clear_older_files(metadata) ⇒ Object

Clears files that are older than the passed metadata file. Note: This method is provided to ensure that no old incremental files ever get loaded after the schema change algorithm has been applied.



333
334
335
336
337
338
339
340
341
342
343
344
345
346
# File 'lib/loader/loader.rb', line 333

# Cleans up every local metadata file that belongs to the same export as
# +metadata+ and has an earlier export_time, so that stale incremental files
# are never loaded once a newer file for that export exists.
#
# The +metadata+ identifiers were missing from this listing and are restored
# from the documented signature `.clear_older_files(metadata)`.
#
# @param metadata [Object] reference file; responds to +export_id+,
#   +export_time+ (a string DateTime.strptime can parse) and +filepath+
# @return [Array] the list returned by Loader.metadata_files
def self.clear_older_files metadata
  files = Loader.metadata_files
  max_date = DateTime.strptime metadata.export_time

  files.each do |m|
    next unless metadata.export_id == m.export_id
    next unless max_date > DateTime.strptime(m.export_time)

    # Never remove the reference file itself.
    Loader.cleanup m if metadata.filepath != m.filepath
  end
end

.group_incrementals(incrementals) ⇒ Object

Groups all incremental files for the same table together. Returns an array of arrays. NOTE: Each array should be processed in the same thread to avoid collisions.



170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
# File 'lib/loader/loader.rb', line 170

# Groups incremental metadata files that target the same load (as decided by
# +equals+) so that each group can be processed sequentially by one worker.
#
# Fixes over the listed version:
# * the listing deleted from +incrementals+ while iterating it with +each+,
#   which makes Array#each skip the element after every deletion, so some
#   files never ended up in any group;
# * the +metadata+ identifiers stripped from the listing are restored, and
#   each group starts with the file that opened it.
#
# The caller's array is no longer mutated; only the return value carries the
# result.
#
# @param incrementals [Array] metadata objects responding to +equals(other)+
# @return [Array<Array>] groups in first-seen order; [] for empty input
def self.group_incrementals incrementals
  remaining = incrementals.dup
  groups = [] # array of all grouped incrementals

  until remaining.empty?
    metadata = remaining.shift
    # Split off everything that loads into the same place as +metadata+.
    same_load, remaining = remaining.partition { |md| metadata.equals(md) }
    groups << [metadata, *same_load]
  end

  groups
end

.incremental_load(metadata) ⇒ Object

Loads data incrementally. Uses the values specified in the metadata object.



215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
# File 'lib/loader/loader.rb', line 215

# Loads one incremental file into its destination, using the Export record
# referenced by the metadata to decide table, schemas and target system.
#
# The +metadata+ identifiers stripped from this listing are restored from the
# documented signature `.incremental_load(metadata)`.
#
# * export_to == "vertica": delegates to Loader.load_to_vertica.
# * export_to == "mysql":   builds a command with ImportSql.load_data_infile,
#   runs it in a subshell and treats ANY stdout output as a failure.
# * any other value:        does nothing.
#
# @param metadata [Object] responds to +export_id+, +export_to+, +filename+
#   and +destination_filepath(dir)+
# @raise [Exceptions::LoaderError] when the mysql command prints output
def self.incremental_load metadata
  exp = Export.find(metadata.export_id)
  # Unzipping is currently disabled:
  #Loader.unzip(metadata.filename)
  #metadata.zipped = false

  options = {:table_name => exp.table_name,
    :db => exp.destination_schema,
    :filepath => metadata.destination_filepath(tmp_dir),
    :source_schema => exp.source_schema,
    # Two-character strings (backslash + letter); handed on as literal text.
    :fields_terminated_by => "\\0",
    :lines_terminated_by => "\\n"}

  case metadata.export_to
  when "vertica"
    Loader.load_to_vertica options, metadata, exp
  when "mysql"
    cmd = ImportSql.load_data_infile(options)
    puts cmd
    result = `#{cmd}` # execute; backticks always return a String
    unless result.empty?
      raise Exceptions::LoaderError.new("Incremental Load #{metadata.filename} Failed!\n#{result}")
    end
  end #case
end

.incremental_loads(incrementals) ⇒ Object

Loads all incremental files. Ensures that multiple loads to the same table happen sequentially.



139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
# File 'lib/loader/loader.rb', line 139

# Builds one Proc per group of incremental files. Files inside a group are
# processed one after another by the same Proc, so loads that
# Loader.group_incrementals puts together never run concurrently.
#
# Nothing is loaded here: the returned procs do the work when called. Each
# file is wrapped in Myreplicator::Log.run and is loaded and cleaned up only
# if Loader.transfer_completed? reports true for it.
#
# The +metadata+ identifiers stripped from this listing are restored.
#
# @param incrementals [Array] metadata objects for incremental exports
# @return [Array<Proc>] one proc per group
def self.incremental_loads incrementals
  Loader.group_incrementals(incrementals).map do |group|
    Proc.new {
      group.each do |metadata|
        Myreplicator::Log.run(:job_type => "loader",
                :name => "incremental_import",
                :file => metadata.filename,
                :export_id => metadata.export_id) do |_log|

          if Myreplicator::Loader.transfer_completed?(metadata)
            Myreplicator::Loader.incremental_load metadata
            Myreplicator::Loader.cleanup metadata
          end

        end
      end # group
    }
  end # groups
end

.initial_load(metadata) ⇒ Object

Creates table and loads data



193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
# File 'lib/loader/loader.rb', line 193

# Runs the initial (table-creating) load for one exported file.
#
# Builds a command with ImportSql.initial_load from the Export's destination
# schema and the local dump path, runs it in a subshell, and treats ANY
# stdout output as a failure.
#
# The +metadata+ identifiers stripped from this listing are restored from the
# documented signature `.initial_load(metadata)`.
#
# @param metadata [Object] responds to +export_id+, +filename+ and
#   +destination_filepath(dir)+
# @raise [Exceptions::LoaderError] when the command prints output
def self.initial_load metadata
  exp = Export.find(metadata.export_id)
  # Unzipping is currently disabled:
  #Loader.unzip(metadata.filename)
  #metadata.zipped = false

  cmd = ImportSql.initial_load(:db => exp.destination_schema,
                               :filepath => metadata.destination_filepath(tmp_dir))
  puts cmd

  result = `#{cmd}` # execute; backticks always return a String
  unless result.empty?
    raise Exceptions::LoaderError.new("Initial Load #{metadata.filename} Failed!\n#{result}")
  end
end

.initial_loads(initials) ⇒ Object

Loads all new tables concurrently, one file per job.



108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
# File 'lib/loader/loader.rb', line 108

# Builds one Proc per initial file so the caller can run them concurrently.
#
# Nothing is loaded here: each returned proc, when called, wraps its file in
# Myreplicator::Log.run and, if Loader.transfer_completed? is true, loads it
# (incremental_load for "vertica" targets, initial_load otherwise) and then
# cleans up the local files.
#
# The +metadata+ identifiers stripped from this listing are restored.
#
# @param initials [Array] metadata objects to load
# @return [Array<Proc>] one proc per element of +initials+
def self.initial_loads initials
  initials.map do |metadata|
    Proc.new {
      Myreplicator::Log.run(:job_type => "loader",
              :name => "#{metadata.export_type}_import",
              :file => metadata.filename,
              :export_id => metadata.export_id) do |_log|

        if Myreplicator::Loader.transfer_completed?(metadata)
          if metadata.export_to == "vertica"
            Myreplicator::Loader.incremental_load metadata
          else
            Myreplicator::Loader.initial_load metadata
          end
          Myreplicator::Loader.cleanup metadata
        end

      end
    }
  end
end

.load ⇒ Object

Kicks off all initial loads first and then all incrementals. Looks at metadata files stored locally. Note: Initials are loaded sequentially.



32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
# File 'lib/loader/loader.rb', line 32

# Entry point of the load process: reads the metadata files stored locally,
# runs every initial load first, then every remaining (incremental) load.
# All loads run sequentially in this method.
#
# Ordering of the first pass: each "initial" file is followed directly by the
# "incremental" files that +equals+ it, so an incremental for a table is
# applied only after that table's initial load.
#
# Fixes over the listed version:
# * the listing deleted from +all_files+ while iterating it with +each+; that
#   makes Array#each skip the element after every deletion, so e.g. the
#   second of two adjacent initial files stayed in the list and was later
#   pushed through the incremental path;
# * the +metadata+ identifiers stripped from the listing are restored.
#
# The parallel variants (initial_loads / incremental_loads + parallel_load)
# are not used here.
#
# @return [Array] the metadata objects processed in the incremental pass
def self.load
  all_files = Myreplicator::Loader.metadata_files

  first_loads, pending = all_files.partition { |m| m.export_type == "initial" }

  initials = []
  first_loads.each do |m|
    initials << m # Add initial to the list
    # incrementals for the same load should happen right after the initial
    followers, pending = pending.partition { |md| m.equals(md) && md.export_type == "incremental" }
    initials.concat(followers)
  end

  incrementals = pending # Remaining are all incrementals

  initials.each do |metadata|
    Myreplicator::Log.run(:job_type => "loader",
    :name => "#{metadata.export_type}_import",
    :file => metadata.filename,
    :export_id => metadata.export_id) do |_log|
      if Myreplicator::Loader.transfer_completed?(metadata)
        if metadata.export_to == "vertica"
          Myreplicator::Loader.incremental_load metadata
        else
          Myreplicator::Loader.initial_load metadata
        end
        Myreplicator::Loader.cleanup metadata
      end
    end
  end

  incrementals.each do |metadata|
    Myreplicator::Log.run(:job_type => "loader",
    :name => "incremental_import",
    :file => metadata.filename,
    :export_id => metadata.export_id) do |_log|
      if Myreplicator::Loader.transfer_completed?(metadata)
        Myreplicator::Loader.incremental_load metadata
        Myreplicator::Loader.cleanup metadata
      end
    end
  end
end

.load_to_vertica(options, metadata, exp) ⇒ Object

Load to Vertica



245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
# File 'lib/loader/loader.rb', line 245

# Hands one exported file to Myreplicator::VerticaLoader.
#
# The +metadata+ identifiers stripped from this listing are restored from the
# documented signature `.load_to_vertica(options, metadata, exp)`.
#
# NOTE(review): the incoming +options+ hash is not used -- the listed code
# immediately replaced it with a freshly built hash. That behaviour is kept
# (the hash is now built under its own name instead of clobbering the
# parameter); confirm whether caller options were ever meant to be merged in.
#
# @param options [Hash] ignored (kept for interface compatibility)
# @param metadata [Object] responds to +export_id+ and
#   +destination_filepath(dir)+; also passed through under :metadata
# @param exp [Object] export record; responds to +table_name+,
#   +source_schema+ and +destination_schema+
# @return [Object] whatever Myreplicator::VerticaLoader.load returns
def self.load_to_vertica options, metadata, exp
  vertica_options = {:table_name => exp.table_name,
    :db => ActiveRecord::Base.configurations["vertica"]["database"],
    :filepath => metadata.destination_filepath(tmp_dir),
    :source_schema => exp.source_schema,
    :export_id => metadata.export_id,
    :metadata => metadata,
    :destination_schema => exp.destination_schema
  }

  ##TO DO: Handle unsuccessful vertica loads here
  Myreplicator::VerticaLoader.load vertica_options
end

.metadata_files ⇒ Object



312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
# File 'lib/loader/loader.rb', line 312

# Lists the metadata files currently stored locally: one
# Myreplicator::ExportMetadata per "*.json" file found directly in +tmp_dir+.
#
# The method name was stripped from this listing and is restored from the
# page's `.metadata_files` heading.
#
# The listed version also ran `Export.where("id = #{file.export_id}").first`
# for every file and then ignored the result (the state check that used it
# was commented out). That dead, string-interpolated query is removed; if the
# filter is ever reinstated, look the job up with a hash condition, e.g.
# `Export.where(:id => file.export_id).first`, and keep only files whose job
# state is "transport_completed".
#
# @return [Array<Myreplicator::ExportMetadata>] possibly empty
def self.metadata_files
  Dir.glob(File.join(tmp_dir, "*.json")).map do |json_file|
    Myreplicator::ExportMetadata.new(:metadata_path => json_file)
  end
end

.mysql_table_definition(options) ⇒ Object



348
349
350
351
352
353
354
355
356
357
358
# File 'lib/loader/loader.rb', line 348

# Fetches the column definitions of a MySQL table from
# INFORMATION_SCHEMA.COLUMNS.
#
# Prints the generated SQL and the result to stdout, then returns the result.
#
# NOTE(review): the table name is read from options[:table]; confirm callers
# pass that key. Both values are interpolated into the SQL text unescaped.
#
# @param options [Hash] :table (table name) and :source_schema (schema name,
#   also passed to DB.exec_sql as its first argument)
# @return [Object] whatever DB.exec_sql returns
def self.mysql_table_definition options
  schema = options[:source_schema]

  query = [
    "SELECT table_schema, table_name, column_name, is_nullable, data_type, column_type, column_key ",
    "FROM INFORMATION_SCHEMA.COLUMNS where table_name = '#{options[:table]}' ",
    "and table_schema = '#{schema}';"
  ].join

  puts query

  DB.exec_sql(schema, query).tap { |definition| puts definition }
end

.parallel_load(procs) ⇒ Object



95
96
97
98
99
100
101
102
# File 'lib/loader/loader.rb', line 95

# Queues every proc on a Parallelizer created for "Myreplicator::Loader" and
# runs it. Each queued job carries an empty :params list and the proc as
# :block.
#
# @param procs [Array<Proc>] jobs to execute
# @return [Object] whatever Parallelizer#run returns
def self.parallel_load procs
  runner = Parallelizer.new(:klass => "Myreplicator::Loader")

  procs.each { |job| runner.queue << {:params => [], :block => job} }

  runner.run
end

.perform ⇒ Object

Main method provided for Resque. Reconnects to the database, as required for Resque workers.



21
22
23
24
25
# File 'lib/loader/loader.rb', line 21

# Job entry point: refreshes the ActiveRecord connection (verify, then force
# a reconnect) and starts the load process.
#
# @return [Object] whatever +load+ returns
def self.perform
  active_record = ActiveRecord::Base
  active_record.verify_active_connections!
  active_record.connection.reconnect!

  load # Kick off the load process
end

.tmp_dir ⇒ Object



12
13
14
15
# File 'lib/loader/loader.rb', line 12

# Directory holding the transferred dump and metadata files.
# Read from Myreplicator.tmp_path on first use and memoised in @tmp_dir.
#
# @return [Object] the value of Myreplicator.tmp_path
def self.tmp_dir
  #@tmp_dir ||= File.join(Myreplicator.app_root,"tmp", "myreplicator")
  return @tmp_dir if @tmp_dir

  @tmp_dir = Myreplicator.tmp_path
end

.transfer_completed?(metadata) ⇒ Boolean

Returns true if the transfer of the file being loaded is completed

Returns:

  • (Boolean)


265
266
267
268
269
270
271
272
273
274
275
276
# File 'lib/loader/loader.rb', line 265

# Tells whether the transporter job for this file has been logged as
# completed, i.e. whether the file is safe to load.
#
# The +metadata+ identifiers stripped from this listing are restored from the
# documented signature and from the commented-out debug lines, which read
# metadata.export_id and metadata.export_path.
#
# @param metadata [Object] responds to +export_id+ and +export_path+
# @return [Boolean] strictly true or false, whatever Log.completed? returns
def self.transfer_completed? metadata
  completed = Log.completed?(:export_id => metadata.export_id,
                             :file => metadata.export_path,
                             :job_type => "transporter")
  completed ? true : false
end

.unzip(filename) ⇒ Object

Unzips the file. Checks whether the file exists or has already been unzipped.



291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
# File 'lib/loader/loader.rb', line 291

# Gunzips +filename+ inside +tmp_dir+.
#
# Succeeds when either the compressed file exists and gunzip exits with
# status 0, or the compressed file is absent but its uncompressed
# counterpart is already present.
#
# Fixes over the listed version:
# * failure was detected by looking for text on gunzip's stdout, but gunzip
#   reports errors on stderr, so a failed gunzip was treated as success; the
#   exit status is checked instead (any non-zero status counts as failure);
# * the file name was interpolated into a shell command line; gunzip is now
#   invoked without a shell, so spaces and metacharacters are harmless;
# * only a trailing ".gz" is stripped when looking for the uncompressed file
#   (the listing removed every ".gz" occurrence in the name).
#
# @param filename [String] name of the gzip file, relative to +tmp_dir+
# @return [nil]
# @raise [Exceptions::LoaderError] when the file cannot be unzipped and no
#   uncompressed copy exists
def self.unzip filename
  gz_path = File.join(tmp_dir, filename)

  passed =
    if File.exist?(gz_path)
      # Multi-argument system bypasses the shell; true only on exit status 0.
      system("gunzip", gz_path) == true
    elsif File.exist?(File.join(tmp_dir, filename.sub(/\.gz\z/, "")))
      puts "File already unzipped"
      true
    else
      false
    end

  raise Exceptions::LoaderError.new("Unzipping #{filename} Failed!") unless passed
end