Module: ActiveRecord::Import::PostgreSQLAdapter

Includes:
ImportSupport, OnDuplicateKeyUpdateSupport
Included in:
ConnectionAdapters::PostgreSQLAdapter
Defined in:
lib/activerecord-import/adapters/postgresql_adapter.rb

Constant Summary collapse

MIN_VERSION_FOR_UPSERT =
90_500

Instance Method Summary collapse

Methods included from ImportSupport

#supports_import?

Instance Method Details

#add_column_for_on_duplicate_key_update(column, options = {}) ⇒ Object

Add a column to be updated on duplicate key update


98
99
100
101
102
103
104
105
106
107
108
109
# File 'lib/activerecord-import/adapters/postgresql_adapter.rb', line 98

# Registers +column+ among the columns listed in
# options[:on_duplicate_key_update], mutating that option in place.
#
# * Hash option: the :columns entry is used (an empty Array is stored under
#   :columns first when the key is missing). An Array gets the symbolized
#   column appended unless already present; a Hash gets a column => column
#   entry. Any other :columns value is left untouched.
# * Array option: the symbolized column is appended unless already present.
# * Anything else: nothing happens.
def add_column_for_on_duplicate_key_update( column, options = {} ) # :nodoc:
  update_spec = options[:on_duplicate_key_update]

  case update_spec
  when Hash
    targets = update_spec.fetch( :columns ) { update_spec[:columns] = [] }
    if targets.is_a?( Array )
      name = column.to_sym
      targets.push( name ) unless targets.include?( name )
    elsif targets.is_a?( Hash )
      name = column.to_sym
      targets[name] = name
    end
  when Array
    name = column.to_sym
    update_spec.push( name ) unless update_spec.include?( name )
  end
end

#duplicate_key_update_error?(exception) ⇒ Boolean

Return true if the statement is a duplicate key record error

Returns:

  • (Boolean)

194
195
196
# File 'lib/activerecord-import/adapters/postgresql_adapter.rb', line 194

# Tells whether +exception+ is an ActiveRecord::StatementInvalid whose
# message contains the text 'duplicate key'.
def duplicate_key_update_error?(exception) # :nodoc:
  return false unless exception.is_a?(ActiveRecord::StatementInvalid)

  exception.to_s.include?('duplicate key')
end

#increment_locking_column!(results, locking_column) ⇒ Object


206
207
208
209
210
# File 'lib/activerecord-import/adapters/postgresql_adapter.rb', line 206

# Appends an assignment that sets the locking column to the EXCLUDED row's
# value plus one onto +results+. Does nothing (and returns nil) when
# +locking_column+ is not present.
def increment_locking_column!(results, locking_column)
  return unless locking_column.present?

  quoted = "\"#{locking_column}\""
  results << "#{quoted}=EXCLUDED.#{quoted}+1"
end

#insert_many(sql, values, options = {}, *args) ⇒ Object

:nodoc:


7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
# File 'lib/activerecord-import/adapters/postgresql_adapter.rb', line 7

# Runs one multi-row INSERT and wraps the outcome in an
# ActiveRecord::Import::Result.
#
# sql     - either a String (the statement prefix) or an Array whose first
#           element is the statement prefix and whose remaining elements are
#           joined with a single space and appended after the values. The
#           Array is NOT modified.
# values  - per-row value fragments; joined with ','.
# options - reads :primary_key, :returning, :no_returning and :recursive.
# args    - forwarded unchanged to insert / select_rows / select_values.
#
# Returns ActiveRecord::Import::Result.new([], 1, ids, results).
# Raises ArgumentError when +sql+ is neither a String nor an Array.
def insert_many( sql, values, options = {}, *args ) # :nodoc:
  number_of_inserts = 1
  returned_values = []
  ids = []
  results = []

  base_sql, post_sql =
    case sql
    when String
      [sql, '']
    when Array
      # Destructure instead of calling sql.shift so the caller's array is
      # left intact.
      head, *tail = sql
      [head, tail.join( ' ' )]
    else
      raise ArgumentError, "Expected sql to be a String or an Array, got #{sql.class}"
    end

  sql2insert = base_sql + values.join( ',' ) + post_sql

  columns = returning_columns(options)
  if columns.blank? || (options[:no_returning] && !options[:recursive])
    # Nothing is read back from the statement.
    insert( sql2insert, *args )
  else
    returned_values = if columns.size > 1
      # Select composite columns
      select_rows( sql2insert, *args )
    else
      select_values( sql2insert, *args )
    end
    # NOTE(review): presumably needed because the write went through a
    # select_* call - confirm against the connection adapter.
    query_cache.clear if query_cache_enabled
  end

  if options[:returning].blank?
    ids = returned_values
  elsif options[:primary_key].blank?
    results = returned_values
  else
    # split primary key and returning columns
    ids, results = split_ids_and_results(returned_values, columns, options)
  end

  ActiveRecord::Import::Result.new([], number_of_inserts, ids, results)
end

#next_value_for_sequence(sequence_name) ⇒ Object


64
65
66
# File 'lib/activerecord-import/adapters/postgresql_adapter.rb', line 64

# Returns the SQL fragment that calls nextval on +sequence_name+.
# The name is interpolated as-is between single quotes.
def next_value_for_sequence(sequence_name)
  "nextval('#{sequence_name}')"
end

#post_sql_statements(table_name, options) ⇒ Object

:nodoc:


68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
# File 'lib/activerecord-import/adapters/postgresql_adapter.rb', line 68

# Collects the SQL fragments that follow the VALUES list.
#
# In order: an ON CONFLICT ... DO NOTHING fragment (only when upsert is
# supported, :ignore or :on_duplicate_key_ignore is set, and neither
# :on_duplicate_key_update nor :recursive is set), then whatever the parent
# implementation returns, then a RETURNING clause for returning_columns
# unless there are none or :no_returning is set without :recursive.
#
# When upsert is not supported and :on_duplicate_key_ignore was requested
# without :on_duplicate_key_update, a warning is logged if a logger exists.
def post_sql_statements( table_name, options ) # :nodoc:
  statements = []
  updating = options[:on_duplicate_key_update]

  if supports_on_duplicate_key_update?
    # Options :recursive and :on_duplicate_key_ignore are mutually exclusive
    ignoring = options[:ignore] || options[:on_duplicate_key_ignore]
    if ignoring && !updating && !options[:recursive]
      statements << sql_for_on_duplicate_key_ignore( table_name, options[:on_duplicate_key_ignore] )
    end
  elsif logger && options[:on_duplicate_key_ignore] && !updating
    logger.warn "Ignoring on_duplicate_key_ignore because it is not supported by the database."
  end

  statements.concat( super(table_name, options) )

  columns = returning_columns(options)
  suppressed = options[:no_returning] && !options[:recursive]
  if !columns.blank? && !suppressed
    quoted = columns.map { |name| "\"#{name}\"" }
    statements << " RETURNING #{quoted.join(', ')}"
  end

  statements
end

#returning_columns(options) ⇒ Object


90
91
92
93
94
95
# File 'lib/activerecord-import/adapters/postgresql_adapter.rb', line 90

# Lists the columns to read back from the statement: the :primary_key
# column(s) first, followed by any :returning column(s) not already listed.
# Returns an empty Array when neither option is present.
def returning_columns(options)
  primary_key = options[:primary_key]
  returning = options[:returning]

  # Copy so the caller's :primary_key array is never handed back directly.
  selected = primary_key.present? ? Array(primary_key).dup : []
  return selected unless returning.present?

  selected | Array(returning)
end

#split_ids_and_results(values, columns, options) ⇒ Object


46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
# File 'lib/activerecord-import/adapters/postgresql_adapter.rb', line 46

# Separates each returned row in +values+ into its primary-key part and its
# :returning part, using the position of each column name within +columns+.
#
# Returns [ids, results]. When exactly one primary-key column (or exactly
# one returning column) is configured, the corresponding list holds bare
# values instead of one-element arrays.
def split_ids_and_results(values, columns, options)
  key_positions = Array(options[:primary_key]).map { |name| columns.index(name) }
  extra_positions = Array(options[:returning]).map { |name| columns.index(name) }

  rows = values.map { |row| Array(row) }
  ids = rows.map { |row| row.values_at(*key_positions) }
  results = rows.map { |row| row.values_at(*extra_positions) }

  ids = ids.map(&:first) if key_positions.size == 1
  results = results.map(&:first) if extra_positions.size == 1

  [ids, results]
end

#sql_for_conflict_target(args = {}) ⇒ Object


175
176
177
178
179
180
181
182
183
184
185
186
# File 'lib/activerecord-import/adapters/postgresql_adapter.rb', line 175

# Builds the conflict-target portion of an ON CONFLICT clause from +args+.
#
# * :constraint_name present - returns "ON CONSTRAINT <name> ".
# * otherwise :conflict_target present - returns the non-blank targets joined
#   with ', ' inside parentheses and followed by a space; when
#   :index_predicate is present, "WHERE <predicate> " is appended.
# * neither present - returns nil.
#
# The result is assembled by interpolation rather than by appending to
# string literals, so it also works when string literals are frozen. A blank
# :index_predicate is ignored instead of producing a dangling "WHERE".
def sql_for_conflict_target( args = {} )
  constraint_name = args[:constraint_name]
  conflict_target = args[:conflict_target]
  index_predicate = args[:index_predicate]

  if constraint_name.present?
    "ON CONSTRAINT #{constraint_name} "
  elsif conflict_target.present?
    targets = Array( conflict_target ).reject( &:blank? ).join( ', ' )
    predicate = index_predicate.present? ? "WHERE #{index_predicate} " : ''
    "(#{targets}) #{predicate}"
  end
end

#sql_for_default_conflict_target(table_name, primary_key) ⇒ Object


188
189
190
191
# File 'lib/activerecord-import/adapters/postgresql_adapter.rb', line 188

# Falls back to the primary key as conflict target: returns the key
# column(s) joined with ', ' inside parentheses followed by a space, or nil
# when that joined string is not present. +table_name+ is not used here.
def sql_for_default_conflict_target( table_name, primary_key )
  key_list = Array(primary_key).join(', ')
  return unless key_list.present?

  "(#{key_list}) "
end

#sql_for_on_duplicate_key_ignore(table_name, *args) ⇒ Object

Returns a generated ON CONFLICT DO NOTHING statement given the passed in args.


113
114
115
116
117
# File 'lib/activerecord-import/adapters/postgresql_adapter.rb', line 113

# Returns an ON CONFLICT ... DO NOTHING fragment. A conflict target is
# included only when the first extra argument is a Hash, in which case it is
# produced by sql_for_conflict_target. +table_name+ is not used here.
def sql_for_on_duplicate_key_ignore( table_name, *args ) # :nodoc:
  spec = args.first
  conflict_target = spec.is_a?( Hash ) ? sql_for_conflict_target( spec ) : nil
  " ON CONFLICT #{conflict_target}DO NOTHING"
end

#sql_for_on_duplicate_key_update(table_name, *args) ⇒ Object

Returns a generated ON CONFLICT DO UPDATE statement given the passed in args.


121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
# File 'lib/activerecord-import/adapters/postgresql_adapter.rb', line 121

# Returns the ON CONFLICT clause for an upsert, or nil when the first extra
# argument is not an Array, String or Hash.
#
# args is [spec, primary_key, locking_column]:
# * spec - an Array or String is treated as { columns: spec }. A Hash may
#   carry :columns (Array, Hash or raw SQL String), :condition, plus the
#   keys read by sql_for_conflict_target.
# * primary_key - used for the default conflict target when the spec
#   supplies none.
# * locking_column - forwarded to the Array/Hash column builders.
#
# When :columns is empty the clause is "... DO NOTHING". Otherwise it is
# "... DO UPDATE SET <assignments>", with " WHERE <condition>" appended when
# :condition is present.
#
# Raises ArgumentError when no conflict target can be determined, or when
# :columns is not an Array, Hash or String.
#
# The clause is assembled by interpolation instead of appending to a string
# literal, so it also works when string literals are frozen.
def sql_for_on_duplicate_key_update( table_name, *args ) # :nodoc:
  arg, primary_key, locking_column = args
  arg = { columns: arg } if arg.is_a?( Array ) || arg.is_a?( String )
  return unless arg.is_a?( Hash )

  conflict_target = sql_for_conflict_target( arg )

  columns = arg.fetch( :columns, [] )
  condition = arg[:condition]
  if columns.respond_to?( :empty? ) && columns.empty?
    return " ON CONFLICT #{conflict_target}DO NOTHING"
  end

  conflict_target ||= sql_for_default_conflict_target( table_name, primary_key )
  unless conflict_target
    raise ArgumentError, 'Expected :conflict_target or :constraint_name to be specified'
  end

  assignments =
    case columns
    when Array
      sql_for_on_duplicate_key_update_as_array( table_name, locking_column, columns )
    when Hash
      sql_for_on_duplicate_key_update_as_hash( table_name, locking_column, columns )
    when String
      # Raw SQL supplied by the caller is used verbatim.
      columns
    else
      raise ArgumentError, 'Expected :columns to be an Array, Hash or String'
    end

  where_clause = condition.present? ? " WHERE #{condition}" : ''
  " ON CONFLICT #{conflict_target}DO UPDATE SET #{assignments}#{where_clause}"
end

#sql_for_on_duplicate_key_update_as_array(table_name, locking_column, arr) ⇒ Object

:nodoc:


156
157
158
159
160
161
162
163
# File 'lib/activerecord-import/adapters/postgresql_adapter.rb', line 156

# Produces the SET list for an Array of column names: each quoted column is
# assigned the same column of the EXCLUDED row. increment_locking_column! is
# then given the list before it is joined with ','. +table_name+ is not
# used here.
def sql_for_on_duplicate_key_update_as_array( table_name, locking_column, arr ) # :nodoc:
  assignments = arr.map { |name| quote_column_name( name ) }
                   .map { |quoted| "#{quoted}=EXCLUDED.#{quoted}" }
  increment_locking_column!(assignments, locking_column)
  assignments.join( ',' )
end

#sql_for_on_duplicate_key_update_as_hash(table_name, locking_column, hsh) ⇒ Object

:nodoc:


165
166
167
168
169
170
171
172
173
# File 'lib/activerecord-import/adapters/postgresql_adapter.rb', line 165

# Produces the SET list for a Hash of columns: each quoted key is assigned
# the quoted value's column of the EXCLUDED row. increment_locking_column!
# is then given the list before it is joined with ','. +table_name+ is not
# used here.
def sql_for_on_duplicate_key_update_as_hash( table_name, locking_column, hsh ) # :nodoc:
  assignments = hsh.map do |target, source|
    [quote_column_name( target ), quote_column_name( source )].join( '=EXCLUDED.' )
  end
  increment_locking_column!(assignments, locking_column)
  assignments.join( ',' )
end

#supports_on_duplicate_key_update? ⇒ Boolean

Returns:

  • (Boolean)

198
199
200
# File 'lib/activerecord-import/adapters/postgresql_adapter.rb', line 198

# Returns true when database_version is at least MIN_VERSION_FOR_UPSERT.
# NOTE(review): presumably database_version is the integer-encoded server
# version (so 90_500 would mean PostgreSQL 9.5) - confirm against the adapter.
def supports_on_duplicate_key_update?
  database_version >= MIN_VERSION_FOR_UPSERT
end

#supports_setting_primary_key_of_imported_objects? ⇒ Boolean

Returns:

  • (Boolean)

202
203
204
# File 'lib/activerecord-import/adapters/postgresql_adapter.rb', line 202

# Capability flag: always returns true for this adapter.
def supports_setting_primary_key_of_imported_objects?
  true
end