Class: RubyEventStore::InMemoryRepository

Inherits:
Object
  • Object
show all
Defined in:
lib/ruby_event_store/in_memory_repository.rb

Defined Under Namespace

Classes: EventInStream, UnsupportedVersionAnyUsage

Instance Method Summary collapse

Constructor Details

#initialize(serializer: NULL, ensure_supported_any_usage: false) ⇒ InMemoryRepository

Returns a new instance of InMemoryRepository.



26
27
28
29
30
31
32
# File 'lib/ruby_event_store/in_memory_repository.rb', line 26

# Sets up an empty in-memory repository.
#
# @param serializer [Object] serializer passed to records when they are
#   serialized or deserialized by this repository (defaults to NULL)
# @param ensure_supported_any_usage [Boolean] flag kept on the instance
def initialize(serializer: NULL, ensure_supported_any_usage: false)
  @ensure_supported_any_usage = ensure_supported_any_usage
  @serializer = serializer
  @mutex = Mutex.new
  @storage = {}
  # Looking up an unknown key creates and stores a fresh array for that key.
  @streams = Hash.new { |hash, stream_name| hash[stream_name] = [] }
end

Instance Method Details

#append_to_stream(records, stream, expected_version) ⇒ Object



34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
# File 'lib/ruby_event_store/in_memory_repository.rb', line 34

# Serializes the given records, stores them and adds them to a stream.
#
# The records are serialized up front; the version check and the writes
# happen inside the block passed to +with_synchronize+.
#
# @param records [Enumerable] records responding to +serialize+
# @param stream [Object] target stream, forwarded to the helper methods
# @param expected_version [Object] forwarded to +with_synchronize+, which
#   yields the resolved version used below
# @return [self]
# @raise [WrongExpectedEventVersion] when the resolved version is not nil and
#   is not the stream's last version
# @raise [EventDuplicatedInStream] when a record's event id is already stored
def append_to_stream(records, stream, expected_version)
  to_store = records.map { |rec| rec.serialize(serializer) }

  with_synchronize(expected_version, stream) do |version|
    ensure_supported_any_usage(version, stream)
    # A nil version skips the comparison with the stream's last version.
    if !version.nil? && !last_stream_version(stream).equal?(version)
      raise WrongExpectedEventVersion
    end

    to_store.each.with_index do |item, offset|
      event_id = item.event_id
      raise EventDuplicatedInStream if has_event?(event_id)
      storage[event_id] = item
      add_to_stream(stream, item, version, offset)
    end
  end

  self
end

#count(spec) ⇒ Object



103
104
105
# File 'lib/ruby_event_store/in_memory_repository.rb', line 103

# Counts the entries that +read_scope+ selects for the given specification.
#
# @param spec [Object] read specification, forwarded to +read_scope+
# @return [Integer]
def count(spec)
  scope = read_scope(spec)
  scope.count
end

#delete_stream(stream) ⇒ Object



69
70
71
# File 'lib/ruby_event_store/in_memory_repository.rb', line 69

# Removes the entry kept under the stream's name from +streams+.
#
# @param stream [#name] stream to remove
# @return [Object, nil] whatever +streams.delete+ returns for that name
def delete_stream(stream)
  stream_name = stream.name
  streams.delete(stream_name)
end

#event_in_stream?(event_id, stream) ⇒ Boolean

Returns:

  • (Boolean)


139
140
141
# File 'lib/ruby_event_store/in_memory_repository.rb', line 139

# Tells whether the stream contains an entry with the given event id.
#
# @param event_id [Object] id compared with +eql?+ against each entry's event_id
# @param stream [#name] stream whose entries are searched
# @return [Boolean]
def event_in_stream?(event_id, stream)
  entries = streams[stream.name]
  entries.any? { |entry| entry.event_id.eql?(event_id) }
end

#global_position(event_id) ⇒ Object



135
136
137
# File 'lib/ruby_event_store/in_memory_repository.rb', line 135

# Returns the index of the event id among the keys of +storage+.
#
# @param event_id [Object] id to look up
# @return [Integer] zero-based index within +storage.keys+
# @raise [EventNotFound] when the id is not a key of +storage+
def global_position(event_id)
  position = storage.keys.index(event_id)
  raise EventNotFound.new(event_id) if position.nil?

  position
end

#has_event?(event_id) ⇒ Boolean

Returns:

  • (Boolean)


73
74
75
# File 'lib/ruby_event_store/in_memory_repository.rb', line 73

# Tells whether +storage+ holds an entry under the given event id.
#
# @param event_id [Object] id to look up
# @return [Boolean]
def has_event?(event_id)
  storage.key?(event_id)
end

#last_stream_event(stream) ⇒ Object



77
78
79
80
# File 'lib/ruby_event_store/in_memory_repository.rb', line 77

# Returns the deserialized record for the last event id of the stream.
#
# @param stream [Object] stream, forwarded to +event_ids_of_stream+
# @return [Object, nil] the deserialized record, or nil when the stream has
#   no event ids
def last_stream_event(stream)
  newest_id = event_ids_of_stream(stream).last
  return unless newest_id

  storage.fetch(newest_id).deserialize(serializer)
end


#link_to_stream(event_ids, stream, expected_version) ⇒ Object



52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
# File 'lib/ruby_event_store/in_memory_repository.rb', line 52

# Adds already-stored events to a stream.
#
# The events are looked up via +read_event+ up front; the version check and
# the linking happen inside the block passed to +with_synchronize+.
#
# @param event_ids [Enumerable] ids of the events to link
# @param stream [#name] target stream
# @param expected_version [Object] forwarded to +with_synchronize+, which
#   yields the resolved version used below
# @return [self]
# @raise [WrongExpectedEventVersion] when the resolved version is not nil and
#   is not the stream's last version
# @raise [EventDuplicatedInStream] when an event is already in the stream
def link_to_stream(event_ids, stream, expected_version)
  to_link = event_ids.map { |event_id| read_event(event_id) }

  with_synchronize(expected_version, stream) do |version|
    ensure_supported_any_usage(version, stream)
    # A nil version skips the comparison with the stream's last version.
    if !version.nil? && !last_stream_version(stream).equal?(version)
      raise WrongExpectedEventVersion
    end

    to_link.each.with_index do |item, offset|
      raise EventDuplicatedInStream if has_event_in_stream?(item.event_id, stream.name)
      add_to_stream(stream, item, version, offset)
    end
  end

  self
end

#position_in_stream(event_id, stream) ⇒ Object



129
130
131
132
133
# File 'lib/ruby_event_store/in_memory_repository.rb', line 129

# Returns the position recorded on the stream entry for the given event.
#
# @param event_id [Object] id compared with +eql?+ against each entry's event_id
# @param stream [#name] stream whose entries are searched
# @return [Object] the matching entry's +position+
# @raise [EventNotFoundInStream] when no entry of the stream has that event id
def position_in_stream(event_id, stream)
  match = streams[stream.name].find { |candidate| candidate.event_id.eql?(event_id) }
  return match.position unless match.nil?

  raise EventNotFoundInStream
end

#read(spec) ⇒ Object



82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
# File 'lib/ruby_event_store/in_memory_repository.rb', line 82

# Reads records selected by the specification and deserializes them.
#
# The shape of the result depends on the specification, checked in this
# order: batched, first, last, and otherwise everything.
#
# @param spec [Object] read specification answering +batched?+, +first?+,
#   +last?+ and +batch_size+; also forwarded to +read_scope+
# @return [Object] the result of +BatchEnumerator#each+ when batched; a single
#   deserialized record (or nil when the scope is empty) for first/last;
#   otherwise an Enumerator that deserializes each record as it is consumed
def read(spec)
  scope = read_scope(spec)

  if spec.batched?
    # Each batch is a window of the scope, deserialized when requested.
    fetch_batch = lambda do |offset, limit|
      window = scope.drop(offset).take(limit)
      window.map { |item| item.deserialize(serializer) }
    end
    return BatchEnumerator.new(spec.batch_size, scope.size, fetch_batch).each
  end

  return scope.first&.deserialize(serializer) if spec.first?
  return scope.last&.deserialize(serializer) if spec.last?

  Enumerator.new do |yielder|
    scope.each { |item| yielder << item.deserialize(serializer) }
  end
end

#streams_of(event_id) ⇒ Object



125
126
127
# File 'lib/ruby_event_store/in_memory_repository.rb', line 125

# Lists the streams that contain the given event.
#
# @param event_id [Object] id forwarded to +has_event_in_stream?+
# @return [Array<Stream>] one Stream per stream name that contains the event
def streams_of(event_id)
  matching_names = streams.keys.select { |stream_name| has_event_in_stream?(event_id, stream_name) }
  matching_names.map { |stream_name| Stream.new(stream_name) }
end

#update_messages(records) ⇒ Object



107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
# File 'lib/ruby_event_store/in_memory_repository.rb', line 107

# Overwrites the stored entries of already-persisted records.
#
# Each stored entry is rebuilt from the incoming record's event_id,
# event_type, data, metadata and valid_at; the timestamp is carried over
# from the entry currently held in +storage+.
#
# @param records [Enumerable] records to write back; each event id is first
#   passed to +read_event+
# @return [Enumerable] the +records+ collection (the result of +each+)
def update_messages(records)
  records.each do |record|
    # Result is discarded — presumably this raises for unknown events; confirm in read_event.
    read_event(record.event_id)
    serialized_record =
      Record
        .new(
          event_id: record.event_id,
          event_type: record.event_type,
          data: record.data,
          metadata: record.metadata,
          # The stored timestamp is kept; it is parsed from its ISO 8601 text form.
          timestamp: Time.iso8601(storage.fetch(record.event_id).timestamp),
          valid_at: record.valid_at
        )
        .serialize(serializer)
    storage[record.event_id] = serialized_record
  end
end