Class: Sequent::Core::Persistors::ReplayOptimizedPostgresPersistor

Inherits:
Object
Includes:
Persistor
Defined in:
lib/sequent/core/persistors/replay_optimized_postgres_persistor.rb

Overview

The ReplayOptimizedPostgresPersistor is optimized for bulk loading records in a Postgres database.

Depending on the number of records, it either uses CSV import (Postgres COPY) or inserts the records with regular SQL INSERT statements.

Rebuilding the view state (or projection) of an aggregate typically consists of an initial insert followed by many updates and possibly a delete. With a normal Persistor (like ActiveRecordPersistor), each action is executed against the database. This persistor first builds an in-memory store and flushes it to the database at the end. This can significantly reduce the number of queries to the database. For example, 1 insert followed by 6 updates results in only a single insert with this Persistor.
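The effect can be illustrated with a minimal sketch. It assumes plain Ruby Structs as records and uses a hypothetical TinyStore class (not part of Sequent): create, update and delete only change Ruby objects, and flush returns the final state of each surviving record, one row per INSERT.

```ruby
require 'set'

# Hypothetical record type standing in for an ActiveRecord class.
InvoiceRow = Struct.new(:aggregate_id, :amount, :status, keyword_init: true)

# Hypothetical in-memory store (not Sequent's API): nothing touches the
# database until flush is called.
class TinyStore
  def initialize
    @records = Set.new.compare_by_identity
  end

  def create(values)
    record = InvoiceRow.new(**values)
    @records << record
    record
  end

  def update(aggregate_id)
    @records.each { |record| yield record if record.aggregate_id == aggregate_id }
  end

  def delete(aggregate_id)
    @records.delete_if { |record| record.aggregate_id == aggregate_id }
  end

  # Only the final state of each surviving record is written:
  # every returned hash would become exactly one INSERT.
  def flush
    @records.map(&:to_h)
  end
end

store = TinyStore.new
store.create(aggregate_id: 'inv-1', amount: 0, status: 'draft')
5.times { store.update('inv-1') { |record| record.amount += 10 } }
store.update('inv-1') { |record| record.status = 'sent' }

rows = store.flush
puts rows.length # 1 insert instead of 1 insert + 6 updates
```

The six updates never leave Ruby. Only the final row, with amount 50 and status "sent", would be inserted.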

After a lot of experimenting, this turned out to be the fastest way to do bulk inserts in the database. You can tune insert_with_csv_size, the number of records per record class above which CSV import is used instead of individual inserts, to gain (or lose) speed.
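The threshold is compared against the number of pending records of each record class separately, so a single commit can mix both strategies. Here is a minimal sketch, using a hypothetical insert_strategy helper and made-up record counts:

```ruby
# Hypothetical helper mirroring the threshold check in #commit:
# strictly more than insert_with_csv_size records => COPY ... WITH csv,
# otherwise one INSERT statement per record.
def insert_strategy(record_count, insert_with_csv_size)
  record_count > insert_with_csv_size ? :copy_csv : :insert_statements
end

# Pending record counts per record class at commit time (made-up numbers).
pending = { 'InvoiceRecord' => 120, 'CustomerRecord' => 3 }

[50, 200].each do |threshold|
  plan = pending.transform_values { |count| insert_strategy(count, threshold) }
  puts "insert_with_csv_size=#{threshold}: #{plan}"
end
```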

It is highly recommended to create indices on the in-memory record_store to speed up processing. By default, all records are indexed by aggregate_id if they have such a column.

Example:

class InvoiceProjector < Sequent::Core::Projector
  on RecipientMovedEvent do |event|
    update_all_records(
      InvoiceRecord,
      { aggregate_id: event.aggregate_id, recipient_id: event.recipient.aggregate_id },
      { recipient_street: event.recipient.street }
    )
  end
end

In this case it is wise to create an index on InvoiceRecord on the aggregate_id and recipient_id attributes, like you would in the database. Note that previous versions of this class supported multi-column indexes. These are now split into multiple single-column indexes, and the results of the individual indexes are combined using set intersection. This reduces the amount of memory used and makes it possible to use an index in more cases: whenever an indexed attribute is present in the where clause, the index is used, so not all indexed attributes need to be present.
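The single-column approach can be sketched as follows. This is a hypothetical ColumnIndex class, not Sequent's own Index class, whose internals are not shown on this page. It keeps one value-to-records hash per indexed column and intersects the candidate sets for whichever indexed columns appear in the where clause.

```ruby
require 'set'

# Hypothetical simplified index (not Sequent's Index class): one hash per
# indexed column, mapping a value to the records that currently hold it.
class ColumnIndex
  def initialize(columns)
    @columns = columns
    @by_column = columns.to_h do |column|
      [column, Hash.new { |hash, value| hash[value] = Set.new.compare_by_identity }]
    end
  end

  def add(record)
    @columns.each { |column| @by_column[column][record[column]] << record }
  end

  # Uses every indexed column present in the where clause and intersects the
  # per-column candidate sets. Returns nil when no indexed column is present,
  # meaning the caller has to scan all records instead.
  def find(where)
    sets = where.filter_map do |column, value|
      @by_column[column].fetch(value) { Set.new } if @columns.include?(column)
    end
    return nil if sets.empty?

    # Start from the smallest set; membership checks are by object identity.
    sets.min_by(&:size).select { |record| sets.all? { |set| set.include?(record) } }
  end
end

RecipientRow = Struct.new(:aggregate_id, :recipient_id, :recipient_street, keyword_init: true)

index = ColumnIndex.new(%i[aggregate_id recipient_id])
first = RecipientRow.new(aggregate_id: 'inv-1', recipient_id: 'rec-1', recipient_street: 'Main St')
second = RecipientRow.new(aggregate_id: 'inv-1', recipient_id: 'rec-2', recipient_street: 'High St')
third = RecipientRow.new(aggregate_id: 'inv-2', recipient_id: 'rec-1', recipient_street: 'Main St')
[first, second, third].each { |row| index.add(row) }

# Both indexed columns present: intersection of two candidate sets.
both = index.find({ aggregate_id: 'inv-1', recipient_id: 'rec-1' })
# Only one indexed column present: the index is still used, and
# recipient_street (not indexed) is left for the caller to filter on.
by_recipient = index.find({ recipient_id: 'rec-1', recipient_street: 'Main St' })
```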

Example:

ReplayOptimizedPostgresPersistor.new(
  50,
  {InvoiceRecord => [:aggregate_id, :recipient_id]}
)

Defined Under Namespace

Modules: InMemoryStruct
Classes: Index

Constant Summary

CHUNK_SIZE = 1024

Instance Attribute Summary

Instance Method Summary

Constructor Details

#initialize(insert_with_csv_size = 50, indices = {}, default_indexed_columns = [:aggregate_id]) ⇒ ReplayOptimizedPostgresPersistor

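insert_with_csv_size When more than this number of records of one record class are flushed by #commit, they are loaded with CSV import (COPY); otherwise each record is inserted with its own INSERT statement.

indices Hash of indices to create in memory. Greatly speeds up replaying.

Key is the record class (e.g. InvoiceRecord).
Value is a list of columns to index. Nested lists are flattened, so every column gets its own single-column index.
E.g. [[:first_index_column], [:another_index, :with_two_columns]]

default_indexed_columns Columns that are indexed for every record class that has them (default: [:aggregate_id]).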


# File 'lib/sequent/core/persistors/replay_optimized_postgres_persistor.rb', line 171

def initialize(insert_with_csv_size = 50, indices = {}, default_indexed_columns = [:aggregate_id])
  @insert_with_csv_size = insert_with_csv_size
  @record_store = Hash.new { |h, k| h[k] = Set.new.compare_by_identity }
  @record_index = Hash.new do |h, k|
    h[k] = Index.new(default_indexed_columns.to_set & k.column_names.map(&:to_sym))
  end

  indices.each do |record_class, indexed_columns|
    columns = indexed_columns.flatten(1).to_set(&:to_sym) + default_indexed_columns
    @record_index[record_class] = Index.new(columns & record_class.column_names.map(&:to_sym))
  end

  @record_defaults = Hash.new do |h, record_class|
    h[record_class] = record_class.column_defaults.symbolize_keys
  end
end
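The constructor normalizes the indices hash before building each Index. The sketch below reproduces only that normalization step with a hypothetical helper. It assumes a stand-in class that responds to column_names, like an ActiveRecord model does. Unknown columns are silently dropped, and the default columns are only kept when the table actually has them.

```ruby
require 'set'

# Hypothetical stand-in for an ActiveRecord class; only column_names is used.
ColumnsOnly = Struct.new(:column_names)
invoice_record = ColumnsOnly.new(%w[id aggregate_id recipient_id recipient_street])

# Mirrors the constructor: flatten nested lists one level, symbolize,
# add the default columns, then keep only columns the table actually has.
def index_columns_for(record_class, indexed_columns, default_indexed_columns = [:aggregate_id])
  columns = indexed_columns.flatten(1).to_set(&:to_sym) + default_indexed_columns
  columns & record_class.column_names.map(&:to_sym)
end

p index_columns_for(invoice_record, [['recipient_id'], [:recipient_street, :no_such_column]])
```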

Instance Attribute Details

#insert_with_csv_size ⇒ Object

Returns the value of attribute insert_with_csv_size.



# File 'lib/sequent/core/persistors/replay_optimized_postgres_persistor.rb', line 60

def insert_with_csv_size
  @insert_with_csv_size
end

#record_store ⇒ Object (readonly)

Returns the value of attribute record_store.



# File 'lib/sequent/core/persistors/replay_optimized_postgres_persistor.rb', line 59

def record_store
  @record_store
end

Instance Method Details

#clear ⇒ Object



# File 'lib/sequent/core/persistors/replay_optimized_postgres_persistor.rb', line 358

def clear
  @record_store.clear
  @record_index.each_value(&:clear)
end

#commit ⇒ Object



# File 'lib/sequent/core/persistors/replay_optimized_postgres_persistor.rb', line 312

def commit
  @record_store.each do |clazz, records|
    @column_cache ||= {}
    @column_cache[clazz.name] ||= clazz.columns.reduce({}) do |hash, column|
      hash.merge({column.name => column})
    end
    # More than insert_with_csv_size records of this class: bulk load them with COPY ... WITH csv.
    if records.size > @insert_with_csv_size
      csv = CSV.new(StringIO.new)
      column_names = clazz.column_names.reject { |name| name == 'id' }
      records.each do |record|
        csv << column_names.map do |column_name|
          cast_value_to_column_type(clazz, column_name, record)
        end
      end

      conn = Sequent::ApplicationRecord.connection.raw_connection
      copy_data = StringIO.new(csv.string)
      conn.transaction do
        conn.copy_data("COPY #{clazz.table_name} (#{column_names.join(',')}) FROM STDIN WITH csv") do
          while (out = copy_data.read(CHUNK_SIZE))
            conn.put_copy_data(out)
          end
        end
      end
    else
      # Otherwise: one parameterized INSERT statement per record.
      clazz.unscoped do
        inserts = []
        column_names = clazz.column_names.reject { |name| name == 'id' }
        prepared_values = (1..column_names.size).map { |i| "$#{i}" }.join(',')
        records.each do |record|
          values = column_names.map do |column_name|
            cast_value_to_column_type(clazz, column_name, record)
          end
          inserts << values
        end
        sql = %{insert into #{clazz.table_name} (#{column_names.join(',')}) values (#{prepared_values})}
        inserts.each do |insert|
          clazz.connection.raw_connection.async_exec(sql, insert)
        end
      end
    end
  end
ensure
  clear
end
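The COPY branch of commit can be illustrated without a database connection. The sketch below builds the same kind of COPY statement and CSV payload, leaving out the id column. It then reads the payload back in CHUNK_SIZE pieces, which is what commit does before handing each piece to PG::Connection#put_copy_data. The helper names and sample records are hypothetical.

```ruby
require 'csv'
require 'stringio'

CHUNK_SIZE = 1024

# Hypothetical helper: builds the COPY statement and CSV payload the way
# #commit does, leaving out the id column.
def copy_statement_and_payload(table_name, column_names, records)
  columns = column_names.reject { |name| name == 'id' }
  payload = CSV.generate do |csv|
    records.each { |record| csv << columns.map { |column| record[column] } }
  end
  ["COPY #{table_name} (#{columns.join(',')}) FROM STDIN WITH csv", payload]
end

# Reads the payload in CHUNK_SIZE pieces; #commit passes each piece to
# put_copy_data inside a copy_data block.
def each_chunk(payload)
  io = StringIO.new(payload)
  while (chunk = io.read(CHUNK_SIZE))
    yield chunk
  end
end

records = (1..200).map { |i| { 'id' => nil, 'aggregate_id' => "inv-#{i}", 'amount' => i * 10 } }
sql, payload = copy_statement_and_payload('invoice_records', %w[id aggregate_id amount], records)
chunks = []
each_chunk(payload) { |chunk| chunks << chunk }
puts sql
puts "#{payload.bytesize} bytes sent in #{chunks.size} chunks"
```

Streaming in fixed-size chunks keeps each put_copy_data call small, even when the CSV for a record class is large.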

#create_or_update_record(record_class, values, created_at = Time.now) {|record| ... } ⇒ Object

Yields:

  • (record)


# File 'lib/sequent/core/persistors/replay_optimized_postgres_persistor.rb', line 220

def create_or_update_record(record_class, values, created_at = Time.now)
  record = get_record(record_class, values)
  record ||= create_record(record_class, values.merge(created_at: created_at))
  yield record if block_given?
  @record_index[record_class].update(record)
  record
end
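In create_or_update_record, values serves both as the lookup criteria (via get_record) and as the initial attributes of a new record. created_at is only applied when a record is created. Below is a minimal sketch of that upsert behaviour on a plain array, with a hypothetical ProfileRow struct and create_or_update helper:

```ruby
# Hypothetical record type; in Sequent this would be a struct built from the
# ActiveRecord class's columns.
ProfileRow = Struct.new(:aggregate_id, :name, :created_at, keyword_init: true)

# Mirrors #create_or_update_record on a plain array: find by values,
# otherwise create with created_at merged in, then yield for changes.
def create_or_update(store, values, created_at = Time.now)
  record = store.find { |candidate| values.all? { |column, value| candidate[column] == value } }
  record ||= ProfileRow.new(**values.merge(created_at: created_at)).tap { |created| store << created }
  yield record if block_given?
  record
end

store = []
first_seen = Time.at(0)
create_or_update(store, { aggregate_id: 'p-1' }, first_seen) { |r| r.name = 'Alice' }
create_or_update(store, { aggregate_id: 'p-1' }, Time.now) { |r| r.name = 'Alicia' }
puts store.size # 1
```

Because values is also the lookup key, it should contain only the identifying columns. Other attributes belong in the block.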

#create_record(record_class, values) {|record| ... } ⇒ Object

Yields:

  • (record)


# File 'lib/sequent/core/persistors/replay_optimized_postgres_persistor.rb', line 201

def create_record(record_class, values)
  record = struct_cache[record_class].new(**values)
  @record_defaults[record_class].each do |column, default|
    record[column] = default unless values.include? column
  end
  record.updated_at = values[:created_at] if record.respond_to?(:updated_at)

  yield record if block_given?

  @record_store[record_class] << record
  @record_index[record_class].add(record)

  record
end
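create_record applies a column default only when the column is missing from values. An explicitly passed nil is therefore kept. The method also starts updated_at off equal to created_at. Here is a minimal sketch with a hypothetical struct and a hand-written defaults hash, which stands in for ActiveRecord's column_defaults:

```ruby
# Hypothetical record type and column defaults; in Sequent the defaults come
# from column_defaults of the record class.
InvoiceStruct = Struct.new(:aggregate_id, :status, :amount, :created_at, :updated_at, keyword_init: true)
COLUMN_DEFAULTS = { status: 'draft', amount: 0 }.freeze

def build_record(values)
  record = InvoiceStruct.new(**values)
  # A default is applied only when the column is absent from values,
  # so an explicitly passed nil stays nil.
  COLUMN_DEFAULTS.each do |column, default|
    record[column] = default unless values.include?(column)
  end
  # A new record starts out with updated_at equal to created_at.
  record.updated_at = values[:created_at] if record.respond_to?(:updated_at)
  record
end

created_at = Time.at(1_700_000_000)
record = build_record({ aggregate_id: 'inv-1', amount: nil, created_at: created_at })
```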

#create_records(record_class, array_of_value_hashes) ⇒ Object



# File 'lib/sequent/core/persistors/replay_optimized_postgres_persistor.rb', line 216

def create_records(record_class, array_of_value_hashes)
  array_of_value_hashes.each { |values| create_record(record_class, values) }
end

#delete_all_records(record_class, where_clause) ⇒ Object



# File 'lib/sequent/core/persistors/replay_optimized_postgres_persistor.rb', line 242

def delete_all_records(record_class, where_clause)
  find_records(record_class, where_clause).each do |record|
    delete_record(record_class, record)
  end
end

#delete_record(record_class, record) ⇒ Object



# File 'lib/sequent/core/persistors/replay_optimized_postgres_persistor.rb', line 248

def delete_record(record_class, record)
  @record_store[record_class].delete(record)
  @record_index[record_class].remove(record)
end

#do_with_record(record_class, where_clause) {|record| ... } ⇒ Object

Yields:

  • (record)


# File 'lib/sequent/core/persistors/replay_optimized_postgres_persistor.rb', line 270

def do_with_record(record_class, where_clause)
  record = get_record!(record_class, where_clause)
  yield record
  @record_index[record_class].update(record)
end

#do_with_records(record_class, where_clause) ⇒ Object



# File 'lib/sequent/core/persistors/replay_optimized_postgres_persistor.rb', line 262

def do_with_records(record_class, where_clause)
  records = find_records(record_class, where_clause)
  records.each do |record|
    yield record
    @record_index[record_class].update(record)
  end
end

#find_records(record_class, where_clause) ⇒ Object



# File 'lib/sequent/core/persistors/replay_optimized_postgres_persistor.rb', line 276

def find_records(record_class, where_clause)
  where_clause = where_clause.symbolize_keys

  indexed_columns = @record_index[record_class].indexed_columns
  indexed_fields, non_indexed_fields = where_clause.partition { |field, _| indexed_columns.include? field }

  candidate_records = if indexed_fields.present?
                        @record_index[record_class].find(indexed_fields)
                      else
                        @record_store[record_class]
                      end

  return candidate_records.to_a if non_indexed_fields.empty?

  candidate_records.select do |record|
    non_indexed_fields.all? do |k, v|
      expected_value = Persistors.normalize_symbols(v)
      actual_value = Persistors.normalize_symbols(record[k])
      if expected_value.is_a?(Array)
        expected_value.include?(actual_value)
      else
        actual_value == expected_value
      end
    end
  end
end
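find_records splits the where clause into indexed and non-indexed fields. It narrows the candidates with the index and then checks the remaining fields one by one. An Array value behaves like SQL IN. The sketch below mimics that flow on plain hashes. A scan stands in for the index lookup, and the Persistors.normalize_symbols step is left out. The helper names are hypothetical.

```ruby
# Hypothetical condition check: an Array in the where clause behaves like
# SQL IN, any other value is compared with ==.
def matches?(row, conditions)
  conditions.all? do |field, expected|
    actual = row[field]
    expected.is_a?(Array) ? expected.include?(actual) : actual == expected
  end
end

def find_rows(rows, where_clause, indexed_columns)
  indexed, non_indexed = where_clause.partition { |field, _| indexed_columns.include?(field) }
  # Stand-in for the index lookup; with no indexed field every row is a candidate.
  candidates = indexed.empty? ? rows : rows.select { |row| matches?(row, indexed) }
  candidates.select { |row| matches?(row, non_indexed) }
end

rows = [
  { aggregate_id: 'inv-1', status: 'draft' },
  { aggregate_id: 'inv-1', status: 'sent' },
  { aggregate_id: 'inv-2', status: 'paid' },
]
result = find_rows(rows, { aggregate_id: 'inv-1', status: %w[sent paid] }, [:aggregate_id])
```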

#get_record(record_class, where_clause) ⇒ Object



# File 'lib/sequent/core/persistors/replay_optimized_postgres_persistor.rb', line 237

def get_record(record_class, where_clause)
  results = find_records(record_class, where_clause)
  results.empty? ? nil : results.first
end

#get_record!(record_class, where_clause) ⇒ Object



# File 'lib/sequent/core/persistors/replay_optimized_postgres_persistor.rb', line 228

def get_record!(record_class, where_clause)
  record = get_record(record_class, where_clause)
  unless record
    fail("record #{record_class} not found for #{where_clause}, store: #{@record_store[record_class]}")
  end

  record
end

#last_record(record_class, where_clause) ⇒ Object



# File 'lib/sequent/core/persistors/replay_optimized_postgres_persistor.rb', line 303

def last_record(record_class, where_clause)
  results = find_records(record_class, where_clause)
  results.empty? ? nil : results.last
end

#prepare ⇒ Object



# File 'lib/sequent/core/persistors/replay_optimized_postgres_persistor.rb', line 308

def prepare
  # noop
end

#struct_cache ⇒ Object



# File 'lib/sequent/core/persistors/replay_optimized_postgres_persistor.rb', line 82

def struct_cache
  @struct_cache ||= Hash.new do |hash, record_class|
    struct_class = Struct.new(*record_class.column_names.map(&:to_sym), keyword_init: true) do
      include InMemoryStruct
    end
    hash[record_class] = struct_class
  end
end
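struct_cache lazily builds one keyword-initialized Struct per record class and reuses it for every record of that class. Below is a minimal sketch of the same Hash-with-default-block pattern. It uses a hypothetical FakeRecordClass that only provides column_names, and it leaves out Sequent's InMemoryStruct mixin.

```ruby
# Hypothetical stand-in for an ActiveRecord class; only column_names is used.
class FakeRecordClass
  attr_reader :column_names

  def initialize(column_names)
    @column_names = column_names
  end
end

# One keyword-initialized Struct per record class, built on first access
# and reused afterwards.
def build_struct_cache
  Hash.new do |hash, record_class|
    hash[record_class] = Struct.new(*record_class.column_names.map(&:to_sym), keyword_init: true)
  end
end

cache = build_struct_cache
invoice_record = FakeRecordClass.new(%w[id aggregate_id amount])
row = cache[invoice_record].new(aggregate_id: 'inv-1', amount: 10)
same_class = cache[invoice_record].equal?(row.class)
```

With keyword_init, passing a column the table does not have raises ArgumentError instead of being silently ignored.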

#update_all_records(record_class, where_clause, updates) ⇒ Object



# File 'lib/sequent/core/persistors/replay_optimized_postgres_persistor.rb', line 253

def update_all_records(record_class, where_clause, updates)
  find_records(record_class, where_clause).each do |record|
    updates.each_pair do |k, v|
      record[k] = v
    end
    @record_index[record_class].update(record)
  end
end
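update_all_records, do_with_record and do_with_records all call @record_index[record_class].update(record) after mutating a record. A value-keyed index would otherwise keep the record filed under its old value. The internals of Sequent's Index class are not shown on this page. The sketch below is a hypothetical single-column index that remembers, per record, the value it was filed under, so that update can move it.

```ruby
require 'set'

# Hypothetical single-column index that tracks, per record (by identity),
# the value it was filed under so update can move it after a mutation.
class ReindexingIndex
  def initialize(column)
    @column = column
    @buckets = Hash.new { |hash, value| hash[value] = Set.new.compare_by_identity }
    @filed_under = {}.compare_by_identity
  end

  def add(record)
    value = record[@column]
    @buckets[value] << record
    @filed_under[record] = value
  end

  def update(record)
    old_value = @filed_under[record]
    return if old_value == record[@column]

    @buckets[old_value].delete(record)
    add(record)
  end

  def find(value)
    @buckets.fetch(value) { Set.new }.to_a
  end
end

MovableRow = Struct.new(:recipient_id, :street, keyword_init: true)
index = ReindexingIndex.new(:recipient_id)
row = MovableRow.new(recipient_id: 'rec-1', street: 'Main St')
index.add(row)

# Like update_all_records: mutate each matching record, then tell the index.
index.find('rec-1').each do |record|
  record.recipient_id = 'rec-2'
  index.update(record)
end
```

Identity-based hashes matter here. A Struct's hash changes when its fields change, so a regular Hash or Set keyed by mutable records could no longer find them after an update.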

#update_record(record_class, event, where_clause = {aggregate_id: event.aggregate_id}, options = {}) {|record| ... } ⇒ Object

Yields:

  • (record)


# File 'lib/sequent/core/persistors/replay_optimized_postgres_persistor.rb', line 188

def update_record(record_class, event, where_clause = {aggregate_id: event.aggregate_id}, options = {})
  record = get_record!(record_class, where_clause)
  record.updated_at = event.created_at if record.respond_to?(:updated_at=)
  yield record if block_given?
  @record_index[record_class].update(record)
  update_sequence_number = if options.key?(:update_sequence_number)
                             options[:update_sequence_number]
                           else
                             record.respond_to?(:sequence_number=)
                           end
  record.sequence_number = event.sequence_number if update_sequence_number
end