Class: Synapse::EventSourcing::EventCountSnapshotTrigger
- Inherits:
-
EventStreamDecorator
- Object
- EventStreamDecorator
- Synapse::EventSourcing::EventCountSnapshotTrigger
- Defined in:
- lib/synapse/event_sourcing/snapshot/count_trigger.rb
Overview
Snapshot trigger that counts the number of events between snapshots to decide when to schedule the next snapshot
Constant Summary collapse
- DEFAULT_THRESHOLD =
Default threshold for a snapshot to be scheduled
50
Instance Attribute Summary collapse
- #snapshot_taker ⇒ SnapshotTaker readonly
-
#threshold ⇒ Integer
The number of events between snapshots.
- #unit_provider ⇒ UnitOfWorkProvider readonly
Instance Method Summary collapse
-
#clear_counter(aggregate_id) ⇒ undefined
Clears the event counter for the aggregate with the given identifier.
-
#counter_for(aggregate_id) ⇒ Atomic
Returns the event counter for the aggregate with the given identifier.
- #decorate_for_append(type_identifier, aggregate, stream) ⇒ DomainEventStream
- #decorate_for_read(type_identifier, aggregate_id, stream) ⇒ DomainEventStream
- #initialize(snapshot_taker, unit_provider) ⇒ undefined constructor
-
#trigger_snapshot(type_identifier, aggregate_id, counter) ⇒ undefined
If the event threshold has been reached for the aggregate with the given identifier, this will cause a snapshot to be scheduled.
Constructor Details
#initialize(snapshot_taker, unit_provider) ⇒ undefined
24 25 26 27 28 29 30 31 32 |
# File 'lib/synapse/event_sourcing/snapshot/count_trigger.rb', line 24 def initialize(snapshot_taker, unit_provider) @counters = Hash.new @lock = Mutex.new @logger = Logging.logger[self.class] @threshold = DEFAULT_THRESHOLD @snapshot_taker = snapshot_taker @unit_provider = unit_provider end |
Instance Attribute Details
#snapshot_taker ⇒ SnapshotTaker (readonly)
13 14 15 |
# File 'lib/synapse/event_sourcing/snapshot/count_trigger.rb', line 13 def snapshot_taker @snapshot_taker end |
#threshold ⇒ Integer
Returns The number of events between snapshots.
16 17 18 |
# File 'lib/synapse/event_sourcing/snapshot/count_trigger.rb', line 16 def threshold @threshold end |
#unit_provider ⇒ UnitOfWorkProvider (readonly)
19 20 21 |
# File 'lib/synapse/event_sourcing/snapshot/count_trigger.rb', line 19 def unit_provider @unit_provider end |
Instance Method Details
#clear_counter(aggregate_id) ⇒ undefined
Clears the event counter for the aggregate with the given identifier
84 85 86 87 88 |
# File 'lib/synapse/event_sourcing/snapshot/count_trigger.rb', line 84 def clear_counter(aggregate_id) @lock.synchronize do @counters.delete aggregate_id end end |
#counter_for(aggregate_id) ⇒ Atomic
Returns the event counter for the aggregate with the given identifier
70 71 72 73 74 75 76 77 78 |
# File 'lib/synapse/event_sourcing/snapshot/count_trigger.rb', line 70 def counter_for(aggregate_id) @lock.synchronize do if @counters.has_key? aggregate_id @counters.fetch aggregate_id else @counters.store aggregate_id, Atomic.new(0) end end end |
#decorate_for_append(type_identifier, aggregate, stream) ⇒ DomainEventStream
62 63 64 |
# File 'lib/synapse/event_sourcing/snapshot/count_trigger.rb', line 62 def decorate_for_append(type_identifier, aggregate, stream) TriggeringEventStream.new(stream, counter_for(aggregate.id), type_identifier, aggregate.id, self) end |
#decorate_for_read(type_identifier, aggregate_id, stream) ⇒ DomainEventStream
54 55 56 |
# File 'lib/synapse/event_sourcing/snapshot/count_trigger.rb', line 54 def decorate_for_read(type_identifier, aggregate_id, stream) CountingEventStream.new(stream, counter_for(aggregate_id)) end |
#trigger_snapshot(type_identifier, aggregate_id, counter) ⇒ undefined
If the event threshold has been reached for the aggregate with the given identifier, this will cause a snapshot to be scheduled
41 42 43 44 45 46 47 48 |
# File 'lib/synapse/event_sourcing/snapshot/count_trigger.rb', line 41 def trigger_snapshot(type_identifier, aggregate_id, counter) if counter.value > @threshold @logger.info 'Snapshot threshold reached for [%s] [%s]' % [type_identifier, aggregate_id] @snapshot_taker.schedule_snapshot type_identifier, aggregate_id counter.value = 1 end end |