Class: EventStore::Aggregate

Inherits:
Object
  • Object
show all
Defined in:
lib/event_store/aggregate.rb

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(id, type = EventStore.table_name) ⇒ Aggregate



14
15
16
17
18
19
20
21
# File 'lib/event_store/aggregate.rb', line 14

def initialize(id, type = EventStore.table_name)
  @id = id
  @type = type
  @schema = EventStore.schema
  @event_table = EventStore.fully_qualified_table
  @snapshot_table = "#{@type}_snapshots_for_#{@id}"
  @snapshot_version_table = "#{@type}_snapshot_versions_for_#{@id}"
end

Instance Attribute Details

#event_tableObject (readonly)

Returns the value of attribute event_table.



4
5
6
# File 'lib/event_store/aggregate.rb', line 4

def event_table
  @event_table
end

#idObject (readonly)

Returns the value of attribute id.



4
5
6
# File 'lib/event_store/aggregate.rb', line 4

def id
  @id
end

#snapshot_tableObject (readonly)

Returns the value of attribute snapshot_table.



4
5
6
# File 'lib/event_store/aggregate.rb', line 4

def snapshot_table
  @snapshot_table
end

#snapshot_version_tableObject (readonly)

Returns the value of attribute snapshot_version_table.



4
5
6
# File 'lib/event_store/aggregate.rb', line 4

def snapshot_version_table
  @snapshot_version_table
end

#typeObject (readonly)

Returns the value of attribute type.



4
5
6
# File 'lib/event_store/aggregate.rb', line 4

def type
  @type
end

Class Method Details

.countObject



6
7
8
# File 'lib/event_store/aggregate.rb', line 6

def self.count
  EventStore.db.from( EventStore.fully_qualified_table).distinct(:aggregate_id).count
end

.ids(offset, limit) ⇒ Object



10
11
12
# File 'lib/event_store/aggregate.rb', line 10

def self.ids(offset, limit)
  EventStore.db.from( EventStore.fully_qualified_table).distinct(:aggregate_id).select(:aggregate_id).order(:aggregate_id).limit(limit, offset).all.map{|item| item[:aggregate_id]}
end

Instance Method Details

#delete_events!Object



73
74
75
# File 'lib/event_store/aggregate.rb', line 73

def delete_events!
  events.delete
end

#delete_snapshot!Object



69
70
71
# File 'lib/event_store/aggregate.rb', line 69

def delete_snapshot!
  EventStore.redis.del [@snapshot_table, @snapshot_version_table]
end

#event_streamObject



57
58
59
# File 'lib/event_store/aggregate.rb', line 57

def event_stream
  events.all.map {|e| e[:serialized_event] = EventStore.unescape_bytea(e[:serialized_event]); e}
end

#event_stream_between(start_time, end_time, fully_qualified_names = []) ⇒ Object



51
52
53
54
55
# File 'lib/event_store/aggregate.rb', line 51

def event_stream_between(start_time, end_time, fully_qualified_names = [])
  query = events.where(occurred_at: start_time..end_time)
  query = query.where(fully_qualified_name: fully_qualified_names) if fully_qualified_names && fully_qualified_names.any?
  query.all.map {|e| e[:serialized_event] = EventStore.unescape_bytea(e[:serialized_event]); e}
end

#eventsObject



23
24
25
# File 'lib/event_store/aggregate.rb', line 23

def events
  @events_query ||= EventStore.db.from(@event_table).where(:aggregate_id => @id.to_s).order(:version)
end

#events_from(version_number, max = nil) ⇒ Object



47
48
49
# File 'lib/event_store/aggregate.rb', line 47

def events_from(version_number, max = nil)
  events.limit(max).where{ version >= version_number.to_i }.all
end

#last_eventObject



61
62
63
# File 'lib/event_store/aggregate.rb', line 61

def last_event
  snapshot.last
end

#rebuild_snapshot!Object



41
42
43
44
45
# File 'lib/event_store/aggregate.rb', line 41

def rebuild_snapshot!
  delete_snapshot!
  corrected_events = events.all.map{|e| e[:occurred_at] = TimeHacker.translate_occurred_at_from_local_to_gmt(e[:occurred_at]); e}
  EventAppender.new(self).store_snapshot(corrected_events)
end

#snapshotObject



27
28
29
30
31
32
33
34
35
36
37
38
39
# File 'lib/event_store/aggregate.rb', line 27

def snapshot
  events_hash = auto_rebuild_snapshot(read_raw_snapshot)
  snap = []
  events_hash.each_pair do |key, value|
    raw_event            = value.split(EventStore::SNAPSHOT_DELIMITER)
    fully_qualified_name = key
    version              = raw_event.first.to_i
    serialized_event     = EventStore.unescape_bytea(raw_event[1])
    occurred_at          = Time.parse(raw_event.last)
    snap << SerializedEvent.new(fully_qualified_name, serialized_event, version, occurred_at)
  end
  snap.sort {|a,b| a.version <=> b.version}
end

#versionObject



65
66
67
# File 'lib/event_store/aggregate.rb', line 65

def version
  (EventStore.redis.hget(@snapshot_version_table, :current_version) || -1).to_i
end