Class: Synapse::EventStore::InMemoryEventStore

Inherits:
Synapse::EventStore
Defined in:
lib/synapse/event_store/in_memory.rb

Overview

Event store implementation that keeps all events in memory. It is intended for testing only and is not thread-safe.

Constructor Details

#initialize ⇒ InMemoryEventStore

Returns a new instance of InMemoryEventStore.



# File 'lib/synapse/event_store/in_memory.rb', line 6

def initialize
  @streams = Hash.new
end

Instance Method Details

#append_events(type_identifier, stream) ⇒ undefined

Appends any events in the given stream to the end of the aggregate’s stream

Parameters:

  • type_identifier (String)

    Type descriptor of the aggregate to append to

  • stream (DomainEventStream)

Returns:

  • (undefined)


# File 'lib/synapse/event_store/in_memory.rb', line 34

def append_events(type_identifier, stream)
  if stream.end?
    return
  end

  events = events_for stream.peek.aggregate_id

  until stream.end?
    events.push stream.next_event
  end
end
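
The append logic above can be tried in isolation. The following is a minimal sketch, not the library's own code: SketchEvent, SketchStream and SketchStore are hypothetical stand-ins that expose only the calls the listing relies on (end?, peek, next_event and aggregate_id).

```ruby
# Hypothetical stand-in for a domain event message; only aggregate_id matters here.
SketchEvent = Struct.new(:aggregate_id, :payload)

# Hypothetical minimal stream exposing the three calls append_events uses.
class SketchStream
  def initialize(events)
    @events = events.dup
  end

  def end?
    @events.empty?
  end

  def peek
    @events.first
  end

  def next_event
    @events.shift
  end
end

# Stand-in store that mirrors the listing above.
class SketchStore
  def initialize
    @streams = {}
  end

  def events_for(aggregate_id)
    @streams[aggregate_id] ||= []
  end

  def append_events(type_identifier, stream)
    return if stream.end? # nothing to append

    # The target array is chosen from the first event's aggregate id only.
    events = events_for(stream.peek.aggregate_id)
    events.push(stream.next_event) until stream.end?
  end
end

store = SketchStore.new
stream = SketchStream.new([SketchEvent.new(1, 'created'), SketchEvent.new(1, 'paid')])
store.append_events('Order', stream)
store.append_events('Order', SketchStream.new([])) # empty stream: no-op
puts store.events_for(1).map(&:payload).inspect # prints ["created", "paid"]
```

Two details of the listing show up here: type_identifier is accepted but not used when choosing where to store events, and every event in the stream is filed under the aggregate id of the first event.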

#clear ⇒ Object

Clears all streams from this event store



# File 'lib/synapse/event_store/in_memory.rb', line 11

def clear
  @streams.clear
end

#events_for(aggregate_id) ⇒ Array<DomainEventMessage>

Retrieves the array of events for the given aggregate identifier, creating and storing an empty array if none exists yet

Parameters:

  • aggregate_id (Object)

Returns:

  • (Array<DomainEventMessage>)


# File 'lib/synapse/event_store/in_memory.rb', line 50

def events_for(aggregate_id)
  if @streams.has_key? aggregate_id
    return @streams.fetch aggregate_id
  end

  @streams.store aggregate_id, Array.new
end
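
The create-or-retrieve behaviour can be sketched on its own. EventsForSketch and its stream_count helper are hypothetical names used only for illustration. The sketch relies on Hash#store returning the stored value, which is why the method returns the new array on first access.

```ruby
# Stand-in class reproducing only the events_for logic from the listing above.
class EventsForSketch
  def initialize
    @streams = {}
  end

  def events_for(aggregate_id)
    return @streams.fetch(aggregate_id) if @streams.key?(aggregate_id)

    # Hash#store returns the value it stored, so the new array is returned.
    @streams.store(aggregate_id, [])
  end

  # Hypothetical helper for inspection; not part of the documented class.
  def stream_count
    @streams.size
  end
end

sketch = EventsForSketch.new
first = sketch.events_for(:a)
first << :event
second = sketch.events_for(:a)

puts first.equal?(second) # prints true: both calls return the same Array object
puts sketch.stream_count  # prints 1
```

Because the caller receives the stored array itself rather than a copy, pushing onto the returned array changes what the store holds. append_events depends on exactly this.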

#read_events(type_identifier, aggregate_id) ⇒ DomainEventStream

Returns a stream of the events stored for the given aggregate

Parameters:

  • type_identifier (String)

    Type descriptor of the aggregate to retrieve

  • aggregate_id (Object)

Returns:

  • (DomainEventStream)

Raises:

  • (StreamNotFoundError)

    If no events are stored for the given aggregate



# File 'lib/synapse/event_store/in_memory.rb', line 19

def read_events(type_identifier, aggregate_id)
  events = events_for aggregate_id

  if events.empty?
    raise StreamNotFoundError.new type_identifier, aggregate_id
  end

  Domain::SimpleDomainEventStream.new events
end
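
The error path can be exercised with a small stand-in. This is a sketch under stated assumptions: SketchStreamNotFoundError and ReadSketchStore are hypothetical, the error message format is invented for illustration, and a plain array copy stands in for the domain event stream that the real method returns.

```ruby
# Hypothetical error type; takes the same two arguments as in the listing.
class SketchStreamNotFoundError < StandardError
  def initialize(type_identifier, aggregate_id)
    super("No stream for #{type_identifier} #{aggregate_id.inspect}")
  end
end

# Stand-in store that mirrors the read_events flow from the listing above.
class ReadSketchStore
  def initialize
    @streams = {}
  end

  def events_for(aggregate_id)
    @streams[aggregate_id] ||= []
  end

  def read_events(type_identifier, aggregate_id)
    events = events_for(aggregate_id)
    raise SketchStreamNotFoundError.new(type_identifier, aggregate_id) if events.empty?

    events.dup # assumption: a copy stands in for the event stream wrapper
  end

  # Hypothetical helper for inspection; not part of the documented class.
  def known_ids
    @streams.keys
  end
end

store = ReadSketchStore.new
begin
  store.read_events('Order', 42)
rescue SketchStreamNotFoundError => e
  puts e.message # prints No stream for Order 42
end
puts store.known_ids.inspect # prints [42]
```

Since the lookup goes through events_for, reading an unknown aggregate leaves an empty array behind for that identifier before the error is raised. For a test-only store this is harmless, but it means the internal hash can contain keys for aggregates that never had events.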