Class: EventStore::EventStream
- Inherits:
-
Object
- Object
- EventStore::EventStream
- Includes:
- Enumerable
- Defined in:
- lib/event_store/event_stream.rb
Instance Attribute Summary collapse
-
#checkpoint_events ⇒ Object
readonly
Returns the value of attribute checkpoint_events.
-
#event_table ⇒ Object
readonly
Returns the value of attribute event_table.
Instance Method Summary collapse
- #append(raw_events, logger) {|prepared_events| ... } ⇒ Object
- #delete_events! ⇒ Object
- #each ⇒ Object
- #empty? ⇒ Boolean
- #event_stream_between(start_time, end_time, fully_qualified_names = []) ⇒ Object
- #events ⇒ Object
- #events_from(event_id, max = nil) ⇒ Object
- #fully_qualified_names ⇒ Object
-
#initialize(aggregate) ⇒ EventStream
constructor
A new instance of EventStream.
- #insert_table(occurred_at) ⇒ Object
- #insert_table_name(date) ⇒ Object
- #last ⇒ Object
-
#last_event_before(start_time, fully_qualified_names = []) ⇒ Object
Private: returns the last event before start_time for each of the events named by fully_qualified_names.
-
#simple_last_event_before(start_time, fully_qualified_names = []) ⇒ Object
Private: returns the last event before start_time for each of the events named by fully_qualified_names.
- #snapshot_events ⇒ Object
Constructor Details
#initialize(aggregate) ⇒ EventStream
Returns a new instance of EventStream.
7 8 9 10 11 12 13 14 15 |
# File 'lib/event_store/event_stream.rb', line 7 def initialize aggregate @aggregate = aggregate @id = @aggregate.id @checkpoint_events = aggregate.checkpoint_events @event_table_alias = "events" @event_table = Sequel.qualify(EventStore.schema, EventStore.table_name) @aliased_event_table = event_table.as(@event_table_alias) @names_table = EventStore.fully_qualified_names_table end |
Instance Attribute Details
#checkpoint_events ⇒ Object (readonly)
Returns the value of attribute checkpoint_events.
5 6 7 |
# File 'lib/event_store/event_stream.rb', line 5 def checkpoint_events @checkpoint_events end |
#event_table ⇒ Object (readonly)
Returns the value of attribute event_table.
5 6 7 |
# File 'lib/event_store/event_stream.rb', line 5 def event_table @event_table end |
Instance Method Details
#append(raw_events, logger) {|prepared_events| ... } ⇒ Object
17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 |
# File 'lib/event_store/event_stream.rb', line 17 def append(raw_events, logger) prepared_events = raw_events.map do |raw_event| event = prepare_event(raw_event) ensure_all_attributes_have_values!(event) event end prepared_events.each do |event| event_hash = event.dup.reject! { |k,v| k == :fully_qualified_name } event_table = insert_table(Time.now) begin id = event_table.insert(event_hash) rescue Sequel::NotNullConstraintViolation fully_qualified_names.insert(fully_qualified_name: event[:fully_qualified_name]) id = event_table.insert(event_hash) end logger.debug("EventStream#append, setting id #{id} for #{event_hash.inspect}") event[:id] = id end yield(prepared_events) if block_given? end |
#delete_events! ⇒ Object
160 161 162 |
# File 'lib/event_store/event_stream.rb', line 160 def delete_events! EventStore.db.from(@event_table).where(:aggregate_id => @id.to_s).delete end |
#each ⇒ Object
153 154 155 156 157 158 |
# File 'lib/event_store/event_stream.rb', line 153 def each events.all.each do |e| e[:serialized_event] = EventStore.unescape_bytea(e[:serialized_event]) yield e end end |
#empty? ⇒ Boolean
149 150 151 |
# File 'lib/event_store/event_stream.rb', line 149 def empty? events.empty? end |
#event_stream_between(start_time, end_time, fully_qualified_names = []) ⇒ Object
139 140 141 142 143 |
# File 'lib/event_store/event_stream.rb', line 139 def event_stream_between(start_time, end_time, fully_qualified_names = []) query = events.where(occurred_at: start_time..end_time) query = query.where(fully_qualified_name: fully_qualified_names) if fully_qualified_names && fully_qualified_names.any? query.all.map {|e| e[:serialized_event] = EventStore.unescape_bytea(e[:serialized_event]); e} end |
#events ⇒ Object
55 56 57 58 59 60 61 62 63 64 |
# File 'lib/event_store/event_stream.rb', line 55 def events @events_query ||= begin query = EventStore.db.from(@aliased_event_table).where(:aggregate_id => @id.to_s) query = query.join(@names_table, id: :fully_qualified_name_id) if EventStore.use_names_table? query = query.order { events[:id] }.select_all(:events) query = query.select_append(:fully_qualified_name) if EventStore.use_names_table? query end end |
#events_from(event_id, max = nil) ⇒ Object
82 83 84 85 86 87 88 |
# File 'lib/event_store/event_stream.rb', line 82 def events_from(event_id, max = nil) # note: this depends on the events table being aliased to "events" above. events.limit(max).where{events[:id] >= event_id.to_i }.all.map do |event| event[:serialized_event] = EventStore.unescape_bytea(event[:serialized_event]) event end end |
#fully_qualified_names ⇒ Object
51 52 53 |
# File 'lib/event_store/event_stream.rb', line 51 def fully_qualified_names @fully_qualified_name_query ||= EventStore.db.from(@names_table) end |
#insert_table(occurred_at) ⇒ Object
43 44 45 |
# File 'lib/event_store/event_stream.rb', line 43 def insert_table(occurred_at) EventStore.db.from(insert_table_name(occurred_at)) end |
#insert_table_name(date) ⇒ Object
47 48 49 |
# File 'lib/event_store/event_stream.rb', line 47 def insert_table_name(date) EventStore.insert_table_name(date) end |
#last ⇒ Object
145 146 147 |
# File 'lib/event_store/event_stream.rb', line 145 def last to_a.last end |
#last_event_before(start_time, fully_qualified_names = []) ⇒ Object
Private: returns the last event before start_time for each of the events named
by fully_qualified_names.
Generates queries that look like this:
SELECT events.*, fully_qualified_name
FROM event_store.thermostat_events "events"
INNER JOIN event_store.fully_qualified_names fqn ON fqn.id = fully_qualified_name_id
WHERE events.id IN (SELECT max(events.id) from event_store.thermostat_events "events"
INNER JOIN event_store.fully_qualified_names fqn ON fqn.id = fully_qualified_name_id
WHERE occurred_at < '2016-08-08 06:00:00'
AND fully_qualified_name = 'faceplate_api.system.core.events.HeatingStageStarted'
GROUP BY sub_key);
104 105 106 107 108 109 110 111 112 113 |
# File 'lib/event_store/event_stream.rb', line 104 def last_event_before(start_time, fully_qualified_names = []) = start_time.strftime("%Y-%m-%d %H:%M:%S%z") rows = fully_qualified_names.inject([]) { |memo, name| memo + events.where(Sequel.qualify("events", "id") => events.where(fully_qualified_name: name).where { occurred_at < } .select { max(events[:id]) }.unordered.group(:sub_key)).all }.sort_by { |r| r[:occurred_at] } rows.map {|r| r[:serialized_event] = EventStore.unescape_bytea(r[:serialized_event]); r} end |
#simple_last_event_before(start_time, fully_qualified_names = []) ⇒ Object
Private: returns the last event before start_time for each of the events named
by fully_qualified_names. Doesn't work when events have multiple valid
sub_keys, but is fast when they don't.
Generates queries that look like this:
SELECT events.*, fully_qualified_name
FROM event_store.thermostat_events "events"
INNER JOIN event_store.fully_qualified_names fqn ON fqn.id = fully_qualified_name_id
WHERE occurred_at < '2016-08-08 06:00:00'
AND fully_qualified_name = 'faceplate_api.system.core.events.HeatingStageStarted'
ORDER BY occurred_at DESC LIMIT 1;
128 129 130 131 132 133 134 135 136 137 |
# File 'lib/event_store/event_stream.rb', line 128 def simple_last_event_before(start_time, fully_qualified_names = []) = start_time.strftime("%Y-%m-%d %H:%M:%S%z") rows = fully_qualified_names.inject([]) { |memo, name| memo + events.where(fully_qualified_name: name).where{ occurred_at < } .reverse_order(:occurred_at, :id).limit(1).all }.sort_by { |r| r[:occurred_at] } rows.map {|r| r[:serialized_event] = EventStore.unescape_bytea(r[:serialized_event]); r} end |
#snapshot_events ⇒ Object
66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 |
# File 'lib/event_store/event_stream.rb', line 66 def snapshot_events last_checkpoint = nil if checkpoint_events checkpoints = last_event_before(Time.now.utc, checkpoint_events) last_checkpoint = checkpoints.first # start at the earliest possible place end if last_checkpoint events.where{ events[:id] >= last_checkpoint[:id].to_i } else events end end |