Class: Sequent::Core::Persistors::ActiveRecordPersistor

Inherits:
Object
  • Object
show all
Includes:
Persistor
Defined in:
lib/sequent/core/persistors/active_record_persistor.rb

Overview

The ActiveRecordPersistor uses ActiveRecord to update the projection.

This is the default persistor in Sequent. As your event store grows, however, it is not the best choice of persistor for rebuilding a projection when migrating to a new version of that projection.

Please see the ReplayOptimizedPostgresPersistor as an alternative for migrating projections to a new version.

Instance Method Summary collapse

Instance Method Details

#commit ⇒ Object



120
121
122
# File 'lib/sequent/core/persistors/active_record_persistor.rb', line 120

# Intentionally does nothing.
#
# @return [nil]
def commit
  nil
end

#create_or_update_record(record_class, values, created_at = Time.now) {|record| ... } ⇒ Object

Yields:

  • (record)


64
65
66
67
68
69
70
71
72
73
# File 'lib/sequent/core/persistors/active_record_persistor.rb', line 64

# Looks up a record via +get_record+ using +values+ as the where clause and,
# when none is found, builds one via +new_record+ from the same +values+.
# The record is then saved with +save!+.
#
# Only a newly built record gets +created_at+ assigned, and only when it
# responds to +created_at+; a record that already existed is not touched here.
#
# @param record_class the class passed on to +get_record+ / +new_record+
# @param values used both as the lookup clause and as the initial values
# @param created_at value assigned to a newly built record (defaults to +Time.now+)
# @yield [record] optional; called with the record before it is saved
# @return the saved record
# @raise propagates whatever +save!+ raises
def create_or_update_record(record_class, values, created_at = Time.now)
  record = get_record(record_class, values)
  unless record
    record = new_record(record_class, values)
    record.created_at = created_at if record.respond_to?(:created_at)
  end
  # The block is optional, as in create_record and update_record; without this
  # guard a call that passes no block fails with LocalJumpError before saving.
  yield record if block_given?
  record.save!
  record
end

#create_record(record_class, values) {|record| ... } ⇒ Object

Yields:

  • (record)


40
41
42
43
44
45
# File 'lib/sequent/core/persistors/active_record_persistor.rb', line 40

# Builds a record through +new_record+, optionally lets the caller adjust it,
# and saves it with +save!+.
#
# @param record_class the class passed on to +new_record+
# @param values the values passed on to +new_record+
# @yield [record] optional; called with the unsaved record
# @return the saved record
def create_record(record_class, values)
  new_record(record_class, values).tap do |fresh|
    yield fresh if block_given?
    fresh.save!
  end
end

#create_records(record_class, array_of_value_hashes) ⇒ Object



47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
# File 'lib/sequent/core/persistors/active_record_persistor.rb', line 47

# Builds one INSERT statement per hash in +array_of_value_hashes+ and sends
# them to +execute_sql+ as a single string, separated by ';'.
#
# Each key/value pair is passed through +convert_to_values+ together with the
# class's +arel_table+ before being handed to the insert manager.
#
# @param record_class a class responding to +arel_table+
# @param array_of_value_hashes one hash of column => value per row to insert
# @return whatever +execute_sql+ returns
def create_records(record_class, array_of_value_hashes)
  arel_table = record_class.arel_table

  statements = array_of_value_hashes.map do |attributes|
    manager = new_insert_manager
    manager.into(arel_table)
    column_values = attributes.map { |column, value| convert_to_values(column, arel_table, value) }
    manager.insert(column_values)
    manager.to_sql
  end

  execute_sql(statements.join(';'))
end

#delete_all_records(record_class, where_clause) ⇒ Object



83
84
85
# File 'lib/sequent/core/persistors/active_record_persistor.rb', line 83

# Calls +delete_all+ on the unscoped records of +record_class+ that match
# +where_clause+.
#
# @return whatever +delete_all+ returns
def delete_all_records(record_class, where_clause)
  doomed = record_class.unscoped.where(where_clause)
  doomed.delete_all
end

#delete_record(_, record) ⇒ Object



87
88
89
# File 'lib/sequent/core/persistors/active_record_persistor.rb', line 87

# Removes +record+ by calling +destroy+ on it.
#
# The first argument is ignored (hence the +_+ name); only the record itself
# is used.
#
# @param _ unused
# @param record the object to call +destroy+ on
# @return whatever +record.destroy+ returns
def delete_record(_, record)
  record.destroy
end

#do_with_record(record_class, where_clause) {|record| ... } ⇒ Object

Yields:

  • (record)


102
103
104
105
106
# File 'lib/sequent/core/persistors/active_record_persistor.rb', line 102

# Fetches the first unscoped record of +record_class+ matching +where_clause+
# with +first!+, hands it to the block, and then saves it with +save!+.
#
# @yield [record] required; called with the fetched record before saving
# @return whatever +save!+ returns
# @raise propagates any error raised by +first!+ or +save!+
def do_with_record(record_class, where_clause)
  scope = record_class.unscoped.where(where_clause)
  target = scope.first!
  yield target
  target.save!
end

#do_with_records(record_class, where_clause) ⇒ Object



95
96
97
98
99
100
# File 'lib/sequent/core/persistors/active_record_persistor.rb', line 95

# Iterates over the unscoped records of +record_class+ matching
# +where_clause+; each one is handed to the block and then saved with +save!+.
#
# @yield [record] required; called once per matching record before it is saved
# @return whatever +each+ returns on the matching set
def do_with_records(record_class, where_clause)
  matching = record_class.unscoped.where(where_clause)
  matching.each do |current|
    yield current
    current.save!
  end
end

#execute_sql(statement) ⇒ Object



116
117
118
# File 'lib/sequent/core/persistors/active_record_persistor.rb', line 116

# Runs +statement+ through +execute+ on the connection of
# +Sequent::ApplicationRecord+.
#
# @param statement the SQL to hand to the connection
# @return whatever the connection's +execute+ returns
def execute_sql(statement)
  connection = Sequent::ApplicationRecord.connection
  connection.execute(statement)
end

#find_records(record_class, where_clause) ⇒ Object



108
109
110
# File 'lib/sequent/core/persistors/active_record_persistor.rb', line 108

# Returns the result of +where(where_clause)+ on the unscoped +record_class+.
#
# @return whatever +where+ returns
def find_records(record_class, where_clause)
  unscoped_records = record_class.unscoped
  unscoped_records.where(where_clause)
end

#get_record(record_class, where_clause) ⇒ Object



79
80
81
# File 'lib/sequent/core/persistors/active_record_persistor.rb', line 79

# Returns +first+ of the unscoped records of +record_class+ matching
# +where_clause+.
#
# @return whatever +first+ returns on the matching set
def get_record(record_class, where_clause)
  candidates = record_class.unscoped.where(where_clause)
  candidates.first
end

#get_record!(record_class, where_clause) ⇒ Object



75
76
77
# File 'lib/sequent/core/persistors/active_record_persistor.rb', line 75

# Returns +first!+ of the unscoped records of +record_class+ matching
# +where_clause+.
#
# @return whatever +first!+ returns on the matching set
# @raise propagates any error raised by +first!+
def get_record!(record_class, where_clause)
  candidates = record_class.unscoped.where(where_clause)
  candidates.first!
end

#last_record(record_class, where_clause) ⇒ Object



112
113
114
# File 'lib/sequent/core/persistors/active_record_persistor.rb', line 112

# Returns +last+ of the unscoped records of +record_class+ matching
# +where_clause+.
#
# @return whatever +last+ returns on the matching set
def last_record(record_class, where_clause)
  candidates = record_class.unscoped.where(where_clause)
  candidates.last
end

#update_all_records(record_class, where_clause, updates) ⇒ Object



91
92
93
# File 'lib/sequent/core/persistors/active_record_persistor.rb', line 91

# Calls +update_all(updates)+ on the unscoped records of +record_class+ that
# match +where_clause+.
#
# @return whatever +update_all+ returns
def update_all_records(record_class, where_clause, updates)
  targets = record_class.unscoped.where(where_clause)
  targets.update_all(updates)
end

#update_record(record_class, event, where_clause = {aggregate_id: event.aggregate_id}, options = {}) {|record| ... } ⇒ Object

Yields:

  • (record)


21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
# File 'lib/sequent/core/persistors/active_record_persistor.rb', line 21

# Updates the first unscoped record of +record_class+ matching +where_clause+
# from +event+ and saves it with +save!+.
#
# Steps, in order:
# 1. +updated_at+ is set to +event.created_at+ when the record responds to
#    +updated_at+.
# 2. The optional block is called with the record.
# 3. +sequence_number+ is set to +event.sequence_number+ when
#    +options[:update_sequence_number]+ is truthy, or - if that key is absent -
#    when the record responds to +sequence_number=+.
#
# @param record_class the class to query
# @param event source of +created_at+, +sequence_number+ and the default lookup
# @param where_clause lookup clause (defaults to the event's +aggregate_id+)
# @param options [Hash] honours the +:update_sequence_number+ key
# @yield [record] optional; called after +updated_at+ is set
# @return whatever +save!+ returns
# @raise [RuntimeError] when no record matches +where_clause+
def update_record(record_class, event, where_clause = {aggregate_id: event.aggregate_id}, options = {})
  record = record_class.unscoped.where(where_clause).first
  raise(<<~EOS) unless record
    Record of class #{record_class} with where clause #{where_clause} not found while handling event #{event}
  EOS

  record.updated_at = event.created_at if record.respond_to?(:updated_at)
  yield record if block_given?

  # An explicit option (even false/nil) wins; the respond_to? probe is only
  # consulted when the key is missing.
  bump_sequence_number = options.fetch(:update_sequence_number) { record.respond_to?(:sequence_number=) }
  record.sequence_number = event.sequence_number if bump_sequence_number
  record.save!
end