Class: Synapse::EventStore::Mongo::StorageStrategy Abstract
- Inherits:
-
Object
- Object
- Synapse::EventStore::Mongo::StorageStrategy
- Defined in:
- lib/synapse/event_store/mongo/storage_strategy.rb
Overview
Represents a mechanism used to structure how events are stored in the database
Direct Known Subclasses
Constant Summary collapse
- ASCENDING =
Aliases of the Mongo constants for ascending and descending
::Mongo::ASCENDING
- DESCENDING =
::Mongo::DESCENDING
Instance Method Summary collapse
-
#create_documents(type_identifier, events) ⇒ Array
abstract
Creates documents that will represent the events being committed to the event store.
-
#ensure_indexes ⇒ undefined
Ensures that the correct indexes are in place.
-
#extract_events(document, aggregate_id) ⇒ Array
abstract
Extracts individual event messages from the given document.
-
#fetch_events(type_identifier, aggregate_id, first_sequence_number) ⇒ Mongo::Cursor
Provides a cursor for accessing all events for an aggregate with the given identifier and type identifier, with a sequence number equal to or greater than the given first sequence number.
-
#fetch_last_snapshot(type_identifier, aggregate_id) ⇒ Mongo::Cursor
Finds the document containing the most recent snapshot event for an aggregate with the given identifier and type identifier.
- #initialize(template, serializer, upcaster_chain) ⇒ undefined constructor
Constructor Details
#initialize(template, serializer, upcaster_chain) ⇒ undefined
11 12 13 14 15 |
# File 'lib/synapse/event_store/mongo/storage_strategy.rb', line 11 def initialize(template, serializer, upcaster_chain) @template = template @serializer = Serialization::MessageSerializer.new serializer @upcaster_chain = upcaster_chain end |
Instance Method Details
#create_documents(type_identifier, events) ⇒ Array
Creates documents that will represent the events being committed to the event store
23 |
# File 'lib/synapse/event_store/mongo/storage_strategy.rb', line 23 def create_documents(type_identifier, events); end |
#ensure_indexes ⇒ undefined
Ensures that the correct indexes are in place
88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 |
# File 'lib/synapse/event_store/mongo/storage_strategy.rb', line 88 def ensure_indexes = { name: 'unique_aggregate_index', unique: true } spec = { aggregate_id: ASCENDING, aggregate_type: ASCENDING, sequence_number: ASCENDING } @template.event_collection.ensure_index spec, spec = { aggregate_id: ASCENDING, aggregate_type: ASCENDING, sequence_number: DESCENDING } @template.snapshot_collection.ensure_index spec, end |
#extract_events(document, aggregate_id) ⇒ Array
Extracts individual event messages from the given document
The given aggregate identifier is passed so that event messages can have the actual identifier object instead of the serialized aggregate identifier.
34 |
# File 'lib/synapse/event_store/mongo/storage_strategy.rb', line 34 def extract_events(document, aggregate_id); end |
#fetch_events(type_identifier, aggregate_id, first_sequence_number) ⇒ Mongo::Cursor
Provides a cursor for accessing all events for an aggregate with the given identifier and type identifier, with a sequence number equal to or greater than the given first sequence number
The returned documents should be ordered chronologically, typically by using the sequence number.
51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 |
# File 'lib/synapse/event_store/mongo/storage_strategy.rb', line 51 def fetch_events(type_identifier, aggregate_id, first_sequence_number) filter = { aggregate_id: aggregate_id, aggregate_type: type_identifier, sequence_number: { '$gte' => first_sequence_number } } sort = { sequence_number: ASCENDING } @template.event_collection.find(filter).sort(sort) end |
#fetch_last_snapshot(type_identifier, aggregate_id) ⇒ Mongo::Cursor
Finds the document containing the most recent snapshot event for an aggregate with the given identifier and type identifier
73 74 75 76 77 78 79 80 81 82 83 84 |
# File 'lib/synapse/event_store/mongo/storage_strategy.rb', line 73 def fetch_last_snapshot(type_identifier, aggregate_id) filter = { aggregate_id: aggregate_id, aggregate_type: type_identifier } sort = { sequence_number: DESCENDING } @template.snapshot_collection.find(filter).sort(sort).limit(1) end |