Class: Akasha::Repository
- Inherits:
- Object
- Defined in:
- lib/akasha/repository.rb
Overview
Aggregate repository. Not meant to be used directly (see aggregate/syntax_helpers.rb). See the specs for usage.
Constant Summary
- STREAM_NAME_SEP = '-'.freeze
Instance Attribute Summary
- #namespace ⇒ Object (readonly)
  Returns the value of attribute namespace.
- #store ⇒ Object (readonly)
  Returns the value of attribute store.
Instance Method Summary
- #initialize(store, namespace: nil) ⇒ Repository (constructor)
  Creates a new repository using the underlying `store` (e.g. `MemoryEventStore`).
- #load_aggregate(klass, id) ⇒ Object
  Loads an aggregate identified by `id` and `klass` from the repository.
- #merge_all_by_event(into:, only:) ⇒ Object
  Merges all streams into one, filtering the resulting stream so it only contains events with the specified names, using a projection.
- #save_aggregate(aggregate, concurrency: :none) ⇒ Object
  Saves an aggregate to the repository, appending events to the corresponding stream.
- #subscribe(lambda = nil, &block) ⇒ Object
  Subscribes to event streams passing either a lambda or a block.
Constructor Details
#initialize(store, namespace: nil) ⇒ Repository
Creates a new repository using the underlying `store` (e.g. `MemoryEventStore`).
`namespace` - optional namespace that lets multiple applications share the same Eventstore database without name conflicts
    # File 'lib/akasha/repository.rb', line 13
    def initialize(store, namespace: nil)
      @store = store
      @subscribers = []
      @namespace = namespace
    end
Instance Attribute Details
#namespace ⇒ Object (readonly)
Returns the value of attribute namespace.
    # File 'lib/akasha/repository.rb', line 6
    def namespace
      @namespace
    end
#store ⇒ Object (readonly)
Returns the value of attribute store.
    # File 'lib/akasha/repository.rb', line 6
    def store
      @store
    end
Instance Method Details
#load_aggregate(klass, id) ⇒ Object
Loads an aggregate identified by `id` and `klass` from the repository. Returns an aggregate instance of class `klass` constructed by applying events from the corresponding stream.
    # File 'lib/akasha/repository.rb', line 22
    def load_aggregate(klass, id)
      agg = klass.new(id)
      start = 0
      page_size = 20
      stream(klass, id).read_events(start, page_size) do |events|
        agg.apply_events(events)
      end
      agg
    end
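The paged read in the listing above can be illustrated with a small, self-contained sketch. `PagedStream` and `Counter` are hypothetical stand-ins, not Akasha classes; only the call shape `read_events(start, page_size) { |events| ... }` and `apply_events(events)` is taken from the listing, and the paging behaviour shown here is an assumption about how such a stream might behave.

```ruby
# Hypothetical in-memory stream (not part of Akasha). Only the call shape
# read_events(start, page_size) { |events| ... } mirrors the listing.
class PagedStream
  def initialize(events)
    @events = events
  end

  # Yields successive pages of at most page_size events, starting at index start.
  def read_events(start, page_size)
    position = start
    loop do
      page = @events[position, page_size] || []
      break if page.empty?

      yield page
      position += page.size
    end
  end
end

# Hypothetical aggregate that only counts the events applied to it.
class Counter
  attr_reader :id, :applied

  def initialize(id)
    @id = id
    @applied = 0
  end

  def apply_events(events)
    @applied += events.size
  end
end

stream = PagedStream.new((1..45).to_a)
agg = Counter.new('counter-1')
pages = []
stream.read_events(0, 20) do |events|
  pages << events.size      # record the size of each page
  agg.apply_events(events)  # fold the page into the aggregate
end
```

With 45 events and a page size of 20, the block runs three times, so the aggregate is rebuilt incrementally rather than from one large read.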
#merge_all_by_event(into:, only:) ⇒ Object
Merges all streams into one, filtering the resulting stream so it only contains events with the specified names, using a projection.
Arguments:
`into` - name of the new stream
`only` - array of event names
    # File 'lib/akasha/repository.rb', line 61
    def merge_all_by_event(into:, only:)
      @store.merge_all_by_event(into: into, only: only, namespace: @namespace)
    end
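To make the intended result of the merge concrete, here is a minimal in-memory sketch. It is not how the store does it (the description above says the real work is done by a projection); `Event`, `merge_by_event`, the stream names, and the event names are all hypothetical, and simple concatenation stands in for whatever ordering the store provides.

```ruby
# Hypothetical event type and streams, for illustration only.
Event = Struct.new(:name, :data)

streams = {
  'Order-1' => [Event.new(:order_placed, 1), Event.new(:order_shipped, 1)],
  'Order-2' => [Event.new(:order_placed, 2)],
  'User-7'  => [Event.new(:user_registered, 7)]
}

# Collects events from every stream, keeps those whose name is listed in
# `only`, and returns a copy of the streams with the result stored under `into`.
def merge_by_event(streams, into:, only:)
  merged = streams.values.flatten(1).select { |event| only.include?(event.name) }
  streams.merge(into => merged)
end

result = merge_by_event(streams, into: 'placed-orders', only: [:order_placed])
placed = result['placed-orders'].map(&:data)
```

The source streams are left untouched; the new stream only gathers matching events from them.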
#save_aggregate(aggregate, concurrency: :none) ⇒ Object
Saves an aggregate to the repository, appending events to the corresponding stream.
    # File 'lib/akasha/repository.rb', line 35
    def save_aggregate(aggregate, concurrency: :none)
      changeset = aggregate.changeset
      events = changeset.events.map { |event| event.(namespace: @namespace) }
      revision = aggregate.revision if concurrency == :optimistic
      stream(aggregate.class, changeset.aggregate_id).write_events(events, revision: revision)
      notify_subscribers(changeset.aggregate_id, events)
    end
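In the listing above, `revision` is only set when `concurrency` is `:optimistic`; otherwise `nil` is passed to `write_events`. The sketch below shows one way a stream could honour that argument. `CheckedStream`, `ConflictError`, and the convention that an empty stream has revision -1 are assumptions made for illustration, not Akasha's actual behaviour.

```ruby
# Hypothetical error and stream, for illustration only.
class ConflictError < StandardError; end

class CheckedStream
  attr_reader :events

  def initialize
    @events = []
  end

  # Index of the last stored event, or -1 for an empty stream (assumed convention).
  def revision
    @events.size - 1
  end

  # Appends events. When a revision is given, it must match the current one.
  def write_events(new_events, revision: nil)
    if !revision.nil? && revision != self.revision
      raise ConflictError, "expected revision #{revision}, stream is at #{self.revision}"
    end

    @events.concat(new_events)
  end
end

stream = CheckedStream.new
stream.write_events(%w[created renamed])        # no revision given: no check
stream.write_events(%w[archived], revision: 1)  # matches the current revision

conflict = begin
  stream.write_events(%w[deleted], revision: 1) # stale: the stream is now at 2
  false
rescue ConflictError
  true
end
```

The stale write is rejected and the stream keeps its three events, which is the property an optimistic check is meant to provide when two writers load the same aggregate.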
#subscribe(lambda = nil, &block) ⇒ Object
Subscribes to event streams with either a lambda or a block. Example:
    repo.subscribe do |aggregate_id, event|
      # ... handle the event ...
    end
    # File 'lib/akasha/repository.rb', line 49
    def subscribe(lambda = nil, &block)
      callable = lambda || block
      @subscribers << callable
    end
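Both subscription forms can be tried out with a small stand-in. `TinyPublisher` is hypothetical; its `subscribe` copies the listing above, while `publish` is an assumption about delivery (one call per subscriber per event, with `aggregate_id` and `event` as in the example), since the body of `notify_subscribers` is not shown here.

```ruby
# Hypothetical stand-in (not part of Akasha).
class TinyPublisher
  def initialize
    @subscribers = []
  end

  # Same logic as the listing: an explicit callable wins over a block.
  def subscribe(lambda = nil, &block)
    callable = lambda || block
    @subscribers << callable
  end

  # Assumed delivery: every subscriber is called once per event.
  def publish(aggregate_id, events)
    events.each do |event|
      @subscribers.each { |subscriber| subscriber.call(aggregate_id, event) }
    end
  end
end

seen = []
publisher = TinyPublisher.new
publisher.subscribe(->(aggregate_id, event) { seen << [:lambda, aggregate_id, event] })
publisher.subscribe { |aggregate_id, event| seen << [:block, aggregate_id, event] }
publisher.publish('item-1', %w[created])
```

Because `subscribe` stores `lambda || block` unconditionally, calling it with neither argument would store `nil`, so callers should always pass one of the two.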