Class: Akasha::Repository
- Inherits:
- Object
  - Object
  - Akasha::Repository
- Defined in:
- lib/akasha/repository.rb
Overview
Aggregate repository. Not meant to be used directly (see aggregate/syntax_helpers.rb). See the specs for usage.
Constant Summary
- STREAM_NAME_SEP = '-'.freeze
Instance Attribute Summary
-
#namespace ⇒ Object
readonly
Returns the value of attribute namespace.
-
#store ⇒ Object
readonly
Returns the value of attribute store.
Instance Method Summary
-
#initialize(store, namespace: nil) ⇒ Repository
constructor
Creates a new repository using the underlying store (e.g. MemoryEventStore).
-
#load_aggregate(klass, id) ⇒ Object
Loads an aggregate identified by id and klass from the repository.
-
#merge_all_by_event(into:, only:) ⇒ Object
Merges all streams into one, filtering the resulting stream so it only contains events with the specified names, using a projection.
-
#save_aggregate(aggregate, concurrency: :none) ⇒ Object
Saves an aggregate to the repository, appending events to the corresponding stream.
-
#subscribe(lambda = nil, &block) ⇒ Object
Subscribes to event streams passing either a lambda or a block.
Constructor Details
#initialize(store, namespace: nil) ⇒ Repository
Creates a new repository using the underlying store (e.g. MemoryEventStore).
-
namespace - optional namespace that lets multiple applications share the same Eventstore database without name conflicts
# File 'lib/akasha/repository.rb', line 13
def initialize(store, namespace: nil)
  @store = store
  @subscribers = []
  @namespace = namespace
end
Instance Attribute Details
#namespace ⇒ Object (readonly)
Returns the value of attribute namespace.
# File 'lib/akasha/repository.rb', line 6
def namespace
  @namespace
end
#store ⇒ Object (readonly)
Returns the value of attribute store.
# File 'lib/akasha/repository.rb', line 6
def store
  @store
end
Instance Method Details
#load_aggregate(klass, id) ⇒ Object
Loads an aggregate identified by id and klass from the repository. Returns an aggregate instance of class klass constructed by applying events from the corresponding stream.
# File 'lib/akasha/repository.rb', line 22
def load_aggregate(klass, id)
  agg = klass.new(id)

  start = 0
  page_size = 20
  stream(klass, id).read_events(start, page_size) do |events|
    agg.apply_events(events)
  end

  agg
end
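The paged replay in load_aggregate can be illustrated with a minimal, self-contained sketch. SketchStream, SketchAccount and load_sketch are hypothetical stand-ins written for this example and are not part of Akasha; the real stream object and its paging behaviour may differ.

```ruby
# Hypothetical in-memory stream (not Akasha's API).
class SketchStream
  def initialize(events)
    @events = events
  end

  # Yields events one page at a time, beginning at index `start`.
  def read_events(start, page_size)
    position = start
    loop do
      page = @events[position, page_size] || []
      break if page.empty?

      yield page
      position += page.size
    end
  end
end

# Hypothetical aggregate whose state is the sum of event amounts.
class SketchAccount
  attr_reader :id, :balance

  def initialize(id)
    @id = id
    @balance = 0
  end

  def apply_events(events)
    events.each { |event| @balance += event[:amount] }
  end
end

# Mirrors the shape of load_aggregate: build an empty aggregate,
# then apply each page of events in order.
def load_sketch(klass, id, stream, page_size: 20)
  agg = klass.new(id)
  stream.read_events(0, page_size) { |events| agg.apply_events(events) }
  agg
end

events = Array.new(45) { { name: :deposited, amount: 1 } }
account = load_sketch(SketchAccount, 'acc-1', SketchStream.new(events))
puts account.balance
```

With 45 events and a page size of 20, the sketch applies three pages (20, 20 and 5 events) before returning the aggregate.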
#merge_all_by_event(into:, only:) ⇒ Object
Merges all streams into one, filtering the resulting stream so it only contains events with the specified names, using a projection.
Arguments:
`into` - name of the new stream
`only` - array of event names
# File 'lib/akasha/repository.rb', line 61
def merge_all_by_event(into:, only:)
  @store.merge_all_by_event(into: into, only: only, namespace: @namespace)
end
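The effect of merging and filtering can be approximated in memory. The sketch below is not how the store does it (the documentation says a projection is used); it only shows the intended result. The stream names, the :position field used for ordering, and merge_by_event_sketch are all assumptions made for illustration.

```ruby
# Hypothetical data: stream name => events, each tagged with an
# assumed global :position that fixes the merged order.
streams = {
  'account-1' => [{ position: 1, name: :opened }, { position: 3, name: :deposited }],
  'account-2' => [{ position: 2, name: :opened }, { position: 4, name: :closed }]
}

# Combines every stream, keeps only events whose name is listed in
# `only`, and orders the result by global position.
def merge_by_event_sketch(streams, only:)
  streams.values.flatten
         .select { |event| only.include?(event[:name]) }
         .sort_by { |event| event[:position] }
end

merged = merge_by_event_sketch(streams, only: %i[opened closed])
puts merged.map { |event| event[:position] }.inspect
```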
#save_aggregate(aggregate, concurrency: :none) ⇒ Object
Saves an aggregate to the repository, appending events to the corresponding stream.
# File 'lib/akasha/repository.rb', line 35
def save_aggregate(aggregate, concurrency: :none)
  changeset = aggregate.changeset
  events = changeset.events.map { |event| event.(namespace: @namespace) }
  revision = aggregate.revision if concurrency == :optimistic
  stream(aggregate.class, changeset.aggregate_id).write_events(events, revision: revision)
  notify_subscribers(changeset.aggregate_id, events)
end
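In save_aggregate, a revision is passed to write_events only when concurrency is :optimistic; otherwise it is nil. The sketch below shows one way a stream could act on that argument. SketchWritableStream, SketchConflictError and the revision numbering (index of the last event, -1 for an empty stream) are assumptions for this example, not Akasha's actual behaviour.

```ruby
# Hypothetical error class (Akasha's real error may be named differently).
class SketchConflictError < StandardError; end

# Hypothetical stream that checks the expected revision before appending.
class SketchWritableStream
  attr_reader :events

  def initialize
    @events = []
  end

  # revision: nil appends unconditionally (like concurrency: :none).
  # Any other value must match the stream's current revision.
  def write_events(new_events, revision: nil)
    current = @events.size - 1
    if !revision.nil? && revision != current
      raise SketchConflictError, "expected revision #{revision}, stream is at #{current}"
    end

    @events.concat(new_events)
  end
end

stream = SketchWritableStream.new
stream.write_events([:opened], revision: -1) # empty stream is at revision -1 here
stream.write_events([:deposited])            # no revision, so no check

conflict = begin
  stream.write_events([:closed], revision: 0) # stale: the stream has moved on
  false
rescue SketchConflictError
  true
end
puts conflict
```

The stale write is rejected and leaves the stream unchanged, which is the usual purpose of an optimistic check: a writer that loaded an older revision has to reload and retry.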
#subscribe(lambda = nil, &block) ⇒ Object
Subscribes to event streams, passing either a lambda or a block. Example:
repo.subscribe do |aggregate_id, event|
  # ... handle the event ...
end
# File 'lib/akasha/repository.rb', line 49
def subscribe(lambda = nil, &block)
  callable = lambda || block
  @subscribers << callable
end
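Both subscription styles can be tried with a small stand-in that copies the shape of subscribe. SketchNotifier and its notify method are hypothetical; in particular, calling each subscriber once per event and the nil guard are assumptions of this sketch, since notify_subscribers is not shown in this page.

```ruby
# Hypothetical stand-in for the subscription part of the repository.
class SketchNotifier
  def initialize
    @subscribers = []
  end

  # Accepts either a lambda argument or a block; the lambda wins if both are given.
  def subscribe(lambda = nil, &block)
    callable = lambda || block
    # Guard added for this sketch only.
    raise ArgumentError, 'pass a lambda or a block' if callable.nil?

    @subscribers << callable
  end

  # Assumption: every subscriber is called once per event.
  def notify(aggregate_id, events)
    @subscribers.each do |subscriber|
      events.each { |event| subscriber.call(aggregate_id, event) }
    end
  end
end

seen = []
notifier = SketchNotifier.new
notifier.subscribe(->(id, event) { seen << [:lambda, id, event] })
notifier.subscribe { |id, event| seen << [:block, id, event] }
notifier.notify('acc-1', [:opened])
puts seen.inspect
```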