Class: EventStore::Aggregate
- Inherits: Object
- Inheritance chain: Object → EventStore::Aggregate
- Defined in:
- lib/event_store/aggregate.rb
Instance Attribute Summary collapse
-
#event_table ⇒ Object
readonly
Returns the value of attribute event_table.
-
#id ⇒ Object
readonly
Returns the value of attribute id.
-
#snapshot_table ⇒ Object
readonly
Returns the value of attribute snapshot_table.
-
#snapshot_version_table ⇒ Object
readonly
Returns the value of attribute snapshot_version_table.
-
#type ⇒ Object
readonly
Returns the value of attribute type.
Class Method Summary collapse
- .count ⇒ Object
- .ids(offset, limit) ⇒ Object
Instance Method Summary collapse
- #delete_events! ⇒ Object
- #delete_snapshot! ⇒ Object
- #event_stream ⇒ Object
- #event_stream_between(start_time, end_time, fully_qualified_names = []) ⇒ Object
- #events ⇒ Object
- #events_from(version_number, max = nil) ⇒ Object
-
#initialize(id, type = EventStore.table_name) ⇒ Aggregate
constructor
A new instance of Aggregate.
- #last_event ⇒ Object
- #rebuild_snapshot! ⇒ Object
- #snapshot ⇒ Object
- #version ⇒ Object
Constructor Details
#initialize(id, type = EventStore.table_name) ⇒ Aggregate
14 15 16 17 18 19 20 21 |
# File 'lib/event_store/aggregate.rb', line 14
#
# Builds an aggregate for +id+, recording the schema and event table
# reported by EventStore and deriving the two snapshot key names.
#
# @param id [Object] aggregate identifier; interpolated into the snapshot names
# @param type [Object] name prefix for the snapshot names
#   (defaults to EventStore.table_name)
def initialize(id, type = EventStore.table_name)
  @id, @type = id, type
  @schema = EventStore.schema
  @event_table = EventStore.fully_qualified_table

  # Both snapshot names follow the "<type>_<kind>_for_<id>" layout.
  @snapshot_table         = "#{@type}_snapshots_for_#{@id}"
  @snapshot_version_table = "#{@type}_snapshot_versions_for_#{@id}"
end
Instance Attribute Details
#event_table ⇒ Object (readonly)
Returns the value of attribute event_table.
4 5 6 |
# File 'lib/event_store/aggregate.rb', line 4
#
# Read-only accessor for the event table assigned in the constructor.
#
# @return [Object] the current value of @event_table
def event_table
  @event_table
end
#id ⇒ Object (readonly)
Returns the value of attribute id.
4 5 6 |
# File 'lib/event_store/aggregate.rb', line 4
#
# Read-only accessor for the aggregate identifier given to the constructor.
#
# @return [Object] the current value of @id
def id
  @id
end
#snapshot_table ⇒ Object (readonly)
Returns the value of attribute snapshot_table.
4 5 6 |
# File 'lib/event_store/aggregate.rb', line 4
#
# Read-only accessor for the snapshot name derived in the constructor.
#
# @return [Object] the current value of @snapshot_table
def snapshot_table
  @snapshot_table
end
#snapshot_version_table ⇒ Object (readonly)
Returns the value of attribute snapshot_version_table.
4 5 6 |
# File 'lib/event_store/aggregate.rb', line 4
#
# Read-only accessor for the snapshot-version name derived in the constructor.
#
# @return [Object] the current value of @snapshot_version_table
def snapshot_version_table
  @snapshot_version_table
end
#type ⇒ Object (readonly)
Returns the value of attribute type.
4 5 6 |
# File 'lib/event_store/aggregate.rb', line 4
#
# Read-only accessor for the type given to (or defaulted by) the constructor.
#
# @return [Object] the current value of @type
def type
  @type
end
Class Method Details
.count ⇒ Object
6 7 8 |
# File 'lib/event_store/aggregate.rb', line 6
#
# Counts the rows of the fully qualified event table after applying
# `distinct(:aggregate_id)`.
#
# @return [Object] whatever the dataset's `count` returns
def self.count
  event_rows = EventStore.db.from(EventStore.fully_qualified_table)
  event_rows.distinct(:aggregate_id).count
end
.ids(offset, limit) ⇒ Object
10 11 12 |
# File 'lib/event_store/aggregate.rb', line 10
#
# Returns one page of aggregate ids from the fully qualified event table,
# ordered by :aggregate_id.
#
# @param offset [Object] passed as the second argument to the dataset's `limit`
# @param limit [Object] passed as the first argument to the dataset's `limit`
# @return [Array] the :aggregate_id value of each returned row
def self.ids(offset, limit)
  id_rows = EventStore.db
                      .from(EventStore.fully_qualified_table)
                      .distinct(:aggregate_id)
                      .select(:aggregate_id)
                      .order(:aggregate_id)
                      .limit(limit, offset)
                      .all

  id_rows.map { |row| row[:aggregate_id] }
end
Instance Method Details
#delete_events! ⇒ Object
73 74 75 |
# File 'lib/event_store/aggregate.rb', line 73
#
# Deletes every row matched by this aggregate's #events query.
#
# @return [Object] whatever the query's `delete` returns
def delete_events!
  aggregate_events = events
  aggregate_events.delete
end
#delete_snapshot! ⇒ Object
69 70 71 |
# File 'lib/event_store/aggregate.rb', line 69
#
# Removes both snapshot keys (snapshot and snapshot version) with a single
# `del` call on EventStore.redis.
#
# @return [Object] whatever `del` returns
def delete_snapshot!
  snapshot_keys = [@snapshot_table, @snapshot_version_table]
  EventStore.redis.del(snapshot_keys)
end
#event_stream ⇒ Object
57 58 59 |
# File 'lib/event_store/aggregate.rb', line 57
#
# Loads every row of #events and replaces each row's :serialized_event
# with the result of EventStore.unescape_bytea.
#
# @return [Array] the loaded rows, each mutated in place
def event_stream
  events.all.map do |event|
    event[:serialized_event] = EventStore.unescape_bytea(event[:serialized_event])
    event
  end
end
#event_stream_between(start_time, end_time, fully_qualified_names = []) ⇒ Object
51 52 53 54 55 |
# File 'lib/event_store/aggregate.rb', line 51
#
# Like #event_stream, but restricted to rows whose :occurred_at lies in
# start_time..end_time and, optionally, to the given event names.
#
# @param start_time [Object] lower bound of the occurred_at range
# @param end_time [Object] upper bound of the occurred_at range
# @param fully_qualified_names [Array, nil] when truthy and `any?` is true,
#   rows are also filtered on :fully_qualified_name
# @return [Array] matching rows with :serialized_event unescaped in place
def event_stream_between(start_time, end_time, fully_qualified_names = [])
  in_window = events.where(occurred_at: start_time..end_time)

  # The name filter is skipped entirely for nil or an array with no truthy entry.
  filter_by_name = fully_qualified_names && fully_qualified_names.any?
  scoped = filter_by_name ? in_window.where(fully_qualified_name: fully_qualified_names) : in_window

  scoped.all.map do |event|
    event[:serialized_event] = EventStore.unescape_bytea(event[:serialized_event])
    event
  end
end
#events ⇒ Object
23 24 25 |
# File 'lib/event_store/aggregate.rb', line 23
#
# Query over the event table for this aggregate's rows, ordered by :version.
# The query object is built once and cached in @events_query.
#
# @return [Object] the memoized query
def events
  return @events_query if @events_query

  # The id is compared as a string, whatever type the aggregate was built with.
  @events_query = EventStore.db
                            .from(@event_table)
                            .where(aggregate_id: @id.to_s)
                            .order(:version)
end
#events_from(version_number, max = nil) ⇒ Object
47 48 49 |
# File 'lib/event_store/aggregate.rb', line 47
#
# Loads this aggregate's events whose version is at least +version_number+.
#
# @param version_number [#to_i] lowest version to include
# @param max [Object, nil] passed to the query's `limit`
# @return [Object] the result of `all` on the filtered query
def events_from(version_number, max = nil)
  capped = events.limit(max)

  # NOTE(review): the block takes no argument, so this is presumably a Sequel
  # virtual-row block in which bare `version` names the column rather than
  # calling #version on this object — confirm.
  capped.where { version >= version_number.to_i }.all
end
#last_event ⇒ Object
61 62 63 |
# File 'lib/event_store/aggregate.rb', line 61
#
# Returns the final entry of #snapshot, which is sorted by ascending version.
#
# @return [Object, nil] the last snapshot entry, or nil when the snapshot is empty
def last_event
  ordered_snapshot = snapshot
  ordered_snapshot.last
end
#rebuild_snapshot! ⇒ Object
41 42 43 44 45 |
# File 'lib/event_store/aggregate.rb', line 41
#
# Drops the current snapshot and stores a new one built from all events.
# Each event's :occurred_at is first passed through
# TimeHacker.translate_occurred_at_from_local_to_gmt.
#
# @return [Object] whatever EventAppender#store_snapshot returns
def rebuild_snapshot!
  delete_snapshot!

  gmt_events = events.all.map do |event|
    event[:occurred_at] = TimeHacker.translate_occurred_at_from_local_to_gmt(event[:occurred_at])
    event
  end

  EventAppender.new(self).store_snapshot(gmt_events)
end
#snapshot ⇒ Object
27 28 29 30 31 32 33 34 35 36 37 38 39 |
# File 'lib/event_store/aggregate.rb', line 27
#
# Decodes the raw snapshot into SerializedEvent objects sorted by version.
#
# Each raw value is split on EventStore::SNAPSHOT_DELIMITER: the first field
# is the version, the second the escaped serialized event, and the last the
# occurred-at timestamp. The key is used as the fully qualified name.
#
# @return [Array<SerializedEvent>] snapshot entries in ascending version order
def snapshot
  raw_entries = auto_rebuild_snapshot(read_raw_snapshot)
  decoded = []

  raw_entries.each_pair do |name, packed|
    fields = packed.split(EventStore::SNAPSHOT_DELIMITER)
    event_version = fields.first.to_i
    payload = EventStore.unescape_bytea(fields[1])
    happened_at = Time.parse(fields.last)

    decoded << SerializedEvent.new(name, payload, event_version, happened_at)
  end

  decoded.sort { |left, right| left.version <=> right.version }
end
#version ⇒ Object
65 66 67 |
# File 'lib/event_store/aggregate.rb', line 65
#
# Reads :current_version from the snapshot-version hash via EventStore.redis.
#
# @return [Integer] the stored version converted with to_i, or -1 when
#   nothing is stored
def version
  stored = EventStore.redis.hget(@snapshot_version_table, :current_version)
  stored ? stored.to_i : -1
end