Class: CanvasSync::Importers::BulkImporter

Inherits:
Object
  • Object
show all
Defined in:
lib/canvas_sync/importers/bulk_importer.rb

Defined Under Namespace

Classes: NullRowBuffer, RowBuffer, UserRowBuffer

Constant Summary collapse

DEFAULT_BATCH_SIZE =

The batch import size can be customized by setting the `BULK_IMPORTER_BATCH_SIZE` environment variable.

10_000

Class Method Summary collapse

Class Method Details

.batch_sizeObject



155
156
157
158
# File 'lib/canvas_sync/importers/bulk_importer.rb', line 155

# Number of rows handed to each import batch.
#
# Reads the BULK_IMPORTER_BATCH_SIZE environment variable and uses it when it
# parses to a positive integer; an unset, non-numeric, zero or negative value
# falls back to DEFAULT_BATCH_SIZE.
#
# @return [Integer]
def self.batch_size
  configured = ENV["BULK_IMPORTER_BATCH_SIZE"].to_i
  return configured if configured.positive?

  DEFAULT_BATCH_SIZE
end

.condition_sql(klass, columns, report_start = nil) ⇒ Object

This method generates SQL that looks like: (users.sis_id, users.email) IS DISTINCT FROM (EXCLUDED.sis_id, EXCLUDED.email)

This prevents activerecord-import from setting the `updated_at` column for rows that haven't actually changed. This allows you to query for rows that have changed by doing something like:

    started_at = Time.now
    run_the_users_sync!
    changed = User.where("updated_at >= ?", started_at)



141
142
143
144
145
146
147
148
149
150
151
152
153
# File 'lib/canvas_sync/importers/bulk_importer.rb', line 141

# Builds the SQL condition that guards the ON CONFLICT update.
#
# The base condition compares the stored tuple with the incoming (EXCLUDED)
# tuple, e.g.
#   (users.sis_id, users.email) IS DISTINCT FROM (EXCLUDED.sis_id, EXCLUDED.email)
#
# When +report_start+ is given, a second clause is appended that requires the
# row's timestamp to be older than +report_start+. `canvas_synced_at` is used
# if the model has that column, otherwise `updated_at`; if the model has
# neither, no clause is added.
#
# @param klass [#quoted_table_name, #column_names] model being imported
# @param columns [Array<String, Symbol>] columns compared between old and new row
# @param report_start [Object, nil] interpolated verbatim (via #to_s) into the SQL
# @return [String] SQL fragment
def self.condition_sql(klass, columns, report_start = nil)
  table = klass.quoted_table_name
  stored = columns.map { |col| "#{table}.#{col}" }.join(", ")
  incoming = columns.map { |col| "EXCLUDED.#{col}" }.join(", ")
  sql = "(#{stored}) IS DISTINCT FROM (#{incoming})"

  # canvas_synced_at takes precedence over updated_at when both exist.
  stamp_column = %w[canvas_synced_at updated_at].find { |name| klass.column_names.include?(name) }
  return sql unless stamp_column && report_start

  "#{sql} AND #{table}.#{stamp_column} < '#{report_start}'"
end

.import(report_file_path, mapping, klass, conflict_target, import_args: {}) {|row| ... } ⇒ Object

Does a bulk import of a set of models using the activerecord-import gem.

Parameters:

  • report_file_path (String)

    path to the report CSV

  • mapping (Hash)

    a hash of the values to import. See `model_mappings.yml` for a format example.

  • klass (Object)

    e.g., User

  • conflict_target (Symbol)

    the csv column name that maps to the database column that will determine if we need to update or insert a given row, e.g., :canvas_user_id

  • import_args (Hash) (defaults to: {})

    Any arguments passed here will be passed through to ActiveRecord::BulkImport. Note: passing the key [:on_duplicate_key_ignore] will override the default behavior of [:on_duplicate_key_update]

Yield Parameters:

  • row (Array)

    if a block is passed in it will yield the current row from the CSV. This can be used if you need to filter or massage the data in any way.



20
21
22
23
24
# File 'lib/canvas_sync/importers/bulk_importer.rb', line 20

# Entry point for a bulk import of +klass+ rows from a report CSV.
#
# Runs the batched import inside the model's :sync_import callback (via
# ClassCallbackExecutor.run_if_defined) and forwards every argument, plus the
# optional row block, to perform_in_batches.
#
# @param report_file_path [String] path to the report CSV
# @param mapping [Hash] column mapping, passed through unchanged
# @param klass [Class] model class being imported
# @param conflict_target [Symbol, Array, nil] passed through unchanged
# @param import_args [Hash] extra options, passed through unchanged
# @yield [row] optional per-row hook, forwarded to perform_in_batches
def self.import(report_file_path, mapping, klass, conflict_target, import_args: {}, &blk) # rubocop:disable Metrics/AbcSize, Metrics/CyclomaticComplexity, Metrics/PerceivedComplexity, Metrics/LineLength
  ClassCallbackExecutor.run_if_defined(klass, :sync_import) do
    perform_in_batches(
      report_file_path,
      mapping,
      klass,
      conflict_target,
      import_args: import_args,
      &blk
    )
  end
end

.perform_import(klass, columns, rows, conflict_target, import_args = {}) ⇒ Object



95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
# File 'lib/canvas_sync/importers/bulk_importer.rb', line 95

# Imports one batch of rows through klass.import and stamps the affected rows.
#
# Does nothing (returns nil) for an empty batch. Otherwise it builds the
# default options (no validation, update on duplicate key guarded by
# condition_sql), merges +import_args+ over them, and drops the
# on_duplicate_key_update option entirely when the caller supplied
# :on_duplicate_key_ignore. The import itself runs inside the model's
# :sync_batch_import callback; the callback environment hash exposes the batch,
# the options, and (once available) the import result.
#
# After the import, rows whose ids are reported by the result get
# canvas_synced_at / canvas_sync_batch_id written, limited to whichever of
# those columns the model actually has.
#
# @param klass [Class] model class being imported
# @param columns [Array] database column names, in row order
# @param rows [Array<Array>] row values for this batch
# @param conflict_target [Object] conflict target; omitted from the options when blank
# @param import_args [Hash] extra options merged over the defaults;
#   :sync_start_time is also passed to condition_sql
# @return [Object, nil] whatever klass.import returned, or nil for an empty batch
def self.perform_import(klass, columns, rows, conflict_target, import_args = {})
  return if rows.length.zero?

  # Work on a copy so the caller's column list is never shared with the importer.
  import_columns = columns.dup

  upsert = {
    condition: condition_sql(klass, import_columns, import_args[:sync_start_time]),
    columns: import_columns
  }
  upsert[:conflict_target] = conflict_target if conflict_target.present?

  options = { validate: false, on_duplicate_key_update: upsert }.merge(import_args)
  # An explicit "ignore duplicates" request wins over the default update behaviour.
  options.delete(:on_duplicate_key_update) if options.key?(:on_duplicate_key_ignore)

  callback_env = { batch: rows, import_result: nil, import_options: options }
  result = nil

  ClassCallbackExecutor.run_if_defined(klass, :sync_batch_import, callback_env) do
    result = callback_env[:import_result] = klass.import(import_columns, rows, options)

    stamps = {
      canvas_synced_at: DateTime.now,
      canvas_sync_batch_id: JobBatches::Batch.current_context[:sync_batch_id],
    }
    # Keep only the bookkeeping columns this model defines.
    stamps.slice!(*klass.column_names.map(&:to_sym))

    klass.where(id: result.ids).update_all(stamps) if stamps.present? && result.ids.present?
  end

  result
end

.perform_in_batches(report_file_path, raw_mapping, klass, conflict_target, import_args: {}) ⇒ Object



26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
# File 'lib/canvas_sync/importers/bulk_importer.rb', line 26

# Reads the report CSV row by row, converts each row to the database column
# order described by the mapping, and hands the rows to perform_import in
# batches of batch_size.
#
# Per mapped column the value is read from the row's :report_column (nil when
# the mapping has none), optionally parsed according to :type (:datetime or
# :yaml; a failed parse yields nil), and finally passed through the column's
# :transform callable when one is present.
#
# When a conflict target is given, rows whose conflict-target values were
# already seen in the current batch are dropped; the seen-set is reset each
# time a batch is imported.
#
# @param report_file_path [String] path to the report CSV
# @param raw_mapping [Hash] db column => column definition; entries flagged
#   :deprecated are skipped when the model no longer has that column
# @param klass [Class] model class being imported
# @param conflict_target [Symbol, String, Array, nil] name(s) looked up among
#   the mapped database columns
# @param import_args [Hash] forwarded to perform_import
# @yield [row] optional hook; its return value replaces the row, and a nil
#   return skips the row
# @return [Object] result of flushing the batch processor
def self.perform_in_batches(report_file_path, raw_mapping, klass, conflict_target, import_args: {})
  # Keep every mapped column except deprecated ones the table no longer has.
  mapping = raw_mapping.each_with_object({}.with_indifferent_access) do |(db_col, opts), kept|
    kept[db_col] = opts unless opts[:deprecated] && !klass.column_names.include?(db_col.to_s)
  end

  csv_column_names = mapping.values.map { |col_def| col_def[:report_column].to_s }
  database_column_names = mapping.keys

  conflict_columns = Array(conflict_target).map(&:to_s)
  conflict_positions = conflict_columns.map { |name| database_column_names.index(name) }

  # Conflict keys already queued in the batch currently being built.
  seen_keys = {}
  batcher = CanvasSync::BatchProcessor.new(of: batch_size) do |batch|
    seen_keys = {}
    perform_import(klass, database_column_names, batch, conflict_columns, import_args)
  end

  emit_row = lambda do |row|
    formatted_row = mapping.map do |_db_col, col_def|
      report_column = col_def[:report_column]
      value = report_column ? row[report_column] : nil

      type = col_def[:type]
      case (type.to_sym if type)
      when :datetime
        # TODO: add some timezone config to the mapping.
        # In cases where the timestamp or date doesn't include a timezone, you should be able to specify one
        value = DateTime.parse(value).utc rescue nil # rubocop:disable Style/RescueModifier
      when :yaml
        # CSV parse from reading the report escapes the new lines and YAML throws an error with double backslashes
        # NOTE(review): YAML.load runs on report data — confirm the report source is trusted.
        value = YAML.load(value.gsub("\\n", "\n")) rescue nil # rubocop:disable Style/RescueModifier
      end

      transform = col_def[:transform]
      transform ? transform.call(value, row) : value
    end

    if conflict_columns.present?
      dedup_key = conflict_positions.map { |position| formatted_row[position] }
      # Same conflict key already queued in this batch: drop the duplicate.
      next if seen_keys[dedup_key]

      seen_keys[dedup_key] = true
    end

    batcher << formatted_row
  end

  user_report = defined?(User) && klass == User && csv_column_names.include?('user_id')
  row_buffer = (user_report ? UserRowBuffer : NullRowBuffer).new(&emit_row)

  CSV.foreach(report_file_path, headers: true, header_converters: :symbol) do |csv_row|
    csv_row = yield(csv_row) if block_given?
    row_buffer << csv_row unless csv_row.nil?
  end

  row_buffer.flush
  batcher.flush
end