Class: Synapse::EventStore::Mongo::StorageStrategy Abstract

Inherits:
Object
  • Object
show all
Defined in:
lib/synapse/event_store/mongo/storage_strategy.rb

Overview

This class is abstract.

Represents a mechanism used to structure how events are stored in the database

Constant Summary collapse

ASCENDING =

Aliases of the Mongo constants for ascending and descending

::Mongo::ASCENDING
DESCENDING =
::Mongo::DESCENDING

Instance Method Summary collapse

Constructor Details

#initialize(template, serializer, upcaster_chain) ⇒ undefined

Parameters:

  • template (MongoTemplate)
  • serializer (Serializer)
  • upcaster_chain (UpcasterChain)


11
12
13
14
15
# File 'lib/synapse/event_store/mongo/storage_strategy.rb', line 11

def initialize(template, serializer, upcaster_chain)
  @template = template
  @serializer = Serialization::MessageSerializer.new serializer
  @upcaster_chain = upcaster_chain
end

Instance Method Details

#create_documents(type_identifier, events) ⇒ Array

This method is abstract.

Creates documents that will represent the events being committed to the event store

Parameters:

  • type_identifier (String)

    Type identifier for the aggregate

  • events (Array)

    Domain events to be committed

Returns:

  • (Array)


23
# File 'lib/synapse/event_store/mongo/storage_strategy.rb', line 23

def create_documents(type_identifier, events); end

#ensure_indexesundefined

Ensures that the correct indexes are in place

Returns:

  • (undefined)


88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
# File 'lib/synapse/event_store/mongo/storage_strategy.rb', line 88

def ensure_indexes
  options = {
    name: 'unique_aggregate_index',
    unique: true
  }

  spec = {
    aggregate_id: ASCENDING,
    aggregate_type: ASCENDING,
    sequence_number: ASCENDING
  }

  @template.event_collection.ensure_index spec, options

  spec = {
    aggregate_id: ASCENDING,
    aggregate_type: ASCENDING,
    sequence_number: DESCENDING
  }

  @template.snapshot_collection.ensure_index spec, options
end

#extract_events(document, aggregate_id) ⇒ Array

This method is abstract.

Extracts individual event messages from the given document

The given aggregate identifier is passed so that event messages can have the actual identifier object instead of the serialized aggregate identifier.

Parameters:

  • document (Hash)
  • aggregate_id (Object)

Returns:

  • (Array)


34
# File 'lib/synapse/event_store/mongo/storage_strategy.rb', line 34

def extract_events(document, aggregate_id); end

#fetch_events(type_identifier, aggregate_id, first_sequence_number) ⇒ Mongo::Cursor

Provides a cursor for accessing all events for an aggregate with the given identifier and type identifier, with a sequence number equal to or greater than the given first sequence number

The returned documents should be ordered chronologically, typically by using the sequence number.

Parameters:

  • type_identifier (String)
  • aggregate_id (Object)
  • first_sequence_number (Integer)

Returns:

  • (Mongo::Cursor)


51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
# File 'lib/synapse/event_store/mongo/storage_strategy.rb', line 51

def fetch_events(type_identifier, aggregate_id, first_sequence_number)
  filter = {
    aggregate_id: aggregate_id,
    aggregate_type: type_identifier,
    sequence_number: {
      '$gte' => first_sequence_number
    }
  }

  sort = {
    sequence_number: ASCENDING
  }

  @template.event_collection.find(filter).sort(sort)
end

#fetch_last_snapshot(type_identifier, aggregate_id) ⇒ Mongo::Cursor

Finds the document containing the most recent snapshot event for an aggregate with the given identifier and type identifier

Parameters:

  • type_identifier (String)
  • aggregate_id (Object)

Returns:

  • (Mongo::Cursor)


73
74
75
76
77
78
79
80
81
82
83
84
# File 'lib/synapse/event_store/mongo/storage_strategy.rb', line 73

def fetch_last_snapshot(type_identifier, aggregate_id)
  filter = {
    aggregate_id: aggregate_id,
    aggregate_type: type_identifier
  }

  sort = {
    sequence_number: DESCENDING
  }

  @template.snapshot_collection.find(filter).sort(sort).limit(1)
end