Class: Sequent::Core::Persistors::ActiveRecordPersistor

Inherits:
Object
  • Object
show all
Includes:
Persistor
Defined in:
lib/sequent/core/persistors/active_record_persistor.rb

Overview

The ActiveRecordPersistor uses ActiveRecord to update the projection.

This is the default persistor in Sequent. However, as your event store grows, it is not the best choice of persistor for rebuilding a projection when migrating to a new version of that projection.

Please see the ReplayOptimizedPostgresPersistor as an alternative for migrating projections to a new version.

Instance Method Summary collapse

Instance Method Details

#commitObject



110
111
112
# File 'lib/sequent/core/persistors/active_record_persistor.rb', line 110

# Commit hook of the persistor.
#
# This implementation has nothing to flush: the body is intentionally empty,
# so calling it has no effect.
#
# @return [nil]
def commit
  # Intentionally a no-op.
  nil
end

#create_or_update_record(record_class, values, created_at = Time.now) {|record| ... } ⇒ Object

Yields:

  • (record)


54
55
56
57
58
59
60
61
62
63
# File 'lib/sequent/core/persistors/active_record_persistor.rb', line 54

# Looks up the record of +record_class+ matching +values+ (via +get_record+),
# or builds a new one from +values+ (via +new_record+) when none is found,
# optionally yields it for modification, and saves it with +save!+.
#
# @param record_class [Class] class handed to +get_record+ / +new_record+
# @param values [Hash] used as the lookup clause and as the values of a newly built record
# @param created_at [Time] assigned to a newly built record that responds to +created_at+;
#   never applied to a record that already existed
# @yield [record] optional; lets the caller change the record before it is saved
# @return [Object] the saved record
def create_or_update_record(record_class, values, created_at = Time.now)
  record = get_record(record_class, values)
  unless record
    record = new_record(record_class, values)
    record.created_at = created_at if record.respond_to?(:created_at)
  end
  # The block is optional: calling without one must not raise LocalJumpError.
  yield record if block_given?
  record.save!
  record
end

#create_record(record_class, values) {|record| ... } ⇒ Object

Yields:

  • (record)


32
33
34
35
36
37
# File 'lib/sequent/core/persistors/active_record_persistor.rb', line 32

# Builds a record through +new_record+, optionally lets the caller adjust it,
# and saves it with +save!+.
#
# @param record_class [Class] class handed to +new_record+
# @param values [Hash] values handed to +new_record+
# @yield [record] optional; invoked with the unsaved record
# @return [Object] the saved record
def create_record(record_class, values)
  new_record(record_class, values).tap do |fresh|
    yield fresh if block_given?
    fresh.save!
  end
end

#create_records(record_class, array_of_value_hashes) ⇒ Object



39
40
41
42
43
44
45
46
47
48
49
50
51
52
# File 'lib/sequent/core/persistors/active_record_persistor.rb', line 39

# Inserts several rows in one +execute_sql+ call by building one INSERT
# statement per value hash and joining the statements with ";".
#
# @param record_class [Class] must respond to +arel_table+
# @param array_of_value_hashes [Array<Hash>] one hash of column => value per row
# @return [Object, nil] the result of +execute_sql+, or +nil+ when there is nothing to insert
def create_records(record_class, array_of_value_hashes)
  # An empty batch would otherwise produce "" and send an empty statement to the database.
  return if array_of_value_hashes.empty?

  table = record_class.arel_table

  statements = array_of_value_hashes.map do |values|
    insert_manager = new_insert_manager
    insert_manager.into(table)
    insert_manager.insert(values.map { |key, value| convert_to_values(key, table, value) })
    insert_manager.to_sql
  end

  execute_sql(statements.join(";"))
end

#delete_all_records(record_class, where_clause) ⇒ Object



73
74
75
# File 'lib/sequent/core/persistors/active_record_persistor.rb', line 73

# Deletes every record of +record_class+ matching +where_clause+ by calling
# +delete_all+ on the unscoped, filtered relation.
#
# @param record_class [Class] must respond to +unscoped+
# @param where_clause [Object] passed to +where+ unchanged
# @return [Object] whatever +delete_all+ returns
def delete_all_records(record_class, where_clause)
  doomed = record_class.unscoped.where(where_clause)
  doomed.delete_all
end

#delete_record(_, record) ⇒ Object



77
78
79
# File 'lib/sequent/core/persistors/active_record_persistor.rb', line 77

# Removes a single record by calling +destroy+ on it.
#
# @param _ ignored by this implementation
# @param record [#destroy] the record to remove
# @return [Object] whatever +record.destroy+ returns
def delete_record(_, record)
  record.destroy
end

#do_with_record(record_class, where_clause) {|record| ... } ⇒ Object

Yields:

  • (record)


92
93
94
95
96
# File 'lib/sequent/core/persistors/active_record_persistor.rb', line 92

# Fetches the first record matching +where_clause+ with +first!+, yields it
# to the caller and then saves it with +save!+.
#
# @param record_class [Class] must respond to +unscoped+
# @param where_clause [Object] passed to +where+ unchanged
# @yield [record] required; receives the record before it is saved
# @return [Object] whatever +save!+ returns
def do_with_record(record_class, where_clause)
  scope = record_class.unscoped.where(where_clause)
  target = scope.first!
  yield target
  target.save!
end

#do_with_records(record_class, where_clause) ⇒ Object



85
86
87
88
89
90
# File 'lib/sequent/core/persistors/active_record_persistor.rb', line 85

# Yields every record matching +where_clause+ to the caller and saves each
# one with +save!+ right after its block invocation.
#
# @param record_class [Class] must respond to +unscoped+
# @param where_clause [Object] passed to +where+ unchanged
# @yield [record] required; receives each matching record in turn
# @return [Object] whatever +each+ on the filtered relation returns
def do_with_records(record_class, where_clause)
  matches = record_class.unscoped.where(where_clause)
  matches.each do |match|
    yield match
    match.save!
  end
end

#execute_sql(statement) ⇒ Object



106
107
108
# File 'lib/sequent/core/persistors/active_record_persistor.rb', line 106

# Runs +statement+ on the connection returned by +ActiveRecord::Base.connection+.
# The statement is handed to +execute+ exactly as given.
#
# @param statement [String] SQL to execute
# @return [Object] whatever the connection's +execute+ returns
def execute_sql(statement)
  connection = ActiveRecord::Base.connection
  connection.execute(statement)
end

#find_records(record_class, where_clause) ⇒ Object



98
99
100
# File 'lib/sequent/core/persistors/active_record_persistor.rb', line 98

# Returns the unscoped relation of +record_class+ filtered by +where_clause+.
#
# @param record_class [Class] must respond to +unscoped+
# @param where_clause [Object] passed to +where+ unchanged
# @return [Object] whatever +where+ returns
def find_records(record_class, where_clause)
  base = record_class.unscoped
  base.where(where_clause)
end

#get_record(record_class, where_clause) ⇒ Object



69
70
71
# File 'lib/sequent/core/persistors/active_record_persistor.rb', line 69

# Returns the result of +first+ on the unscoped relation of +record_class+
# filtered by +where_clause+.
#
# @param record_class [Class] must respond to +unscoped+
# @param where_clause [Object] passed to +where+ unchanged
# @return [Object, nil] whatever +first+ returns
def get_record(record_class, where_clause)
  candidates = record_class.unscoped.where(where_clause)
  candidates.first
end

#get_record!(record_class, where_clause) ⇒ Object



65
66
67
# File 'lib/sequent/core/persistors/active_record_persistor.rb', line 65

# Like +get_record+, but fetches with +first!+ instead of +first+, so any
# error +first!+ raises propagates to the caller.
# NOTE(review): for ActiveRecord relations +first!+ is expected to raise
# ActiveRecord::RecordNotFound when nothing matches - confirm against callers.
#
# @param record_class [Class] must respond to +unscoped+
# @param where_clause [Object] passed to +where+ unchanged
# @return [Object] whatever +first!+ returns
def get_record!(record_class, where_clause)
  candidates = record_class.unscoped.where(where_clause)
  candidates.first!
end

#last_record(record_class, where_clause) ⇒ Object



102
103
104
# File 'lib/sequent/core/persistors/active_record_persistor.rb', line 102

# Returns the result of +last+ on the unscoped relation of +record_class+
# filtered by +where_clause+.
#
# @param record_class [Class] must respond to +unscoped+
# @param where_clause [Object] passed to +where+ unchanged
# @return [Object, nil] whatever +last+ returns
def last_record(record_class, where_clause)
  candidates = record_class.unscoped.where(where_clause)
  candidates.last
end

#update_all_records(record_class, where_clause, updates) ⇒ Object



81
82
83
# File 'lib/sequent/core/persistors/active_record_persistor.rb', line 81

# Applies +updates+ to every record of +record_class+ matching +where_clause+
# by calling +update_all+ on the unscoped, filtered relation.
#
# @param record_class [Class] must respond to +unscoped+
# @param where_clause [Object] passed to +where+ unchanged
# @param updates [Object] passed to +update_all+ unchanged
# @return [Object] whatever +update_all+ returns
def update_all_records(record_class, where_clause, updates)
  affected = record_class.unscoped.where(where_clause)
  affected.update_all(updates)
end

#update_record(record_class, event, where_clause = {aggregate_id: event.aggregate_id}, options = {}) {|record| ... } ⇒ Object

Yields:

  • (record)


20
21
22
23
24
25
26
27
28
29
30
# File 'lib/sequent/core/persistors/active_record_persistor.rb', line 20

# Updates the first record of +record_class+ matching +where_clause+ while
# handling +event+.
#
# Steps, in order: set +updated_at+ from +event.created_at+ (when the record
# responds to +updated_at+), run the optional block, set +sequence_number+
# from the event (see +options+), then +save!+.
#
# @param record_class [Class] must respond to +unscoped+
# @param event [Object] must respond to +aggregate_id+ (for the default
#   +where_clause+), +created_at+ and +sequence_number+
# @param where_clause [Hash] lookup clause; defaults to the event's aggregate id
# @param options [Hash] +:update_sequence_number+ forces (+true+) or suppresses
#   (+false+) the sequence number update; when the key is absent the update
#   happens only if the record responds to +sequence_number=+
# @yield [record] optional; invoked after +updated_at+ was set and before saving
# @return [Object] whatever +save!+ returns
# @raise [RuntimeError] when no record matches +where_clause+
def update_record(record_class, event, where_clause = {aggregate_id: event.aggregate_id}, options = {}, &block)
  record = record_class.unscoped.where(where_clause).first
  unless record
    raise("Record of class #{record_class} with where clause #{where_clause} not found while handling event #{event}")
  end

  record.updated_at = event.created_at if record.respond_to?(:updated_at)
  block.call(record) if block

  # An explicit option wins; otherwise fall back to what the record supports.
  bump_sequence = options.fetch(:update_sequence_number) { record.respond_to?(:sequence_number=) }
  record.sequence_number = event.sequence_number if bump_sequence
  record.save!
end