Class: CanvasSync::Importers::BulkImporter

Inherits: Object
Defined in:
lib/canvas_sync/importers/bulk_importer.rb

Constant Summary

DEFAULT_BATCH_SIZE =

The batch import size can be customized by setting the `BULK_IMPORTER_BATCH_SIZE` environment variable; any value that does not convert to a positive integer falls back to this default.

10_000

Class Method Summary

Class Method Details

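.batch_size ⇒ Object

Returns the number of rows per import batch: the value of the `BULK_IMPORTER_BATCH_SIZE` environment variable when it converts to a positive integer, otherwise `DEFAULT_BATCH_SIZE`.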



97
98
99
100
# File 'lib/canvas_sync/importers/bulk_importer.rb', line 97

# Number of rows accumulated before each bulk import is issued.
#
# Reads BULK_IMPORTER_BATCH_SIZE from the environment. Anything whose #to_i is
# not positive (unset, zero, negative, no leading digits) falls back to
# DEFAULT_BATCH_SIZE.
#
# @return [Integer]
def self.batch_size
  configured = ENV["BULK_IMPORTER_BATCH_SIZE"].to_i
  return DEFAULT_BATCH_SIZE unless configured.positive?

  configured
end

.condition_sql(klass, columns, report_start) ⇒ Object

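This method generates SQL that looks like: (users.sis_id, users.email) IS DISTINCT FROM (EXCLUDED.sis_id, EXCLUDED.email). If the model has an `updated_at` column and `report_start` is given, it also appends `AND users.updated_at < 'report_start'`, so the update only applies to rows last updated before `report_start`.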

This prevents activerecord-import from setting the `updated_at` column for rows that haven't actually changed. This allows you to query for rows that have changed by doing something like:

    started_at = Time.now
    run_the_users_sync!
    changed = User.where("updated_at >= ?", started_at)



87
88
89
90
91
92
93
94
95
# File 'lib/canvas_sync/importers/bulk_importer.rb', line 87

# Builds the condition for activerecord-import's on_duplicate_key_update clause.
#
# The stored values are compared against the incoming (EXCLUDED) ones, e.g. for
# a users table:
#   ("users".sis_id, "users".email) IS DISTINCT FROM (EXCLUDED.sis_id, EXCLUDED.email)
# When +klass+ has an updated_at column and +report_start+ is given, the stored
# row's updated_at must additionally be earlier than +report_start+.
#
# NOTE(review): column names and +report_start+ are interpolated into the SQL
# verbatim (report_start via #to_s), so they must come from trusted
# configuration, never from user input.
#
# @param klass [Class] model providing quoted_table_name and column_names
# @param columns [Array<String, Symbol>] columns to compare
# @param report_start [Object, nil] value embedded in the updated_at comparison
# @return [String] the SQL condition
def self.condition_sql(klass, columns, report_start)
  table = klass.quoted_table_name
  stored = columns.map { |column| "#{table}.#{column}" }
  incoming = columns.map { |column| "EXCLUDED.#{column}" }
  clauses = ["(#{stored.join(", ")}) IS DISTINCT FROM (#{incoming.join(", ")})"]
  if klass.column_names.include?("updated_at") && report_start
    clauses << "#{table}.updated_at < '#{report_start}'"
  end
  clauses.join(" AND ")
end

.import(report_file_path, mapping, klass, conflict_target, import_args: {}) {|row| ... } ⇒ Object

Does a bulk import of a set of models using the activerecord-import gem.

Parameters:

  • report_file_path (String)

    path to the report CSV

  • mapping (Hash)

    a hash of the values to import. See `model_mappings.yml` for a format example.

  • klass (Object)

    e.g., User

  • conflict_target (Symbol)

    the CSV column name that maps to the database column used to decide whether a given row is inserted or updated, e.g. canvas_user_id. Pass nil to skip per-batch de-duplication and omit an explicit conflict target.

  • import_args (Hash) (defaults to: {})

    Any options passed here are merged into the options of the model's bulk `import` call (activerecord-import). The :sync_start_time key is also used as the `report_start` for condition_sql. Note: passing the key :on_duplicate_key_ignore removes the default :on_duplicate_key_update behavior.

Yield Parameters:

  • row (Array)

    If a block is given, each CSV row is yielded to it and the block's return value is imported in its place; return nil to skip the row. Use this to filter or massage the data.



20
21
22
23
24
# File 'lib/canvas_sync/importers/bulk_importer.rb', line 20

# Entry point: bulk-imports the report CSV at +report_file_path+ into +klass+.
#
# The batched import runs inside ClassCallbackExecutor.run_if_defined with the
# :sync_import callback name. All arguments, including an optional row block,
# are forwarded unchanged to perform_in_batches.
#
# @param report_file_path [String] path to the report CSV
# @param mapping [Hash] CSV column => { database_column_name:, type: }
# @param klass [Class] model to import into
# @param conflict_target [Symbol, nil] CSV column used for conflict detection
# @param import_args [Hash] extra options for the bulk import
# @yieldparam row [CSV::Row] each parsed row; return nil to skip it
# @return [Object] whatever the callback executor returns
def self.import(report_file_path, mapping, klass, conflict_target, import_args: {}, &block)
  ClassCallbackExecutor.run_if_defined(klass, :sync_import) do
    perform_in_batches(
      report_file_path,
      mapping,
      klass,
      conflict_target,
      import_args: import_args,
      &block
    )
  end
end

.perform_import(klass, columns, rows, conflict_target, import_args = {}) ⇒ Object



62
63
64
65
66
67
68
69
70
71
72
73
74
75
# File 'lib/canvas_sync/importers/bulk_importer.rb', line 62

# Upserts one batch of +rows+ into +klass+ with a single bulk klass.import call.
#
# By default, rows colliding on +conflict_target+ are updated only when
# condition_sql reports that their values differ. Passing :on_duplicate_key_ignore
# in +import_args+ drops the update clause entirely.
#
# @param klass [Class] model receiving the import
# @param columns [Array] database column names, in the same order as each row's values
# @param rows [Array<Array>] row values to import; nothing happens when empty
# @param conflict_target [Symbol, String, nil] database column used for conflict detection
# @param import_args [Hash] options merged over the defaults; :sync_start_time is
#   also read as the report start passed to condition_sql
# @return [nil, Object] nil for an empty batch, otherwise klass.import's result
def self.perform_import(klass, columns, rows, conflict_target, import_args={})
  return if rows.empty?

  # NOTE(review): presumably copied because klass.import may append columns
  # (e.g. timestamps) to the list it receives — confirm before removing.
  column_list = columns.dup

  upsert = {
    condition: condition_sql(klass, column_list, import_args[:sync_start_time]),
    columns: column_list
  }
  upsert.merge!(conflict_target: conflict_target) if conflict_target

  defaults = { validate: false, on_duplicate_key_update: upsert }
  options = defaults.merge(import_args)
  # An explicit ignore request wins over the default update behaviour.
  options.delete(:on_duplicate_key_update) if options.include?(:on_duplicate_key_ignore)

  # NOTE(review): every import_args key, including :sync_start_time, is forwarded
  # to klass.import as an option — confirm the importer tolerates unknown keys.
  klass.import(column_list, rows, options)
end

.perform_in_batches(report_file_path, mapping, klass, conflict_target, import_args: {}) ⇒ Object



26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
# File 'lib/canvas_sync/importers/bulk_importer.rb', line 26

# Streams the report CSV and bulk-imports its rows in batches of +batch_size+.
#
# Each CSV row is reduced to the values of the mapped columns (in +mapping+
# key order). Columns whose mapping type is datetime are parsed with
# parse_datetime. When +conflict_target+ is set, rows repeating an already-seen
# conflict value are dropped; the seen-set is reset after every batch.
#
# @param report_file_path [String] path to the report CSV (first line is the header row)
# @param mapping [Hash] CSV column (Symbol) => { database_column_name:, type: }
# @param klass [Class] model to import into
# @param conflict_target [Symbol, nil] CSV column used for conflict detection
# @param import_args [Hash] extra options forwarded to perform_import
# @yieldparam row [CSV::Row] each parsed row; return nil to skip it
# @raise [KeyError] if +conflict_target+ is not a key of +mapping+
def self.perform_in_batches(report_file_path, mapping, klass, conflict_target, import_args: {})
  csv_column_names = mapping.keys
  database_column_names = mapping.values.map { |value| value[:database_column_name] }
  # Column types do not vary per row, so resolve the datetime columns once up front.
  datetime_columns = csv_column_names.select { |column| mapping[column][:type].to_s == "datetime" }
  database_conflict_column_name = conflict_target ? mapping.fetch(conflict_target)[:database_column_name] : nil
  limit = batch_size
  rows = []
  row_ids = {}

  CSV.foreach(report_file_path, headers: true, header_converters: :symbol) do |row|
    row = yield(row) if block_given?
    next if row.nil?

    if conflict_target
      # NOTE(review): presumably because a single upsert statement cannot touch the
      # same key twice (PostgreSQL ON CONFLICT restriction) — duplicates are only
      # dropped within the current batch.
      next if row_ids[row[conflict_target]]

      row_ids[row[conflict_target]] = true
    end

    rows << csv_column_names.map do |column|
      datetime_columns.include?(column) ? parse_datetime(row[column]) : row[column]
    end

    next if rows.length < limit

    perform_import(klass, database_column_names, rows, database_conflict_column_name, import_args)
    rows = []
    row_ids = {}
  end

  perform_import(klass, database_column_names, rows, database_conflict_column_name, import_args)
end

# Parses a raw CSV datetime cell and converts it to UTC.
#
# Blank (nil) and unparseable values yield nil. Only parse/type failures are
# treated as "no value"; any other error propagates instead of silently importing
# NULL. DateTime#utc is not part of Ruby's stdlib DateTime; ActiveSupport
# provides it.
#
# TODO: add some timezone config to the mapping. In cases where the timestamp or
# date doesn't include a timezone, you should be able to specify one.
#
# @param value [String, nil] raw cell value
# @return [Object, nil] the result of DateTime#utc, or nil
def self.parse_datetime(value)
  return nil if value.nil?

  DateTime.parse(value).utc
rescue ArgumentError, TypeError # Date::Error (invalid date) is an ArgumentError subclass
  nil
end