Class: Akasha::Repository

Inherits:
Object
  • Object
show all
Defined in:
lib/akasha/repository.rb

Overview

Aggregate repository. Not meant to be used directly (see aggregate/syntax_helpers.rb) See specs for usage.

Constant Summary collapse

STREAM_NAME_SEP =
'-'.freeze

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(store, namespace: nil) ⇒ Repository

Creates a new repository using the underlying ‘store` (e.g. `MemoryEventStore`).

  • namespace - optional namespace allowing for multiple applications to share the same Eventstore

    database without name conflicts
    


13
14
15
16
17
# File 'lib/akasha/repository.rb', line 13

def initialize(store, namespace: nil)
  @store = store
  @subscribers = []
  @namespace = namespace
end

Instance Attribute Details

#namespaceObject (readonly)

Returns the value of attribute namespace.



6
7
8
# File 'lib/akasha/repository.rb', line 6

def namespace
  @namespace
end

#storeObject (readonly)

Returns the value of attribute store.



6
7
8
# File 'lib/akasha/repository.rb', line 6

def store
  @store
end

Instance Method Details

#load_aggregate(klass, id) ⇒ Object

Loads an aggregate identified by ‘id` and `klass` from the repository. Returns an aggregate instance of class `klass` constructed by applying events from the corresponding stream.



22
23
24
25
26
27
28
29
30
31
32
# File 'lib/akasha/repository.rb', line 22

def load_aggregate(klass, id)
  agg = klass.new(id)

  start = 0
  page_size = 20
  stream(klass, id).read_events(start, page_size) do |events|
    agg.apply_events(events)
  end

  agg
end

#merge_all_by_event(into:, only:) ⇒ Object

Merges all streams into one, filtering the resulting stream so it only contains events with the specified names, using a projection.

Arguments:

`into` - name of the new stream
`only` - array of event names


61
62
63
# File 'lib/akasha/repository.rb', line 61

def merge_all_by_event(into:, only:)
  @store.merge_all_by_event(into: into, only: only, namespace: @namespace)
end

#save_aggregate(aggregate, concurrency: :none) ⇒ Object

Saves an aggregate to the repository, appending events to the corresponding stream.



35
36
37
38
39
40
41
# File 'lib/akasha/repository.rb', line 35

def save_aggregate(aggregate, concurrency: :none)
  changeset = aggregate.changeset
  events = changeset.events.map { |event| event.(namespace: @namespace) }
  revision = aggregate.revision if concurrency == :optimistic
  stream(aggregate.class, changeset.aggregate_id).write_events(events, revision: revision)
  notify_subscribers(changeset.aggregate_id, events)
end

#subscribe(lambda = nil, &block) ⇒ Object

Subscribes to event streams passing either a lambda or a block. Example:

repo.subscribe do |aggregate_id, event|
  ... handle the event ...
end


49
50
51
52
# File 'lib/akasha/repository.rb', line 49

def subscribe(lambda = nil, &block)
  callable = lambda || block
  @subscribers << callable
end