Module: BulkImporter

Defined in:
lib/bulk_importer.rb,
lib/bulk_importer/version.rb

Constant Summary collapse

UPDATE_MODE_APPEND =

Update modes

'append'
UPDATE_MODE_UPDATE =
'update'
UPDATE_MODE_REPLACE =
'replace'
NAME =

Module name (used as the prefix of log messages).

'BulkImporter'
VERSION =
"0.2.4"

Class Method Summary collapse

Class Method Details

.import_from_csv(target, file, columns, keys, delimiter: ',', null: '', header: true, update_mode: UPDATE_MODE_APPEND) ⇒ Object

Import data from a CSV file into an existing table.

Parameters

  • target Target table.

  • file Source CSV file.

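  • columns Hash mapping each CSV column name to its destination column name; a nil destination skips that column.

  • keys Hash mapping each CSV key column to the matching primary-key column of the destination table.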

Options

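  • delimiter CSV field delimiter (default ',').

  • null String treated as NULL by the CSV import (default '').

  • header Whether the first CSV line is a header (default true).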

  • update_mode Update mode for imported data.

Update modes

  • self::UPDATE_MODE_APPEND Insert only rows whose key is not yet in the target (default).

  • self::UPDATE_MODE_UPDATE Insert new rows and update pre-existing ones.

  • self::UPDATE_MODE_REPLACE Truncate the target and insert all imported rows.

Return

integer Number of rows affected by the move, or -1 if file is not a File or the import fails.



40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
# File 'lib/bulk_importer.rb', line 40

# Import data from a CSV file into an existing table.
#
# The CSV is first loaded into a temporary table whose columns are all
# +text+, then moved into +target+ according to +update_mode+. The
# temporary table is dropped whether the import succeeds or fails.
#
# @param target [String, Symbol] destination table name
# @param file [File] source CSV file; anything else returns -1
# @param columns [Hash] CSV column name => destination column name
#   (a nil destination skips the column)
# @param keys [Hash] CSV key column => destination primary-key column
# @param delimiter [String] CSV field delimiter
# @param null [String] string treated as NULL by the CSV import
# @param header [Boolean] whether the first CSV line is a header
# @param update_mode [String] one of the UPDATE_MODE_* constants
# @return [Integer] rows affected by the move, or -1 on failure
def self.import_from_csv(target, file, columns, keys, delimiter: ',', null: '', header: true, update_mode: UPDATE_MODE_APPEND)
  return -1 unless file.is_a? File

  conn = ActiveRecord::Base.connection
  # Interpolation (unlike String#+) also accepts a Symbol table name.
  temp_name = "#{target}_#{Time.now.to_i}_temporal"

  begin
    # Create temporary table (with all CSV fields)
    Rails.logger.debug \
      "[#{NAME}] Creating temporary table #{temp_name}(#{columns.keys})"
    conn.execute self.make_create_temp_table_sql(temp_name, columns.keys)

    # Import data
    Rails.logger.debug \
      "[#{NAME}] Importing data from #{file} to #{temp_name}"
    PostgresqlModule.copy_from(
      file,
      temp_name,
      format:    'csv',
      delimiter: delimiter,
      null:      null,
      header:    header
    )

    # Move data from temporary table to target and return total imported rows
    Rails.logger.debug \
      "[#{NAME}] Moving new data to #{target} with mode #{update_mode}"
    self.move_imported_data(temp_name, target, columns, keys, update_mode)
  rescue StandardError => e
    # Ordinary failures become the documented -1 result; Interrupt,
    # SystemExit, NoMemoryError and friends keep propagating.
    Rails.logger.error e.message
    Rails.logger.error Array(e.backtrace).join("\n")
    -1
  ensure
    # Drop temporary table (if exists) on success and on failure alike.
    conn.execute "DROP TABLE IF EXISTS #{temp_name}"
  end
end

.is_update_mode_valid(update_mode) ⇒ Object

Check if the update mode is valid.

Parameters

  • update_mode

Return

bool



318
319
320
321
322
323
324
325
326
327
328
# File 'lib/bulk_importer.rb', line 318

# Tell whether +update_mode+ is one of the supported update modes.
#
# The supported modes are the values of every constant of this module whose
# name begins with "UPDATE_MODE_". The candidate is compared after +to_s+,
# so a Symbol such as :append matches too.
#
# @param update_mode [#to_s] candidate mode
# @return [Boolean]
def self.is_update_mode_valid(update_mode)
  mode_constants = constants.select { |name| name.to_s.start_with?('UPDATE_MODE_') }
  supported = mode_constants.map { |name| const_get(name) }
  supported.include?(update_mode.to_s)
end

.keys_to_list(keys, prefix = nil, types = nil) ⇒ Object

Join an array of column names into a comma-separated list, optionally qualified with a table prefix and cast to a type.

Parameters

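  • keys Array of column names.

  • prefix Optional table alias; each column becomes prefix.column.

  • types Optional Hash from column name to type; columns found in it get a ::type cast.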

Return

  • string

TODO: Add doc



294
295
296
297
298
299
300
301
302
303
304
305
306
# File 'lib/bulk_importer.rb', line 294

# Join column names into a comma-separated SQL list.
#
# Each name is optionally qualified as "prefix.name". When +types+ is given,
# names that have an entry in it are cast as "name::type"; the others are
# left uncast.
#
# @param keys [Array] column names
# @param prefix [String, nil] table alias used to qualify each column
# @param types [Hash, nil] column name => SQL type for the cast
# @return [String] e.g. "o.id::integer,o.name"
def self.keys_to_list(keys, prefix = nil, types = nil)
  fragments = keys.map do |key|
    qualified = prefix.nil? ? key : [prefix, key].compact.join('.')
    types.nil? ? qualified : [qualified, types[key]].compact.join('::')
  end

  fragments.join(',')
end

.make_create_temp_table_sql(name, columns) ⇒ Object

Makes the SQL command to create a temporary table.

Parameters

  • name Name of temporary table.

  • columns Array of columns (just name).

Return

string



275
276
277
278
# File 'lib/bulk_importer.rb', line 275

# Build the CREATE statement for the staging table the CSV is loaded into.
#
# Every column is declared +text+; values are cast to the destination
# column types later, when the rows are moved.
#
# @param name [String] temporary table name
# @param columns [Array<String>] column names (just names, no types)
# @return [String] CREATE TEMPORARY TABLE statement
def self.make_create_temp_table_sql(name, columns)
  definitions = columns.map { |column| column + ' text' }
  "CREATE TEMPORARY TABLE #{name} (#{definitions.join(',')})"
end

.make_move_imported_data_sql(origin, destination, columns, keys, update_mode) ⇒ Object

Makes the SQL command to move the imported data.

Parameters

  • origin Origin (temporary table).

  • destination Destination table.

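  • columns Hash mapping each CSV column name to its destination column name.

  • keys Hash mapping each CSV key column to the matching primary-key column of the destination table.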

  • update_mode Update mode.

Return

array Array of queries to execute.



133
134
135
136
137
138
139
140
141
142
143
144
145
# File 'lib/bulk_importer.rb', line 133

# Dispatch to the SQL builder that matches +update_mode+.
#
# @param origin [String] temporary table holding the imported rows
# @param destination [String] target table
# @param columns [Hash] CSV column => destination column
# @param keys [Hash] CSV key column => destination key column
# @param update_mode [String] one of the UPDATE_MODE_* constants
# @return [Array<String>, nil] queries to run, or nil for an unknown mode
def self.make_move_imported_data_sql(origin, destination, columns, keys, update_mode)
  builder =
    if UPDATE_MODE_APPEND == update_mode
      :make_update_mode_append_sql   # insert new rows only
    elsif UPDATE_MODE_UPDATE == update_mode
      :make_update_mode_update_sql   # insert new rows, update pre-existing ones
    elsif UPDATE_MODE_REPLACE == update_mode
      :make_update_mode_replace_sql  # truncate destination, insert everything
    end

  return nil if builder.nil?

  public_send(builder, origin, destination, columns, keys)
end

.make_update_mode_append_sql(origin, destination, columns, keys) ⇒ Object

Makes the SQL command to append new imported data.

Parameters

  • origin Origin (temporary table).

  • destination Destination table.

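  • columns Hash mapping each CSV column name to its destination column name; a nil destination skips that column.

  • keys Hash mapping each CSV key column to the matching primary-key column of the destination table.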

Return

array Array of queries to execute



160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
# File 'lib/bulk_importer.rb', line 160

# Build the query for UPDATE_MODE_APPEND: insert only the imported rows
# whose key is not yet present in +destination+ (LEFT JOIN on the keys,
# keeping rows where the destination side is null).
#
# Columns mapped to nil are removed from +columns+ IN PLACE, so anything
# that reads +columns+ afterwards sees only mapped columns.
# make_update_mode_update_sql calls this method first and then reads
# +columns+.
#
# Origin values are cast to the destination column types returned by
# PostgresqlModule.get_column_types.
#
# @param origin [String] temporary table holding the imported rows
# @param destination [String] target table
# @param columns [Hash] CSV column => destination column (nil skips it)
# @param keys [Hash] CSV key column => destination key column
# @return [Array<String>] a single INSERT ... SELECT statement
def self.make_update_mode_append_sql(origin, destination, columns, keys)
  columns.delete_if { |_source, target| target.nil? }

  pg_types = PostgresqlModule.get_column_types destination
  source_for = columns.invert
  types = {}
  columns.each_value { |target| types[source_for[target]] = pg_types[target] }

  statement = [
    "INSERT INTO #{destination}",
    "(#{columns.values.join(',')})",
    "SELECT #{keys_to_list(columns.keys, 'o', types)}",
    "FROM #{origin} o",
    "LEFT JOIN #{destination} d",
    "ON (#{keys_to_list(keys.keys, 'o', types)}) = ",
    "(#{keys_to_list(keys.values, 'd')})",
    "WHERE (#{keys_to_list(keys.values, 'd')}) is null"
  ].join(' ')

  [statement]
end

.make_update_mode_replace_sql(origin, destination, columns, key) ⇒ Object

Makes the SQL commands to remove the existing data and insert all imported rows.

Parameters

  • origin Origin (temporary table).

  • destination Destination table.

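  • columns Hash mapping each CSV column name to its destination column name; a nil destination skips that column.

  • keys Hash mapping each CSV key column to the matching primary-key column of the destination table.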

Return

array Array of queries to execute



255
256
257
258
259
260
261
262
# File 'lib/bulk_importer.rb', line 255

# Build the queries for UPDATE_MODE_REPLACE: empty +destination+, then
# insert every imported row.
#
# The statements are returned separately; the caller should run them in one
# transaction so that a failing INSERT does not leave the table empty.
#
# @param origin [String] temporary table holding the imported rows
# @param destination [String] target table
# @param columns [Hash] CSV column => destination column (nil skips it)
# @param keys [Hash] CSV key column => destination key column
# @return [Array<String>] the TRUNCATE followed by the append INSERT
def self.make_update_mode_replace_sql(origin, destination, columns, keys)
  ["TRUNCATE TABLE #{destination}"] +
    self.make_update_mode_append_sql(origin, destination, columns, keys)
end

.make_update_mode_update_sql(origin, destination, columns, keys) ⇒ Object

Makes the SQL commands to insert new rows and update pre-existing ones.

Parameters

  • origin Origin (temporary table).

  • destination Destination table.

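  • columns Hash mapping each CSV column name to its destination column name; a nil destination skips that column.

  • keys Hash mapping each CSV key column to the matching primary-key column of the destination table.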

Return

array Array of queries to execute



194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
# File 'lib/bulk_importer.rb', line 194

# Build the queries for UPDATE_MODE_UPDATE: insert rows whose key is new,
# then update pre-existing rows that changed.
#
# A destination row is updated when:
# * a CSV column maps to +updated_at+ and the imported timestamp is later
#   than the stored one; otherwise
# * any non-key column differs. IS DISTINCT FROM is used so that a change
#   to or from NULL is detected (a plain != evaluates to NULL there and the
#   row would be skipped).
#
# Columns mapped to nil are ignored. When every mapped column is a key there
# is nothing to compare (an empty row comparison is invalid SQL), so only the
# append query is returned.
#
# @param origin [String] temporary table holding the imported rows
# @param destination [String] target table
# @param columns [Hash] CSV column => destination column (nil skips it)
# @param keys [Hash] CSV key column => destination key column
# @return [Array<String>] queries to execute, in order
def self.make_update_mode_update_sql(origin, destination, columns, keys)
  queries = self.make_update_mode_append_sql(origin, destination, columns, keys)

  # Filter here instead of relying on the append builder's side effects.
  mapped = columns.reject { |_source, target| target.nil? }
  # Derive both sides from the same pairs so they stay positionally aligned.
  non_keys = mapped.reject { |source, _target| keys.key?(source) }
  return queries if non_keys.empty?

  pg_types = PostgresqlModule.get_column_types destination
  types = {}
  mapped.each { |source, target| types[source] = pg_types[target] }

  sets = mapped.map { |source, target| "#{target} = o.#{source}::#{types[source]}" }.join(',')
  key_match = "(#{self.keys_to_list(keys.keys, 'o', types)}) = " \
              "(#{self.keys_to_list(keys.values, 'd')})"
  updated_at_source = mapped.key('updated_at')

  sql =
    if updated_at_source
      # Field is updated if origin's updated_at is greater than destination.
      # NOTE(review): rows whose stored updated_at is NULL never satisfy the
      # comparison and are not updated — confirm that is intended.
      source_column = updated_at_source.downcase
      PostgresqlModule.create_index_on origin, source_column

      [
        "UPDATE #{destination} d SET #{sets}",
        "FROM #{origin} o",
        "WHERE #{key_match} AND",
        "o.#{source_column}::timestamp > d.updated_at"
      ]
    else
      # Only touch rows in which at least one non-key field changed.
      modified = "#{origin}_prexistent_modified"

      [
        "WITH #{modified} AS (",
        "SELECT o.* FROM #{origin} o JOIN #{destination} d",
        "ON #{key_match} AND",
        "(#{self.keys_to_list(non_keys.keys, 'o', types)}) IS DISTINCT FROM",
        "(#{self.keys_to_list(non_keys.values, 'd')})",
        ")",
        "UPDATE #{destination} d SET #{sets}",
        "FROM #{modified} o",
        "WHERE #{key_match}"
      ]
    end

  queries + [sql.join(' ')]
end

.move_imported_data(origin, destination, columns, keys, update_mode) ⇒ Object

Move imported data from origin (raw imported) to destination.

Parameters

  • origin Origin (temporary table).

  • destination Destination table.

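  • columns Hash mapping each CSV column name to its destination column name.

  • keys Hash mapping each CSV key column to the matching primary-key column of the destination table.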

  • update_mode Update mode.

Return

integer Number of imported rows.



92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
# File 'lib/bulk_importer.rb', line 92

# Move the rows imported into +origin+ (raw CSV data) to +destination+.
#
# Validates +update_mode+, indexes the key columns of +origin+, builds the
# queries for the mode and runs them inside a single transaction. A failing
# query (e.g. the INSERT after a replace-mode TRUNCATE, or the UPDATE after
# an update-mode INSERT) rolls back the whole move instead of leaving
# +destination+ emptied or half-updated.
#
# @param origin [String] temporary table holding the imported rows
# @param destination [String] target table
# @param columns [Hash] CSV column => destination column
# @param keys [Hash] CSV key column => destination key column
# @param update_mode [String] one of the UPDATE_MODE_* constants
# @return [Integer] sum of the rows affected (cmd_tuples) by every query
# @raise [RuntimeError] if +update_mode+ is unknown
def self.move_imported_data(origin, destination, columns, keys, update_mode)
  unless self.is_update_mode_valid update_mode
    raise "[#{NAME}] Unknown update mode: #{update_mode}"
  end

  # Create an index to improve move performance.
  Rails.logger.debug "[#{NAME}] Creating index on #{origin}"
  PostgresqlModule.create_index_on origin, keys.keys.map(&:downcase)

  queries = self.make_move_imported_data_sql(
    origin,
    destination,
    columns,
    keys,
    update_mode
  )

  conn = ActiveRecord::Base.connection

  ActiveRecord::Base.transaction do
    queries.reduce(0) do |rows, query|
      Rails.logger.debug "[#{NAME}] Running query <<#{query}>>"
      rows + conn.execute(query).cmd_tuples
    end
  end
end