Class: RubyEventStore::ROM::EventRepository
- Inherits:
-
Object
- Object
- RubyEventStore::ROM::EventRepository
- Defined in:
- lib/ruby_event_store/rom/event_repository.rb
Instance Method Summary collapse
- #append_to_stream(records, stream, expected_version) ⇒ Object
- #count(specification) ⇒ Object
- #delete_stream(stream) ⇒ Object
- #event_in_stream?(event_id, stream) ⇒ Boolean
- #global_position(event_id) ⇒ Object
- #has_event?(event_id) ⇒ Boolean
-
#initialize(rom:, serializer:) ⇒ EventRepository
constructor
A new instance of EventRepository.
- #last_stream_event(stream) ⇒ Object
- #link_to_stream(event_ids, stream, expected_version) ⇒ Object
- #position_in_stream(event_id, stream) ⇒ Object
- #read(specification) ⇒ Object
- #streams_of(event_id) ⇒ Object
- #update_messages(records) ⇒ Object
Constructor Details
#initialize(rom:, serializer:) ⇒ EventRepository
Returns a new instance of EventRepository.
6 7 8 9 10 11 |
# File 'lib/ruby_event_store/rom/event_repository.rb', line 6 def initialize(rom:, serializer:) @serializer = serializer @events = Repositories::Events.new(rom) @stream_entries = Repositories::StreamEntries.new(rom) @unit_of_work = UnitOfWork.new(rom.gateways.fetch(:default)) end |
Instance Method Details
#append_to_stream(records, stream, expected_version) ⇒ Object
13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 |
# File 'lib/ruby_event_store/rom/event_repository.rb', line 13 def append_to_stream(records, stream, expected_version) serialized_records = records.map { |record| record.serialize(@serializer) } event_ids = records.map(&:event_id) handle_unique_violation do @unit_of_work.call do |changesets| changesets << @events.create_changeset(serialized_records) changesets << @stream_entries.create_changeset( event_ids, stream, @stream_entries.resolve_version(stream, expected_version) ) end end self end |
#count(specification) ⇒ Object
80 81 82 |
# File 'lib/ruby_event_store/rom/event_repository.rb', line 80 def count(specification) @events.count(specification) end |
#delete_stream(stream) ⇒ Object
61 62 63 |
# File 'lib/ruby_event_store/rom/event_repository.rb', line 61 def delete_stream(stream) @stream_entries.delete(stream) end |
#event_in_stream?(event_id, stream) ⇒ Boolean
57 58 59 |
# File 'lib/ruby_event_store/rom/event_repository.rb', line 57 def event_in_stream?(event_id, stream) @stream_entries.event_in_stream?(event_id, stream) end |
#global_position(event_id) ⇒ Object
53 54 55 |
# File 'lib/ruby_event_store/rom/event_repository.rb', line 53 def global_position(event_id) @events.global_position(event_id) end |
#has_event?(event_id) ⇒ Boolean
65 66 67 68 69 70 |
# File 'lib/ruby_event_store/rom/event_repository.rb', line 65 def has_event?(event_id) @events.exist?(event_id) rescue Sequel::DatabaseError => doh raise doh unless doh. =~ /PG::InvalidTextRepresentation.*uuid/ false end |
#last_stream_event(stream) ⇒ Object
72 73 74 |
# File 'lib/ruby_event_store/rom/event_repository.rb', line 72 def last_stream_event(stream) @events.last_stream_event(stream, @serializer) end |
#link_to_stream(event_ids, stream, expected_version) ⇒ Object
32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 |
# File 'lib/ruby_event_store/rom/event_repository.rb', line 32 def link_to_stream(event_ids, stream, expected_version) validate_event_ids(event_ids) handle_unique_violation do @unit_of_work.call do |changesets| changesets << @stream_entries.create_changeset( event_ids, stream, @stream_entries.resolve_version(stream, expected_version) ) end end self end |
#position_in_stream(event_id, stream) ⇒ Object
49 50 51 |
# File 'lib/ruby_event_store/rom/event_repository.rb', line 49 def position_in_stream(event_id, stream) @stream_entries.position_in_stream(event_id, stream) end |
#read(specification) ⇒ Object
76 77 78 |
# File 'lib/ruby_event_store/rom/event_repository.rb', line 76 def read(specification) @events.read(specification, @serializer) end |
#streams_of(event_id) ⇒ Object
93 94 95 |
# File 'lib/ruby_event_store/rom/event_repository.rb', line 93 def streams_of(event_id) @stream_entries.streams_of(event_id).map { |name| Stream.new(name) } end |
#update_messages(records) ⇒ Object
84 85 86 87 88 89 90 91 |
# File 'lib/ruby_event_store/rom/event_repository.rb', line 84 def (records) validate_event_ids(records.map(&:event_id)) @unit_of_work.call do |changesets| serialized_records = records.map { |record| record.serialize(@serializer) } changesets << @events.update_changeset(serialized_records) end end |