Class: Akasha::Repository

Inherits:
Object
  • Object
show all
Defined in:
lib/akasha/repository.rb

Overview

Aggregate repository. Not meant to be used directly (see aggregate/syntax_helpers.rb) See specs for usage.

Constant Summary collapse

STREAM_NAME_SEP =
'-'.freeze

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(store, namespace: nil) ⇒ Repository

Creates a new repository using the underlying store (e.g. MemoryEventStore).

  • namespace - optional namespace allowing for multiple applications to share the same Eventstore

    database without name conflicts
    


13
14
15
16
17
# File 'lib/akasha/repository.rb', line 13

def initialize(store, namespace: nil)
  @store = store
  @subscribers = []
  @namespace = namespace
end

Instance Attribute Details

#namespaceObject (readonly)

Returns the value of attribute namespace.



6
7
8
# File 'lib/akasha/repository.rb', line 6

def namespace
  @namespace
end

#storeObject (readonly)

Returns the value of attribute store.



6
7
8
# File 'lib/akasha/repository.rb', line 6

def store
  @store
end

Instance Method Details

#load_aggregate(klass, id) ⇒ Object

Loads an aggregate identified by id and klass from the repository. Returns an aggregate instance of class klass constructed by applying events from the corresponding stream.



22
23
24
25
26
27
28
29
30
31
32
# File 'lib/akasha/repository.rb', line 22

def load_aggregate(klass, id)
  agg = klass.new(id)

  start = 0
  page_size = 20
  stream(klass, id).read_events(start, page_size) do |events|
    agg.apply_events(events)
  end

  agg
end

#merge_all_by_event(into:, only:) ⇒ Object

Merges all streams into one, filtering the resulting stream so it only contains events with the specified names, using a projection.

Arguments:

`into` - name of the new stream
`only` - array of event names


61
62
63
# File 'lib/akasha/repository.rb', line 61

def merge_all_by_event(into:, only:)
  @store.merge_all_by_event(into: into, only: only, namespace: @namespace)
end

#save_aggregate(aggregate, concurrency: :none) ⇒ Object

Saves an aggregate to the repository, appending events to the corresponding stream.



35
36
37
38
39
40
41
# File 'lib/akasha/repository.rb', line 35

def save_aggregate(aggregate, concurrency: :none)
  changeset = aggregate.changeset
  events = changeset.events.map { |event| event.(namespace: @namespace) }
  revision = aggregate.revision if concurrency == :optimistic
  stream(aggregate.class, changeset.aggregate_id).write_events(events, revision: revision)
  notify_subscribers(changeset.aggregate_id, events)
end

#subscribe(lambda = nil, &block) ⇒ Object

Subscribes to event streams passing either a lambda or a block. Example:

repo.subscribe do |aggregate_id, event|
  ... handle the event ...
end


49
50
51
52
# File 'lib/akasha/repository.rb', line 49

def subscribe(lambda = nil, &block)
  callable = lambda || block
  @subscribers << callable
end