Class: ActiveDataFrame::Database

Inherits:
Object
Defined in:
lib/active_data_frame/database.rb

Instance Attribute Summary

Class Method Summary

Instance Method Summary

Constructor Details

#initialize(block_type, data_frame_type) ⇒ Database

Returns a new instance of Database.



# File 'lib/active_data_frame/database.rb', line 46

def initialize(block_type, data_frame_type)
  @block_type = block_type
  @data_frame_type = data_frame_type
end

Instance Attribute Details

#block_type ⇒ Object (readonly)

Returns the value of attribute block_type.



# File 'lib/active_data_frame/database.rb', line 44

def block_type
  @block_type
end

#data_frame_type ⇒ Object (readonly)

Returns the value of attribute data_frame_type.



# File 'lib/active_data_frame/database.rb', line 44

def data_frame_type
  @data_frame_type
end

Class Method Details

.batch ⇒ Object



# File 'lib/active_data_frame/database.rb', line 51

def self.batch
  self.batching, prev_batch = true, self.batching
  Thread.current[:batch] ||= ''
  ActiveRecord::Base.transaction do
    yield
  end
ensure
  self.batching = prev_batch
  flush! unless self.batching
end
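A minimal usage sketch (the SQL and the rainfall_blocks table name are hypothetical): every statement routed through Database.execute inside the block is appended to a thread-local buffer, then flushed as a single multi-statement batch when the outermost batch call returns.

ActiveDataFrame::Database.batch do
  # Both statements are buffered in Thread.current[:batch] rather than run
  ActiveDataFrame::Database.execute("UPDATE rainfall_blocks SET t1 = 1.5 WHERE data_frame_id = 1")
  ActiveDataFrame::Database.execute("UPDATE rainfall_blocks SET t2 = 2.5 WHERE data_frame_id = 1")
end
# flush! runs here, sending both statements in a single execute call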

.batching ⇒ Object



# File 'lib/active_data_frame/database.rb', line 4

def self.batching
  !!Thread.current[:active_data_frame_batching]
end

.batching=(value) ⇒ Object



# File 'lib/active_data_frame/database.rb', line 8

def self.batching=(value)
  Thread.current[:active_data_frame_batching] = !!value
end

.execute(sql) ⇒ Object



# File 'lib/active_data_frame/database.rb', line 12

def self.execute(sql)
  if ActiveDataFrame::Database.batching
    Thread.current[:batch] << sql << ?;
  else
    unless sql.empty?
      ActiveRecord::Base.transaction do
        ActiveDataFrame::DataFrameProxy.suppress_logs do
          case ActiveRecord::Base.connection_db_config.adapter
          when 'sqlite3'.freeze
            ActiveRecord::Base.connection.raw_connection.execute_batch sql
          when 'mysql2'
            sql.split(';').reject{|x| x.strip.empty?}.each do |stmt|
              ActiveRecord::Base.connection.execute(stmt)
            end
          else
            ActiveRecord::Base.connection.execute(sql)
          end
        end
      end
    end
  end
end
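As a sketch of the adapter dispatch (table and column names hypothetical): a multi-statement string is passed wholesale to execute_batch on sqlite3, split on ';' and executed statement by statement on mysql2, and sent as one string to adapters that accept multiple statements per call.

sql = "UPDATE rainfall_blocks SET t1 = 1 WHERE data_frame_id = 1;" \
      "UPDATE rainfall_blocks SET t2 = 2 WHERE data_frame_id = 1;"
ActiveDataFrame::Database.execute(sql) # runs immediately unless batching is on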

.flush!Object



# File 'lib/active_data_frame/database.rb', line 35

def self.flush!
  execute(Thread.current[:batch])
  Thread.current[:batch] = ''
end

.for_types(block:, df:) ⇒ Object



# File 'lib/active_data_frame/database.rb', line 40

def self.for_types(block:, df:)
  (@@configs ||= {})[[block, df]] ||= Database.new(block, df)
end
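A sketch of the memoised lookup, assuming hypothetical Rainfall and RainfallBlock classes; repeated calls with the same pair return the same cached instance:

db = ActiveDataFrame::Database.for_types(block: RainfallBlock, df: Rainfall)
db.equal?(ActiveDataFrame::Database.for_types(block: RainfallBlock, df: Rainfall)) # => true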

Instance Method Details

#bulk_delete(id, indices) ⇒ Object



# File 'lib/active_data_frame/database.rb', line 178

def bulk_delete(id, indices)
  indices.each_slice(ActiveDataFrame.delete_max_batch_size) do |slice|
    # puts "Deleting slice of #{slice.length}"
    block_type.where(data_frame_id: id, period_index: slice).delete_all
  end
end
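A sketch (model names hypothetical): indices are period indices, deleted in slices of ActiveDataFrame.delete_max_batch_size.

db = ActiveDataFrame::Database.for_types(block: RainfallBlock, df: Rainfall)
db.bulk_delete(42, [0, 1, 2, 7]) # deletes blocks 0, 1, 2 and 7 for data frame 42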

#bulk_insert(new_blocks, columns = block_type::COLUMNS) ⇒ Object

Insert block data for all blocks in a single call. PostgreSQL uses COPY; other adapters use a multi-row INSERT.



# File 'lib/active_data_frame/database.rb', line 189

def bulk_insert(new_blocks, columns=block_type::COLUMNS)
  new_blocks.each_slice(ActiveDataFrame.insert_max_batch_size) do |new_blocks_slice|
    if ActiveRecord::Base.connection_db_config.adapter == 'postgresql'
      copy_statement = "COPY #{block_type.table_name} (#{columns.join(',')},data_frame_id,period_index,data_frame_type) FROM STDIN CSV"
      db_conn = ActiveRecord::Base.connection.raw_connection
      db_conn.copy_data(copy_statement) do
        new_blocks_slice.each do |period_index, (values, df_id)|
          db_conn.put_copy_data((values + [df_id, period_index, data_frame_type.name]).join(',') << "\n")
        end
      end
    else
      inserts = ''
      new_blocks_slice.each do |period_index, (values, df_id)|
        # Row quoting is identical across mysql2 and the other non-PostgreSQL adapters
        inserts << "(#{values.map{|v| v.inspect.gsub('"',"'") }.join(',')}, #{df_id}, #{period_index}, '#{data_frame_type.name}'),"
      end
      sql = "INSERT INTO #{block_type.table_name} (#{columns.join(',')}, data_frame_id, period_index, data_frame_type) VALUES #{inserts[0..-2]}"
      Database.execute sql
    end
  end
end
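The expected shape of new_blocks is a collection of [period_index, [values, data_frame_id]] pairs, as the destructuring in the loop above shows. A sketch with hypothetical block columns t1..t3 and the db instance from for_types:

new_blocks = {
  0 => [[1.0, 2.0, 3.0], 42], # period_index => [values, data_frame_id]
  1 => [[4.0, 5.0, 6.0], 42]
}
db.bulk_insert(new_blocks) # columns default to block_type::COLUMNS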

#bulk_update(existing, columns = block_type::COLUMNS) ⇒ Object

Fast update of block data for all blocks in a single call. Uses UPDATE ... FROM (VALUES ...) on PostgreSQL, INSERT ... ON DUPLICATE KEY UPDATE (upsert) on MySQL, and UPDATE with per-column CASE expressions on other adapters.



# File 'lib/active_data_frame/database.rb', line 108

def bulk_update(existing, columns=block_type::COLUMNS)
  existing.each_slice(ActiveDataFrame.update_max_batch_size) do |existing_slice|
    # puts "Updating slice of #{existing_slice.length}"
    case ActiveRecord::Base.connection_db_config.adapter
    when 'postgresql'.freeze
      #
      # PostgreSQL supports the fast setting of multiple update values that differ
      # per row from a VALUES list treated as a temporary table.
      #
      updates = ''
      existing_slice.each do |period_index, (values, df_id)|
        updates <<  "(#{df_id}, #{period_index}, #{values.map{|v| v.inspect.gsub('"',"'") }.join(',')}),"
      end
      Database.execute(
        <<-SQL
        UPDATE #{block_type.table_name}
          SET #{columns.map{|col| "#{col} = t.#{col}" }.join(", ")}
          FROM(
          VALUES #{updates[0..-2]}) as t(data_frame_id, period_index, #{columns.join(',')})
          WHERE #{block_type.table_name}.data_frame_id = t.data_frame_id
          AND #{block_type.table_name}.period_index = t.period_index
          AND #{block_type.table_name}.data_frame_type = '#{data_frame_type.name}'
        SQL
      )
    #
    # For MySQL we use the ON DUPLICATE KEY UPDATE functionality.
    # This relies on there being a unique index on (data_frame_id, period_index)
    # on the blocks table.
    # This tends to be faster than the general CASE based solution below
    # but slower than the PostgreSQL solution above
    #
    when 'mysql2'.freeze
      # Fast bulk update
      updates, on_duplicate = "", ""
      existing_slice.each do |period_index, (values, df_id)|
        updates << "(#{values.map{|v| v.inspect.gsub('"',"'") }.join(',')}, #{df_id}, #{period_index}, '#{data_frame_type.name}'),"
      end
      on_duplicate = columns.map do |cname|
        "#{cname}=VALUES(#{cname})"
      end.join(", ")
      stmt = <<-SQL
        INSERT INTO #{block_type.table_name} (#{columns.join(',')},data_frame_id,period_index,data_frame_type)
        VALUES #{updates[0..-2]}
        ON DUPLICATE KEY UPDATE #{on_duplicate}
      SQL
      Database.execute(stmt)
    else
      #
      # General CASE based solution for multiple differing updates
      # set per row.
      # We use a CASE statement per column which determines the column
      # to set based on the period index
      #
      ids = existing_slice.map {|_, (_, id)| id}
      updates = columns.map.with_index do |column, column_idx|
        [column, "CASE \n#{existing_slice.map{|period_index, (values, df_id)| "WHEN period_index=#{period_index} AND data_frame_id=#{df_id} then #{values[column_idx]}" }.join("\n")} \nEND\n"]
      end.to_h
      update_statement = updates.map{|cl, up| "#{cl} = #{up}" }.join(', ')
      Database.execute(<<-SQL
        UPDATE #{block_type.table_name} SET #{update_statement} WHERE
        #{block_type.table_name}.data_frame_id IN (#{ids.join(',')})
        AND #{block_type.table_name}.data_frame_type = '#{data_frame_type.name}'
        AND #{block_type.table_name}.period_index IN (#{existing_slice.map(&:first).join(', ')});
      SQL
      )
    end
  end
end
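bulk_update takes the same [period_index, [values, data_frame_id]] shape as bulk_insert. A sketch with hypothetical columns:

existing = {
  0 => [[9.9, 2.0, 3.0], 42] # overwrite block 0 of data frame 42
}
db.bulk_update(existing)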

#bulk_upsert(upserts, scope = nil) ⇒ Object



# File 'lib/active_data_frame/database.rb', line 62

def bulk_upsert(upserts, scope=nil)
  Database.batch do
    case ActiveRecord::Base.connection_db_config.adapter
    when 'postgresql'.freeze
      upserts.group_by(&:keys).each do |columns, value_list|
        columns = columns - [:data_frame_id, :period_index]
        inserts = ''
        value_list.each do |row|
          df_id, period_index, *values = row.values
          inserts <<  "(#{values.map{|v| v.inspect.gsub('"',"'") }.join(',')}, #{df_id}, #{period_index}, '#{data_frame_type.name}'),"
        end
        sql = %Q{
          INSERT INTO #{block_type.table_name} (#{columns.join(',')}, data_frame_id, period_index, data_frame_type)
          VALUES #{inserts[0..-2]}
          ON CONFLICT(data_frame_id, period_index, data_frame_type) DO UPDATE
          SET #{columns.map{|c| "#{c} = excluded.#{c} "}.join(',')}
        }
        Database.execute sql
      end
    when 'mysql2'.freeze
      upserts.group_by(&:keys).each do |columns, rows|
        update = rows.map(&:values).map{|df_id, period_index, *values| [period_index, [values, df_id]] }
        bulk_update(update, columns - [:data_frame_id, :period_index])
      end
    else
      all_update_indices     = scope[].pluck(:data_frame_id, :period_index)
      grouped_update_indices = all_update_indices.group_by(&:first).transform_values{|value| Set.new(value.map!(&:last)) }
      updates, inserts = upserts.partition{|upsert| grouped_update_indices[upsert[:data_frame_id]]&.include?(upsert[:period_index]) }
      updates.group_by(&:keys).each do |columns, rows|
        update = rows.map(&:values).map{|df_id, period_index, *values| [period_index, [values, df_id]] }
        bulk_update(update, columns - [:data_frame_id, :period_index])
      end
      inserts.group_by(&:keys).each do |columns, rows|
        insert = rows.map(&:values).map{|df_id, period_index, *values| [period_index, [values, df_id]] }
        bulk_insert(insert, columns - [:data_frame_id, :period_index])
      end
    end
  end
end
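Each upsert row is a hash whose keys must begin with :data_frame_id and :period_index followed by the block columns, since the method destructures row.values positionally. The scope argument is only consulted on adapters without a native upsert, where it must be a callable returning the relation to check for existing rows. A sketch with hypothetical columns and models:

upserts = [
  { data_frame_id: 42, period_index: 0, t1: 1.0, t2: 2.0 },
  { data_frame_id: 42, period_index: 9, t1: 7.5, t2: 8.1 }
]
db.bulk_upsert(upserts, -> { RainfallBlock.where(data_frame_type: 'Rainfall') })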