Class: Synapse::EventSourcing::EventCountSnapshotTrigger

Inherits:
EventStreamDecorator
Defined in:
lib/synapse/event_sourcing/snapshot/count_trigger.rb

Overview

Snapshot trigger that counts the number of events between snapshots to decide when to schedule the next snapshot

Constant Summary

DEFAULT_THRESHOLD =

Default threshold for a snapshot to be scheduled

Returns:

  • (Integer)
50

Instance Attribute Summary

Instance Method Summary

Constructor Details

#initialize(snapshot_taker, unit_provider) ⇒ undefined

Parameters:

  • snapshot_taker (SnapshotTaker)
  • unit_provider (UnitOfWorkProvider)


24
25
26
27
28
29
30
31
32
# File 'lib/synapse/event_sourcing/snapshot/count_trigger.rb', line 24

# Sets up a trigger with the default event threshold and an empty table of
# per-aggregate counters.
#
# @param [SnapshotTaker] snapshot_taker
# @param [UnitOfWorkProvider] unit_provider
# @return [undefined]
def initialize(snapshot_taker, unit_provider)
  # Trigger-local state; @lock is the mutex held while @counters is accessed
  @threshold = DEFAULT_THRESHOLD
  @logger = Logging.logger[self.class]
  @lock = Mutex.new
  @counters = {}

  # Collaborators supplied by the caller
  @snapshot_taker = snapshot_taker
  @unit_provider = unit_provider
end

Instance Attribute Details

#snapshot_taker ⇒ SnapshotTaker (readonly)

Returns:

  • (SnapshotTaker)

13
14
15
# File 'lib/synapse/event_sourcing/snapshot/count_trigger.rb', line 13

# Reader for the snapshot taker that was handed to the constructor.
#
# @return [SnapshotTaker]
def snapshot_taker
  @snapshot_taker
end

#threshold ⇒ Integer

Returns the number of events between snapshots.

Returns:

  • (Integer)

    The number of events between snapshots



16
17
18
# File 'lib/synapse/event_sourcing/snapshot/count_trigger.rb', line 16

# Reader for the number of events between snapshots. The constructor
# initialises this to DEFAULT_THRESHOLD.
#
# @return [Integer]
def threshold
  @threshold
end

#unit_provider ⇒ UnitOfWorkProvider (readonly)

Returns:

  • (UnitOfWorkProvider)


19
20
21
# File 'lib/synapse/event_sourcing/snapshot/count_trigger.rb', line 19

# Reader for the unit of work provider that was handed to the constructor.
#
# @return [UnitOfWorkProvider]
def unit_provider
  @unit_provider
end

Instance Method Details

#clear_counter(aggregate_id) ⇒ undefined

Clears the event counter for the aggregate with the given identifier

Parameters:

  • aggregate_id (Object)

Returns:

  • (undefined)


84
85
86
87
88
# File 'lib/synapse/event_sourcing/snapshot/count_trigger.rb', line 84

# Drops the event counter kept for the given aggregate, if there is one.
# The removal happens while holding the trigger's lock.
#
# @param [Object] aggregate_id
# @return [undefined]
def clear_counter(aggregate_id)
  @lock.synchronize { @counters.delete(aggregate_id) }
end

#counter_for(aggregate_id) ⇒ Atomic

Returns the event counter for the aggregate with the given identifier

Parameters:

  • aggregate_id (Object)

Returns:

  • (Atomic)


70
71
72
73
74
75
76
77
78
# File 'lib/synapse/event_sourcing/snapshot/count_trigger.rb', line 70

# Looks up the event counter for the given aggregate. When none has been
# registered yet, a new counter starting at zero is stored and returned.
# Lookup and creation both happen while holding the trigger's lock.
#
# @param [Object] aggregate_id
# @return [Atomic]
def counter_for(aggregate_id)
  @lock.synchronize do
    # The fallback block only runs when the key is absent
    @counters.fetch(aggregate_id) do
      @counters.store(aggregate_id, Atomic.new(0))
    end
  end
end

#decorate_for_append(type_identifier, aggregate, stream) ⇒ DomainEventStream

Parameters:

  • type_identifier (String)
  • aggregate (AggregateRoot)
  • stream (DomainEventStream)

Returns:

  • (DomainEventStream)


62
63
64
# File 'lib/synapse/event_sourcing/snapshot/count_trigger.rb', line 62

# Wraps the stream being appended in a TriggeringEventStream that is given
# the aggregate's event counter, its type identifier, its identifier and
# this trigger.
#
# @param [String] type_identifier
# @param [AggregateRoot] aggregate
# @param [DomainEventStream] stream
# @return [DomainEventStream]
def decorate_for_append(type_identifier, aggregate, stream)
  counter = counter_for(aggregate.id)

  TriggeringEventStream.new(stream, counter, type_identifier, aggregate.id, self)
end

#decorate_for_read(type_identifier, aggregate_id, stream) ⇒ DomainEventStream

Parameters:

  • type_identifier (String)
  • aggregate_id (Object)
  • stream (DomainEventStream)

Returns:

  • (DomainEventStream)


54
55
56
# File 'lib/synapse/event_sourcing/snapshot/count_trigger.rb', line 54

# Wraps the stream being read in a CountingEventStream that is given the
# event counter of the aggregate. The type identifier is not used here.
#
# @param [String] type_identifier
# @param [Object] aggregate_id
# @param [DomainEventStream] stream
# @return [DomainEventStream]
def decorate_for_read(type_identifier, aggregate_id, stream)
  counter = counter_for(aggregate_id)

  CountingEventStream.new(stream, counter)
end

#trigger_snapshot(type_identifier, aggregate_id, counter) ⇒ undefined

If the event threshold has been reached for the aggregate with the given identifier, this will cause a snapshot to be scheduled

Parameters:

  • type_identifier (String)
  • aggregate_id (Object)
  • counter (Atomic)

Returns:

  • (undefined)


41
42
43
44
45
46
47
48
# File 'lib/synapse/event_sourcing/snapshot/count_trigger.rb', line 41

# Schedules a snapshot for the given aggregate once its counter has risen
# above the configured threshold, then sets the counter back to 1. Does
# nothing while the counter is at or below the threshold.
#
# @param [String] type_identifier
# @param [Object] aggregate_id
# @param [Atomic] counter
# @return [undefined]
def trigger_snapshot(type_identifier, aggregate_id, counter)
  return unless counter.value > @threshold

  message = 'Snapshot threshold reached for [%s] [%s]' % [type_identifier, aggregate_id]
  @logger.info message

  @snapshot_taker.schedule_snapshot type_identifier, aggregate_id
  counter.value = 1
end