Class: Synapse::UnitOfWork::UnitOfWork

Inherits:
NestableUnitOfWork show all
Defined in:
lib/synapse/uow/uow.rb

Overview

Default implementation of a unit of work

Instance Method Summary collapse

Methods inherited from NestableUnitOfWork

#commit, #rollback, #start, #started?

Constructor Details

#initialize(provider) ⇒ undefined

Parameters:



7
8
9
10
11
12
13
# File 'lib/synapse/uow/uow.rb', line 7

def initialize(provider)
  super

  @aggregates = Hash.new
  @events = Hash.new
  @listeners = UnitOfWorkListenerCollection.new
end

Instance Method Details

#publish_event(event, event_bus) ⇒ EventMessage

Buffers an event for publication to the given event bus until this unit of work is committed

Parameters:

  • event (EventMessage)
  • event_bus (EventBus)

Returns:

  • (EventMessage)

    The event that will be published to the event bus



70
71
72
73
74
75
76
77
78
79
80
81
# File 'lib/synapse/uow/uow.rb', line 70

def publish_event(event, event_bus)
  event = @listeners.on_event_registered self, event
  event.tap do
    begin
      events = @events.fetch event_bus
    rescue KeyError
      events = @events.store event_bus, Array.new
    end

    events.push event
  end
end

#register_aggregate(aggregate, event_bus, &storage_callback) {|AggregateRoot| ... } ⇒ AggregateRoot

Registers an aggregate with this unit of work

This unit of work adds an event listener to the aggregate so that any events generated are published to the given event bus when this unit of work is committed.

The provided storage callback is used to commit the aggregate to its respective underlying storage mechanism.

If there is already an aggregate registered with this unit of work of the same type and with the same identifier, that aggregate will be returned instead of the given aggregate.

Parameters:

  • aggregate (AggregateRoot)
  • event_bus (EventBus)
  • storage_callback (Proc)

Yields:

  • (AggregateRoot)

    Deferred until the aggregate is stored

Returns:

  • (AggregateRoot)


50
51
52
53
54
55
56
57
58
59
60
61
# File 'lib/synapse/uow/uow.rb', line 50

def register_aggregate(aggregate, event_bus, &storage_callback)
  similar = find_similar_aggregate aggregate
  return similar if similar

  aggregate.add_registration_listener do |event|
    publish_event event, event_bus
  end

  @aggregates.store aggregate, storage_callback

  aggregate
end

#register_listener(listener) ⇒ undefined

Registers a listener that is notified of state changes in this unit of work

Parameters:

Returns:

  • (undefined)


28
29
30
# File 'lib/synapse/uow/uow.rb', line 28

def register_listener(listener)
  @listeners.push listener
end

#transaction_manager=(transaction_manager) ⇒ undefined

Sets the transaction manager that will be used by this unit of work

Parameters:

Returns:

  • (undefined)

Raises:

  • (RuntimeError)

    If unit of work has been started



89
90
91
92
93
94
95
# File 'lib/synapse/uow/uow.rb', line 89

def transaction_manager=(transaction_manager)
  if started?
    raise 'Transaction manager not permitted to change after unit of work has started'
  end

  @transaction_manager = transaction_manager
end

#transactional?Boolean

Returns true if this unit of work is bound to a transaction

Returns:

  • (Boolean)


19
20
21
# File 'lib/synapse/uow/uow.rb', line 19

def transactional?
  !!@transaction_manager
end