Class: EventStore::EventStream

Inherits:
Object
  • Object
show all
Includes:
Enumerable
Defined in:
lib/event_store/event_stream.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(aggregate) ⇒ EventStream

Returns a new instance of EventStream.



7
8
9
10
11
12
13
14
15
# File 'lib/event_store/event_stream.rb', line 7

def initialize aggregate
  @aggregate = aggregate
  @id = @aggregate.id
  @checkpoint_events = aggregate.checkpoint_events
  @event_table_alias = "events"
  @event_table = Sequel.qualify(EventStore.schema, EventStore.table_name)
  @aliased_event_table = event_table.as(@event_table_alias)
  @names_table = EventStore.fully_qualified_names_table
end

Instance Attribute Details

#checkpoint_eventsObject (readonly)

Returns the value of attribute checkpoint_events.



5
6
7
# File 'lib/event_store/event_stream.rb', line 5

def checkpoint_events
  @checkpoint_events
end

#event_tableObject (readonly)

Returns the value of attribute event_table.



5
6
7
# File 'lib/event_store/event_stream.rb', line 5

def event_table
  @event_table
end

Instance Method Details

#append(raw_events, logger) {|prepared_events| ... } ⇒ Object

Yields:

  • (prepared_events)


17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
# File 'lib/event_store/event_stream.rb', line 17

def append(raw_events, logger)
  prepared_events = raw_events.map do |raw_event|
    event = prepare_event(raw_event)
    ensure_all_attributes_have_values!(event)
    event
  end

  prepared_events.each do |event|
    event_hash = event.dup.reject! { |k,v| k == :fully_qualified_name }
    event_table = insert_table(Time.now)

    begin
      id = event_table.insert(event_hash)
    rescue Sequel::NotNullConstraintViolation
      fully_qualified_names.insert(fully_qualified_name: event[:fully_qualified_name])
      id = event_table.insert(event_hash)
    end

    logger.debug("EventStream#append, setting id #{id} for #{event_hash.inspect}")

    event[:id] = id
  end

  yield(prepared_events) if block_given?
end

#delete_events!Object



160
161
162
# File 'lib/event_store/event_stream.rb', line 160

def delete_events!
  EventStore.db.from(@event_table).where(:aggregate_id => @id.to_s).delete
end

#eachObject



153
154
155
156
157
158
# File 'lib/event_store/event_stream.rb', line 153

def each
  events.all.each do |e|
    e[:serialized_event] = EventStore.unescape_bytea(e[:serialized_event])
    yield e
  end
end

#empty?Boolean

Returns:

  • (Boolean)


149
150
151
# File 'lib/event_store/event_stream.rb', line 149

def empty?
  events.empty?
end

#event_stream_between(start_time, end_time, fully_qualified_names = []) ⇒ Object



139
140
141
142
143
# File 'lib/event_store/event_stream.rb', line 139

def event_stream_between(start_time, end_time, fully_qualified_names = [])
  query = events.where(occurred_at: start_time..end_time)
  query = query.where(fully_qualified_name: fully_qualified_names) if fully_qualified_names && fully_qualified_names.any?
  query.all.map {|e| e[:serialized_event] = EventStore.unescape_bytea(e[:serialized_event]); e}
end

#eventsObject



55
56
57
58
59
60
61
62
63
64
# File 'lib/event_store/event_stream.rb', line 55

def events
  @events_query ||=
    begin
      query = EventStore.db.from(@aliased_event_table).where(:aggregate_id => @id.to_s)
      query = query.join(@names_table, id: :fully_qualified_name_id) if EventStore.use_names_table?
      query = query.order { events[:id] }.select_all(:events)
      query = query.select_append(:fully_qualified_name) if EventStore.use_names_table?
      query
    end
end

#events_from(event_id, max = nil) ⇒ Object



82
83
84
85
86
87
88
# File 'lib/event_store/event_stream.rb', line 82

def events_from(event_id, max = nil)
  # note: this depends on the events table being aliased to "events" above.
  events.limit(max).where{events[:id] >= event_id.to_i }.all.map do |event|
    event[:serialized_event] = EventStore.unescape_bytea(event[:serialized_event])
    event
  end
end

#fully_qualified_namesObject



51
52
53
# File 'lib/event_store/event_stream.rb', line 51

def fully_qualified_names
  @fully_qualified_name_query ||= EventStore.db.from(@names_table)
end

#insert_table(occurred_at) ⇒ Object



43
44
45
# File 'lib/event_store/event_stream.rb', line 43

def insert_table(occurred_at)
  EventStore.db.from(insert_table_name(occurred_at))
end

#insert_table_name(date) ⇒ Object



47
48
49
# File 'lib/event_store/event_stream.rb', line 47

def insert_table_name(date)
  EventStore.insert_table_name(date)
end

#lastObject



145
146
147
# File 'lib/event_store/event_stream.rb', line 145

def last
  to_a.last
end

#last_event_before(start_time, fully_qualified_names = []) ⇒ Object

Private: returns the last event before start_time for each of the events named

by fully_qualified_names.

Generates queries that look like this:

SELECT events.*, fully_qualified_name
  FROM event_store.thermostat_events "events"

INNER JOIN event_store.fully_qualified_names fqn ON fqn.id = fully_qualified_name_id

WHERE events.id IN (SELECT max(events.id) from event_store.thermostat_events "events"
                INNER JOIN event_store.fully_qualified_names fqn ON fqn.id = fully_qualified_name_id
                     WHERE occurred_at < '2016-08-08 06:00:00'
                       AND fully_qualified_name = 'faceplate_api.system.core.events.HeatingStageStarted'
                  GROUP BY sub_key);


104
105
106
107
108
109
110
111
112
113
# File 'lib/event_store/event_stream.rb', line 104

def last_event_before(start_time, fully_qualified_names = [])
  timestampz = start_time.strftime("%Y-%m-%d %H:%M:%S%z")

  rows = fully_qualified_names.inject([]) { |memo, name|
    memo + events.where(Sequel.qualify("events", "id") => events.where(fully_qualified_name: name).where { occurred_at < timestampz }
             .select { max(events[:id]) }.unordered.group(:sub_key)).all
  }.sort_by { |r| r[:occurred_at] }

  rows.map {|r| r[:serialized_event] = EventStore.unescape_bytea(r[:serialized_event]); r}
end

#simple_last_event_before(start_time, fully_qualified_names = []) ⇒ Object

Private: returns the last event before start_time for each of the events named

by fully_qualified_names. Doesn't work when events have multiple valid
sub_keys, but is fast when they don't.

Generates queries that look like this:

SELECT events.*, fully_qualified_name
  FROM event_store.thermostat_events "events"

INNER JOIN event_store.fully_qualified_names fqn ON fqn.id = fully_qualified_name_id

   WHERE occurred_at < '2016-08-08 06:00:00'
     AND fully_qualified_name = 'faceplate_api.system.core.events.HeatingStageStarted'
ORDER BY occurred_at DESC LIMIT 1;


128
129
130
131
132
133
134
135
136
137
# File 'lib/event_store/event_stream.rb', line 128

def simple_last_event_before(start_time, fully_qualified_names = [])
  timestampz = start_time.strftime("%Y-%m-%d %H:%M:%S%z")

  rows = fully_qualified_names.inject([]) { |memo, name|
    memo + events.where(fully_qualified_name: name).where{ occurred_at < timestampz }
             .reverse_order(:occurred_at, :id).limit(1).all
  }.sort_by { |r| r[:occurred_at] }

  rows.map {|r| r[:serialized_event] = EventStore.unescape_bytea(r[:serialized_event]); r}
end

#snapshot_eventsObject



66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
# File 'lib/event_store/event_stream.rb', line 66

def snapshot_events
  last_checkpoint = nil

  if checkpoint_events
    checkpoints = last_event_before(Time.now.utc, checkpoint_events)

    last_checkpoint = checkpoints.first # start at the earliest possible place
  end

  if last_checkpoint
    events.where{ events[:id] >= last_checkpoint[:id].to_i }
  else
    events
  end
end