Class: Myreplicator::VerticaLoader

Inherits:
Object
  • Object
show all
Defined in:
lib/loader/vertica/vertica_loader.rb

Class Method Summary collapse

Class Method Details

.apply_schema_change(options, temp_table) ⇒ Object

Schema change algorithm: create a temp table, load all the data into it, drop the original table, then rename the temp table to the original name.

Raising a concern: should the temp table live in the same schema as the target table or in a dedicated tmp schema? Vertica doesn’t lock the schema.



37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
# File 'lib/loader/vertica/vertica_loader.rb', line 37

# Rebuilds a Vertica table after its MySQL schema changed.
#
# Steps: create +temp_table+ from the current MySQL definition, COPY the export
# file into it, and only if that COPY succeeded drop the live table, rename the
# temp table into its place and re-apply the grants saved beforehand.
#
# @param options [Hash] :mysql_schema, :vertica_db, :vertica_schema, :table
#   (live table name), :filepath, plus :source_schema (read by prepare_options)
# @param temp_table [String] name of the staging table to create and load
# @raise [RuntimeError] when the COPY into the temp table fails; the temp table
#   is dropped and the live table is left untouched
def apply_schema_change options, temp_table
  Kernel.p "+++++++++++++++++ options "
  puts options
  VerticaLoader.create_table({:mysql_schema => options[:mysql_schema],
                               :vertica_db => options[:vertica_db],
                               :vertica_schema => options[:vertica_schema],
                               :table => temp_table,
                               :mysql_table => options[:table]})

  # prepare_options fills defaults into the very hash it receives
  # (reverse_merge!), so give it a copy: +options+ must keep naming the live table.
  new_options = prepare_options options.dup
  new_options[:file] = options[:filepath]
  new_options[:table] = temp_table
  new_options[:schema] = options[:vertica_schema]

  # vertica_copy returns the Kernel#system result of the vsql run.
  copied = vertica_copy new_options

  Kernel.p "+++++++++++++++++ new_options "
  puts new_options

  temp_qualified = "#{options[:vertica_db]}.#{options[:vertica_schema]}.#{temp_table}"
  unless copied
    # Never swap in a table whose load failed: that would replace the live data
    # with an empty or partial copy.
    Myreplicator::DB.exec_sql("vertica", "DROP TABLE IF EXISTS #{temp_qualified} CASCADE;")
    raise "COPY into #{temp_qualified} failed; #{options[:table]} was left unchanged"
  end

  g_options = {:db => options[:vertica_db], :schema => options[:vertica_schema], :table => options[:table]}
  grants = Myreplicator::VerticaUtils.get_grants g_options
  # drop the old table
  sql = "DROP TABLE IF EXISTS #{options[:vertica_db]}.#{options[:vertica_schema]}.#{options[:table]} CASCADE;"
  Myreplicator::DB.exec_sql("vertica",sql)
  # rename the freshly loaded temp table into place
  sql = "ALTER TABLE #{temp_qualified} RENAME TO \"#{options[:table]}\";"
  Myreplicator::DB.exec_sql("vertica",sql)
  Myreplicator::VerticaUtils.set_grants(grants)
end

.clean_up_temp_tables(db) ⇒ Object



421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
# File 'lib/loader/vertica/vertica_loader.rb', line 421

# Drops leftover staging tables in schema +db+ that are more than a day old.
#
# Staging tables are named "temp_<table><YYYYMMDD_HHMMSS>" (see
# create_temp_table). Only names with the literal "temp_" prefix AND a
# well-formed timestamp suffix are considered, so unrelated tables whose tail
# merely looks like digits are never dropped.
#
# @param db [String] Vertica schema to sweep (compared with table_schema)
def clean_up_temp_tables db
  sql = "SELECT table_name FROM v_catalog.tables WHERE table_schema ='#{db}' and table_name LIKE 'temp_%';"
  result = Myreplicator::DB.exec_sql("vertica",sql)
  cutoff = Time.now() - 1.day
  result.rows.each do |row|
    tb = row[:table_name]
    # In LIKE, "_" is a one-character wildcard, so re-check the literal prefix.
    next unless tb.start_with?("temp_")
    stamp = tb.match(/(\d{4})(\d{2})(\d{2})_(\d{2})(\d{2})(\d{2})\z/)
    next unless stamp

    begin
      created_at = Time.local(*stamp.captures.map(&:to_i))
    rescue ArgumentError => e
      # out-of-range fields (e.g. month 13): not a timestamp we generated
      puts e.message
      next
    end

    if created_at < cutoff
      sql = "DROP TABLE IF EXISTS #{db}.#{tb} CASCADE;"
      Myreplicator::DB.exec_sql("vertica",sql)
    end
  end
end

.create_table(*args) ⇒ Object



5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
# File 'lib/loader/vertica/vertica_loader.rb', line 5

# Creates a Vertica table from a MySQL table definition.
#
# options[:mysql_schema] is iterated with each(:as => :hash), one row per
# column (presumably a Mysql2::Result - TODO confirm). The collected rows are
# stored back into the options hash as :columns. The DDL produced by
# Myreplicator::VerticaSql.create_table_stmt is then printed and executed.
def create_table *args
  options = args.extract_options!
  Kernel.p "===== OPTION ====="
  puts options

  column_rows = []
  options[:mysql_schema].each(:as => :hash) { |column| column_rows << column }
  options[:columns] = column_rows

  ddl = Myreplicator::VerticaSql.create_table_stmt options
  puts ddl
  Myreplicator::DB.exec_sql("vertica", ddl)
end

.create_temp_table(*args) ⇒ Object



73
74
75
76
77
78
79
80
81
82
83
# File 'lib/loader/vertica/vertica_loader.rb', line 73

# Creates an empty staging table shaped like MySQL table options[:table].
#
# Returns the staging table's name: "temp_" + table name + the local time as
# YYYYMMDD_HHMMSS.
def create_temp_table *args
  options = args.extract_options!
  stamp = DateTime.now.strftime('%Y%m%d_%H%M%S')
  staging_name = "temp_" + options[:table] + stamp

  VerticaLoader.create_table({:mysql_schema   => options[:mysql_schema],
                              :vertica_db     => options[:vertica_db],
                              :vertica_schema => options[:vertica_schema],
                              :table          => staging_name,
                              :mysql_table    => options[:table]})
  staging_name
end

.destination_table_vertica(options) ⇒ Object



21
22
23
24
25
26
27
# File 'lib/loader/vertica/vertica_loader.rb', line 21

# Queries Vertica's catalog for the column_name/data_type rows of
# options[:table] in schema options[:destination_schema].
#
# The query is printed first, and the raw exec_sql result is returned.
def destination_table_vertica options
  table, schema = options.values_at(:table, :destination_schema)
  query = "select column_name, data_type From columns where 
           table_name = '#{table}' AND table_schema = '#{schema}'"
  puts query
  DB.exec_sql("vertica", query)
end

.get_analyze_constraints(*args) ⇒ Object



443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
# File 'lib/loader/vertica/vertica_loader.rb', line 443

# Decides whether a table must be reloaded because its Vertica copy drifted.
#
# The check only runs when the Export record has analyze_constraints set to true.
# It returns 1 in either of two cases:
# * Vertica's ANALYZE_CONSTRAINTS reports any violation;
# * the number of rows with modified_date older than one hour differs between
#   the MySQL source and the Vertica target.
# Otherwise it returns 0. Errors raised while checking are printed and treated
# as "no reload".
#
# @param options [Hash] :export_id, :vertica_db, :vertica_schema, :table and
#   :source_schema (MySQL database, also used as the DB connection name)
# @return [Integer] 1 to request a reload, 0 otherwise
def get_analyze_constraints *args
  options = args.extract_options!
  exp = Export.find(options[:export_id])
  Kernel.p "!!!!! get_analyze_constraints !!!!!"
  begin
    if exp.analyze_constraints == true
      sql = "SELECT analyze_constraints('#{options[:vertica_db]}.#{options[:vertica_schema]}.#{options[:table]}');"
      result = Myreplicator::DB.exec_sql("vertica",sql)
      return 1 if result.entries.size > 0

      # Skip the last hour so rows still being replicated don't trigger reloads.
      cutoff = (DateTime.now() - 1.hour).strftime('%Y-%m-%d %H:%M:%S')
      count_sql = lambda do |schema|
        "SELECT COUNT(*) FROM #{schema}.#{options[:table]} WHERE modified_date < '#{cutoff}';"
      end
      # The source table lives in the MySQL database :source_schema, not in
      # the Vertica schema.
      source_result = Myreplicator::DB.exec_sql("#{options[:source_schema]}", count_sql.call(options[:source_schema]))
      target_result = Myreplicator::DB.exec_sql("vertica", count_sql.call(options[:vertica_schema]))
      # Compare the scalar counts, not the result objects (two distinct objects
      # from two different connections).
      return 1 if first_column_value(source_result).to_i != first_column_value(target_result).to_i
    end
  rescue StandardError => e
    puts e.message
  end
  0
end

# Returns the first column of the first row of a query result. Rows may be
# hashes (e.g. {"COUNT(*)" => 5} or {:COUNT => 5}) or arrays. Returns nil when
# there are no rows.
def first_column_value result
  row = result.entries.first
  row.is_a?(Hash) ? row.values.first : Array(row).first
end

.get_mysql_inserted_columns(mysql_schema_simple_form) ⇒ Object



314
315
316
317
318
319
320
# File 'lib/loader/vertica/vertica_loader.rb', line 314

# Returns every "column_name" from the simplified MySQL schema rows, in order.
# These are the values the MERGE inserts for unmatched rows.
def get_mysql_inserted_columns mysql_schema_simple_form
  mysql_schema_simple_form.map { |column| column["column_name"] }
end

.get_mysql_keys(mysql_schema_simple_form) ⇒ Object



294
295
296
297
298
299
300
301
302
# File 'lib/loader/vertica/vertica_loader.rb', line 294

# Returns the names of the primary-key columns (column_key == "PRI") from the
# simplified MySQL schema rows, in schema order.
def get_mysql_keys mysql_schema_simple_form
  primary_columns = mysql_schema_simple_form.select { |column| column["column_key"] == "PRI" }
  primary_columns.map { |column| column["column_name"] }
end

.get_mysql_none_keys(mysql_schema_simple_form) ⇒ Object



304
305
306
307
308
309
310
311
312
# File 'lib/loader/vertica/vertica_loader.rb', line 304

# Returns the names of every column that is not part of the primary key
# (column_key other than "PRI"), in schema order. These are the columns the
# MERGE updates on a match.
def get_mysql_none_keys mysql_schema_simple_form
  non_key_columns = mysql_schema_simple_form.reject { |column| column["column_key"] == "PRI" }
  non_key_columns.map { |column| column["column_name"] }
end

.get_vsql_copy_command(prepared_options) ⇒ Object



223
224
225
226
227
228
229
230
231
232
233
# File 'lib/loader/vertica/vertica_loader.rb', line 223

# Builds the vsql shell command that COPYs prepared_options[:file] FROM LOCAL
# into prepared_options[:schema].prepared_options[:table].
#
# GZIP is added when the file name's last extension is "gz". Exceptions and
# rejected rows are written under Myreplicator.tmp_path, in files named
# "<schema>_<table_name>". Note that this uses :table_name, not :table.
# NOTE(review): the password is passed on the command line via -w, and the
# options are printed with Kernel.p.
#
# @param prepared_options [Hash] output of prepare_options (plus overrides)
# @return [String] the complete vsql command line
def get_vsql_copy_command prepared_options
  Kernel.p "===== get_vsql_copy_command prepared_options ====="
  Kernel.p prepared_options
  compression = prepared_options[:file].split('.').last == "gz" ? "GZIP" : ""
  schema = prepared_options[:schema]
  tmp_dir = Myreplicator.tmp_path
  log_name = "#{schema}_#{prepared_options[:table_name]}"
  copy_sql = [
    "COPY #{schema}.#{prepared_options[:table]}",
    "FROM LOCAL '#{prepared_options[:file]}'",
    compression, # when empty it still adds a separator, giving two spaces
    "DELIMITER E'#{prepared_options[:delimiter]}'",
    "NULL as '#{prepared_options[:null_value]}'",
    "ENCLOSED BY E'#{prepared_options[:enclosed]}'",
    "RECORD TERMINATOR '#{prepared_options[:line_terminate]}'",
    "EXCEPTIONS '#{tmp_dir}/load_logs/#{log_name}.log'",
    "REJECTED DATA '#{tmp_dir}/rejected_data/#{log_name}.txt';"
  ].join(" ")
  "#{prepared_options[:vsql]} -h #{prepared_options[:host]} -U #{prepared_options[:user]} " \
    "-w #{prepared_options[:pass]} -d #{prepared_options[:db]} -c \"#{copy_sql}\""
end

.get_vsql_merge_command(options, keys, none_keys, inserted_columns) ⇒ Object



322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
# File 'lib/loader/vertica/vertica_loader.rb', line 322

# Builds the vsql shell command that MERGEs options[:temp_table] into
# options[:table] (both qualified by :db and :schema) and then COMMITs.
#
# Rows are matched on every column in +keys+. Matched rows get each column in
# +none_keys+ set from the source. Unmatched rows are inserted from
# +inserted_columns+ using positional VALUES, with no column list.
# prepare_options fills connection defaults into +options+ in place and
# returns that same hash.
#
# @param options [Hash] :table, :temp_table, :db, :schema and vsql settings
# @param keys [Array<String>] match (primary key) columns
# @param none_keys [Array<String>] columns updated on a match
# @param inserted_columns [Array<String>] columns inserted when not matched
# @return [String] the complete vsql command line
def get_vsql_merge_command options, keys, none_keys, inserted_columns
  Kernel.p "===== Merge Options ====="
  Kernel.p options
  prepared_options = prepare_options options
  Kernel.p prepared_options

  qualifier = "#{prepared_options[:db]}.#{prepared_options[:schema]}"
  on_clause = keys.map { |k| "source.#{k} = target.#{k} " }.join("AND ")
  set_clause = none_keys.map { |nk| "#{nk} = source.#{nk}" }.join(", ")
  set_clause << " " unless none_keys.empty?
  values = inserted_columns.map { |col| "source.#{col}" }.join(", ")
  values << ") " unless inserted_columns.empty?

  sql = "MERGE INTO #{qualifier}.#{prepared_options[:table]} target " \
        "USING #{qualifier}.#{prepared_options[:temp_table]} source " \
        "ON #{on_clause}" \
        "WHEN MATCHED THEN UPDATE SET #{set_clause}" \
        "WHEN NOT MATCHED THEN INSERT  VALUES (#{values}" \
        "; COMMIT;"
  "#{prepared_options[:vsql]} -h #{prepared_options[:host]} -U #{prepared_options[:user]} " \
    "-w #{prepared_options[:pass]} -d #{prepared_options[:db]} -c \"#{sql}\""
end

.load(*args) ⇒ Object

Example: Loader::VerticaLoader.load(:schema => "king", :table => "category_overview_data", :file => "tmp/vertica/category_overview_data.tsv", :null_value => "NULL"). TODO: check for export_type!



117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
# File 'lib/loader/vertica/vertica_loader.rb', line 117

# Loads one exported file into Vertica, choosing a strategy for this run.
#
# Reads options[:metadata] (an object responding to #export_type and
# #export_id), :table_name, :db, :destination_schema, :source_schema,
# :export_id and :filepath. The options hash is also passed to vertica_copy.
#
# Strategy, checked in this order:
# 1. schema_check[:new]     -> create the table and COPY the file into it;
# 2. schema_check[:changed] -> for metadata.export_type 'initial', rebuild the
#    table (apply_schema_change); otherwise Loader.cleanup only;
# 3. nightly refresh due (enabled, non-zero frequency, at least that many
#    minutes since the last run, and local hour >= 21) -> truncate the table
#    and re-run the export;
# 4. get_analyze_constraints(ops) > 0 -> truncate the table and re-run the export;
# 5. otherwise -> COPY into a fresh temp table. If it is empty, drop it.
#    Otherwise, for export_type 'all' replace the table with it, and for
#    'incremental' MERGE it into the table and then drop it.
def load *args
  options = args.extract_options!
  metadata = options[:metadata]
  Kernel.p "===== metadata ====="
  Kernel.p metadata
  Kernel.p "===== metadata.export_type ====="
  Kernel.p metadata.export_type
  Kernel.p options
  #options = {:table_name => "actucast_appeal", :destination_schema => "public", :source_schema => "raw_sources"}
  schema_check = Myreplicator::MysqlExporter.schema_changed?(:table => options[:table_name],
                                               :destination_schema => options[:destination_schema],
                                               :source_schema => options[:source_schema])
  Kernel.p "===== schema_check ====="
  Kernel.p schema_check
  # staging table name used if a schema change forces a rebuild
  temp_table = "temp_" + options[:table_name] + DateTime.now.strftime('%Y%m%d_%H%M%S').to_s
  ops = {:mysql_schema => schema_check[:mysql_schema],
    :vertica_db => options[:db],
    :vertica_schema => options[:destination_schema],
    :source_schema => options[:source_schema],
    :table => options[:table_name],
    :export_id => options[:export_id],
    :filepath => options[:filepath]
  }
  exp = Myreplicator::Export.find(metadata.export_id)
  if schema_check[:new]
    create_table(ops)
    #LOAD DATA IN
    vertica_copy options
  elsif schema_check[:changed]
    if metadata.export_type == 'initial'
      Kernel.p "===== schema_check[:changed] ====="
      Loader.clear_older_files   # clear old incremental files
      apply_schema_change(ops, temp_table)
    else
      Loader.cleanup  #Remove incremental file
      Kernel.p "===== Remove incremental file ====="
    end
  elsif exp.nightly_refresh && (exp.nightly_refresh_frequency != 0) && ((Time.now() - exp.nightly_refresh_last_run) >= exp.nightly_refresh_frequency.minute) && (Time.now().hour >= 21)
    Loader.clear_older_files   # clear old incremental files
    exp.nightly_refresh_last_run = Time.now().change(:min => 0)
    exp.save!
    sql = "TRUNCATE TABLE #{options[:db]}.#{options[:destination_schema]}.#{options[:table_name]};"
    Myreplicator::DB.exec_sql("vertica",sql)
    # run the export. The next time loader runs, it will load the file
    exp.export
  elsif get_analyze_constraints(ops) > 0 # check for primary key/unique keys violations
    Kernel.p "===== DROP CURRENT TABLE ====="
    sql = "TRUNCATE TABLE #{options[:db]}.#{options[:destination_schema]}.#{options[:table_name]};"
    Myreplicator::DB.exec_sql("vertica",sql)
    # run the export. The next time loader runs, it will load the file
    exp.export
  else # incremental load
    temp_table = Myreplicator::VerticaLoader.create_temp_table ops
    options[:table] = temp_table
    Kernel.p "===== COPY TO TEMP TABLE #{temp_table} ====="
    Myreplicator::VerticaLoader.vertica_copy options
    options[:temp_table] = temp_table
    options[:table] = options[:table_name]
    sql = "SELECT COUNT(*) FROM #{options[:db]}.#{options[:destination_schema]}.#{options[:temp_table]};"
    result = Myreplicator::DB.exec_sql("vertica",sql)
    #temporary fix for racing refresh cause by one worker doing loader for many export jobs. Better fix: each export job starts its own loader worker
    if result.entries.first[:COUNT] == 0
      Kernel.p "===== DROP TEMP TABLE ====="
      sql = "DROP TABLE IF EXISTS #{options[:db]}.#{options[:destination_schema]}.#{temp_table} CASCADE;"
      Myreplicator::DB.exec_sql("vertica",sql)
    else
      if exp.export_type == 'all'
        # full export: swap the temp table in, keeping the old table's grants
        g_options = {:db => options[:db], :schema => options[:destination_schema], :table => options[:table]}
        grants = Myreplicator::VerticaUtils.get_grants g_options
        Kernel.p "===== DROP CURRENT TABLE ====="
        sql = "DROP TABLE IF EXISTS #{options[:db]}.#{options[:destination_schema]}.#{options[:table]} CASCADE;"
        Myreplicator::DB.exec_sql("vertica",sql)
        sql = "ALTER TABLE #{options[:db]}.#{options[:destination_schema]}.#{options[:temp_table]} RENAME TO \"#{options[:table]}\";"
        Kernel.p sql
        Myreplicator::DB.exec_sql("vertica",sql)
        Myreplicator::VerticaUtils.set_grants(grants)
      elsif exp.export_type == 'incremental'
        Kernel.p "===== MERGE ====="
        Myreplicator::VerticaLoader.vertica_merge options
        #drop the temp table
        Kernel.p "===== DROP TEMP TABLE ====="
        sql = "DROP TABLE IF EXISTS #{options[:db]}.#{options[:destination_schema]}.#{temp_table} CASCADE;"
        Myreplicator::DB.exec_sql("vertica",sql)
      end
      # NOTE(review): for any other exp.export_type the temp table is left in
      # place (clean_up_temp_tables removes it after a day).
    end
  end
end

.prepare_options(*args) ⇒ Object



85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
# File 'lib/loader/vertica/vertica_loader.rb', line 85

# Fills vsql connection and COPY-format defaults into +options+ and returns it.
#
# Works in place (reverse_merge!), so keys the caller already set win. Defaults:
# * :host/:user/:pass/:db from the "vertica" entry of
#   ActiveRecord::Base.configurations;
# * :schema/:table/:file from :destination_schema/:table_name/:filepath;
# * a "\\0" delimiter, "NULL" marker and ";~~;\n" record terminator;
# * :enclosed from Myreplicator.configs[options[:source_schema]]['enclosed_by'];
# * :vsql from the configured "vsql" path, or /opt/vertica/bin/vsql if blank.
#
# @return [Hash] the same hash object that was passed in
def prepare_options *args
  options = args.extract_options!
  Kernel.p "===== OPTION  [options[:db]] ====="
  puts options
  # How not to hard code the vertica connection config ?
  vertica_options = ActiveRecord::Base.configurations["vertica"]
  enclosed_by = Myreplicator.configs[options[:source_schema]]['enclosed_by']
  defaults = {
    :host           => vertica_options["host"],
    :user           => vertica_options["username"],
    :pass           => vertica_options["password"],
    :db             => vertica_options["database"],
    :schema         => options[:destination_schema],
    :table          => options[:table_name],
    :file           => options[:filepath],
    :delimiter      => "\\0",
    :null_value     => "NULL",
    :line_terminate => ";~~;\n",
    :enclosed       => "#{enclosed_by}"
  }
  options.reverse_merge!(defaults)

  vsql_path = vertica_options["vsql"].blank? ? "/opt/vertica/bin/vsql" : vertica_options["vsql"]
  options.reverse_merge!(:vsql => vsql_path)
  options
end

.process_file(*args) ⇒ Object



235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
# File 'lib/loader/vertica/vertica_loader.rb', line 235

# Rewrites placeholder values in an export file before it is COPYed.
#
# Options: :file (path; the method returns nil without doing anything when it
# is blank), :list_of_nulls (values to replace; default []) and :null_value
# (replacement; default "NULL"). ".tsv"/".csv" files go to process_flat_file
# and ".gz" files to process_gzip_file. Any other extension raises a
# RuntimeError.
def process_file *args
  options = args.extract_options!
  return if options[:file].blank?
  file = options[:file]

  list_of_nulls = options[:list_of_nulls]
  list_of_nulls = [] if list_of_nulls.blank?
  null_value = options[:null_value]
  null_value = "NULL" if null_value.blank?

  Kernel.p "===== file #{file}====="
  file_extension = file.split('.').last
  Kernel.p "===== file_extension #{file_extension}====="

  case file_extension
  when "tsv", "csv" then process_flat_file(file, list_of_nulls, null_value)
  when "gz"         then process_gzip_file(file, list_of_nulls, null_value)
  else raise "Un supported file extension"
  end
end

.process_flat_file(file, list_of_nulls, null_value) ⇒ Object



271
272
273
274
# File 'lib/loader/vertica/vertica_loader.rb', line 271

# Plain-text export: the placeholder values are replaced directly inside +file+.
def process_flat_file file, list_of_nulls, null_value
  replace_null file, list_of_nulls, null_value
end

.process_gzip_file(file, list_of_nulls, null_value) ⇒ Object



276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
# File 'lib/loader/vertica/vertica_loader.rb', line 276

# Gzipped export: replaces placeholder values inside a .gz file.
#
# +file+ is decompressed to a scratch copy "temp_<basename>.txt" under
# Myreplicator.tmp_path. replace_null edits that copy, the result is
# recompressed back over +file+, and the scratch copy is always deleted.
#
# gunzip/gzip are run without a shell, so paths with spaces are safe.
#
# @raise [RuntimeError] if gunzip or gzip exits unsuccessfully. When gunzip
#   fails, +file+ is not overwritten.
def process_gzip_file file, list_of_nulls, null_value
  tmp_dir = Myreplicator.tmp_path
  # Take the name from the basename only: dots in directory names must not
  # shorten or collide the scratch file name.
  temp_file = "#{tmp_dir}/temp_#{File.basename(file).split('.').first}.txt"
  begin
    # unzip
    Kernel.p "gunzip -f #{file} -c > #{temp_file}"
    unless system("gunzip", "-f", "-c", file, :out => temp_file)
      raise "gunzip failed for #{file}"
    end
    # sed
    replace_null(temp_file, list_of_nulls, null_value)
    # zip
    Kernel.p "gzip #{temp_file} -c > #{file}"
    unless system("gzip", "-c", temp_file, :out => file)
      raise "gzip failed for #{temp_file}"
    end
  ensure
    Kernel.p "rm #{temp_file}"
    File.delete(temp_file) if File.exist?(temp_file)
  end
end

.replace_null(file, list_of_nulls, null_value = "NULL") ⇒ Object



255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
# File 'lib/loader/vertica/vertica_loader.rb', line 255

# Replaces, in place, every occurrence of each value in +list_of_nulls+
# inside +file+ using `sed -i`.
#
# "0000-00-00" (MySQL's zero date, which can appear even in date/datetime
# columns declared NOT NULL) becomes "1900-01-01". Every other value becomes
# +null_value+. Values are matched literally because sed metacharacters are
# escaped. sed is invoked without a shell, so paths containing spaces or
# quotes are safe.
def replace_null(file, list_of_nulls, null_value = "NULL")
  list_of_nulls.each do |value|
    replacement = value == '0000-00-00' ? '1900-01-01' : null_value
    expression = "s/#{sed_escape_pattern(value)}/#{sed_escape_replacement(replacement)}/g"
    Kernel.p "sed -i '#{expression}' #{file}"
    system("sed", "-i", expression, file)
  end
end

# Escapes the characters that are special in a sed basic regular expression
# (plus the "/" delimiter), so +text+ matches literally.
def sed_escape_pattern text
  text.to_s.gsub(%r{[\\/.*\[^$]}) { |char| "\\#{char}" }
end

# Escapes the characters that are special in a sed s/// replacement string.
def sed_escape_replacement text
  text.to_s.gsub(%r{[\\/&]}) { |char| "\\#{char}" }
end

.vertica_copy(*args) ⇒ Object



206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
# File 'lib/loader/vertica/vertica_loader.rb', line 206

# Prepares an export file and COPYs it into Vertica by shelling out to vsql.
#
# Options are completed by prepare_options. The MySQL zero date "0000-00-00"
# is rewritten by process_file. The command from get_vsql_copy_command is then
# printed and run with Kernel#system.
# NOTE(review): the printed command contains the vsql password (-w).
#
# @raise [RuntimeError] "No input file" when the prepared :file is blank
# @return [true, false, nil] the Kernel#system result of the vsql run
def vertica_copy * args
  options = args.extract_options!
  prepared = prepare_options options
  raise "No input file" if prepared[:file].blank?

  process_file(:file => prepared[:file],
               :list_of_nulls => ["0000-00-00"],
               :null_value => prepared[:null_value])

  command = get_vsql_copy_command(prepared)
  puts command
  system(command)
end

.vertica_merge(*args) ⇒ Object



369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
# File 'lib/loader/vertica/vertica_loader.rb', line 369

# MERGEs the loaded temp table options[:temp_table] into options[:table],
# matching rows on the MySQL primary-key columns.
#
# The vsql command runs with stderr folded into stdout, and the output is
# inspected:
# * output starting with "ERROR 4115": the live table is dropped and the temp
#   table renamed into its place, keeping the saved grants;
# * any other output starting with "ERROR": Loader.cleanup is called, the temp
#   table is dropped, and the vsql output is raised.
#
# @raise [RuntimeError] carrying the failing message (and original backtrace)
#   when the merge or its error handling fails
def vertica_merge *args
  options = args.extract_options!
  metadata = options[:metadata]
  Kernel.p "===== MERGE metadata ====="
  Kernel.p metadata
  mysql_schema = Loader.mysql_table_definition(options)
  mysql_schema_simple_form = MysqlExporter.get_mysql_schema_rows mysql_schema
  # get the column(s) that is(are) used as the primary key
  keys = get_mysql_keys mysql_schema_simple_form
  # get the non key columns
  none_keys = get_mysql_none_keys mysql_schema_simple_form
  # get the columns to put in the insert part
  inserted_columns = get_mysql_inserted_columns mysql_schema_simple_form
  cmd = get_vsql_merge_command options, keys, none_keys, inserted_columns
  puts cmd
  begin
    result = `#{cmd} 2>&1`
    if result.start_with?("ERROR 4115")
      # NOTE(review): presumably a duplicate-key MERGE failure - confirm. This
      # replaces the whole target table with the temp table's rows.
      g_options = {:db => options[:db], :schema => options[:destination_schema], :table => options[:table]}
      grants = Myreplicator::VerticaUtils.get_grants g_options
      sql = "DROP TABLE IF EXISTS #{options[:db]}.#{options[:destination_schema]}.#{options[:table]} CASCADE;"
      Kernel.p "===== DROP CMD ====="
      Kernel.p sql
      Myreplicator::DB.exec_sql("vertica",sql)
      sql = "ALTER TABLE #{options[:db]}.#{options[:destination_schema]}.#{options[:temp_table]} RENAME TO \"#{options[:table]}\";"
      Kernel.p sql
      Myreplicator::DB.exec_sql("vertica",sql)
      Myreplicator::VerticaUtils.set_grants(grants)
    elsif result.start_with?("ERROR")
      Loader.cleanup
      sql = "DROP TABLE IF EXISTS #{options[:db]}.#{options[:destination_schema]}.#{options[:temp_table]} CASCADE;"
      Kernel.p "===== DROP CMD ====="
      Kernel.p sql
      Myreplicator::DB.exec_sql("vertica",sql)
      raise result
    end
  rescue StandardError => e
    # Callers keep seeing a RuntimeError with the same message, but the
    # backtrace survives and signals/exit are no longer swallowed.
    raise RuntimeError, e.message, e.backtrace
  end
end