Class: EventStore::EventStream

Inherits:
Object
  • Object
show all
Includes:
Enumerable
Defined in:
lib/event_store/event_stream.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(aggregate) ⇒ EventStream

Returns a new instance of EventStream.



7
8
9
10
11
12
13
14
15
# File 'lib/event_store/event_stream.rb', line 7

def initialize aggregate
  @aggregate = aggregate
  @id = @aggregate.id
  @checkpoint_event = aggregate.checkpoint_event
  @event_table_alias = "events"
  @event_table = "#{EventStore.schema}__#{EventStore.table_name}".to_sym
  @aliased_event_table = "#{event_table}___#{@event_table_alias}".to_sym
  @names_table = EventStore.fully_qualified_names_table
end

Instance Attribute Details

#checkpoint_eventObject (readonly)

Returns the value of attribute checkpoint_event.



5
6
7
# File 'lib/event_store/event_stream.rb', line 5

def checkpoint_event
  @checkpoint_event
end

#event_tableObject (readonly)

Returns the value of attribute event_table.



5
6
7
# File 'lib/event_store/event_stream.rb', line 5

def event_table
  @event_table
end

Instance Method Details

#append(raw_events, logger) {|prepared_events| ... } ⇒ Object

Yields:

  • (prepared_events)


17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
# File 'lib/event_store/event_stream.rb', line 17

def append(raw_events, logger)
  prepared_events = raw_events.map do |raw_event|
    event = prepare_event(raw_event)
    ensure_all_attributes_have_values!(event)
    event
  end

  prepared_events.each do |event|
    event_hash = event.dup.reject! { |k,v| k == :fully_qualified_name }
    event_table = insert_table(Time.now)

    begin
      id = event_table.insert(event_hash)
    rescue Sequel::NotNullConstraintViolation
      fully_qualified_names.insert(fully_qualified_name: event[:fully_qualified_name])
      id = event_table.insert(event_hash)
    end

    logger.debug("EventStream#append, setting id #{id} for #{event_hash.inspect}")

    event[:id] = id
  end

  yield(prepared_events) if block_given?
end

#delete_events!Object



116
117
118
# File 'lib/event_store/event_stream.rb', line 116

def delete_events!
  EventStore.db.from(@event_table).where(:aggregate_id => @id.to_s).delete
end

#eachObject



109
110
111
112
113
114
# File 'lib/event_store/event_stream.rb', line 109

def each
  events.all.each do |e|
    e[:serialized_event] = EventStore.unescape_bytea(e[:serialized_event])
    yield e
  end
end

#empty?Boolean

Returns:

  • (Boolean)


105
106
107
# File 'lib/event_store/event_stream.rb', line 105

def empty?
  events.empty?
end

#event_stream_between(start_time, end_time, fully_qualified_names = []) ⇒ Object



95
96
97
98
99
# File 'lib/event_store/event_stream.rb', line 95

def event_stream_between(start_time, end_time, fully_qualified_names = [])
  query = events.where(occurred_at: start_time..end_time)
  query = query.where(fully_qualified_name: fully_qualified_names) if fully_qualified_names && fully_qualified_names.any?
  query.all.map {|e| e[:serialized_event] = EventStore.unescape_bytea(e[:serialized_event]); e}
end

#eventsObject



55
56
57
58
59
60
61
62
63
64
# File 'lib/event_store/event_stream.rb', line 55

def events
  @events_query ||=
    begin
      query = EventStore.db.from(@aliased_event_table).where(:aggregate_id => @id.to_s)
      query = query.join(@names_table, id: :fully_qualified_name_id) if EventStore.use_names_table?
      query = query.order("#{@event_table_alias}__id".to_sym).select_all(:events)
      query = query.select_append(:fully_qualified_name) if EventStore.use_names_table?
      query
    end
end

#events_from(event_id, max = nil) ⇒ Object



76
77
78
79
80
81
82
# File 'lib/event_store/event_stream.rb', line 76

def events_from(event_id, max = nil)
  # note: this depends on the events table being aliased to "events" above.
  events.limit(max).where{events__id >= event_id.to_i }.all.map do |event|
    event[:serialized_event] = EventStore.unescape_bytea(event[:serialized_event])
    event
  end
end

#fully_qualified_namesObject



51
52
53
# File 'lib/event_store/event_stream.rb', line 51

def fully_qualified_names
  @fully_qualified_name_query ||= EventStore.db.from(@names_table)
end

#insert_table(occurred_at) ⇒ Object



43
44
45
# File 'lib/event_store/event_stream.rb', line 43

def insert_table(occurred_at)
  EventStore.db.from(insert_table_name(occurred_at))
end

#insert_table_name(date) ⇒ Object



47
48
49
# File 'lib/event_store/event_stream.rb', line 47

def insert_table_name(date)
  EventStore.insert_table_name(date)
end

#lastObject



101
102
103
# File 'lib/event_store/event_stream.rb', line 101

def last
  to_a.last
end

#last_event_before(start_time, fully_qualified_names = []) ⇒ Object



84
85
86
87
88
89
90
91
92
93
# File 'lib/event_store/event_stream.rb', line 84

def last_event_before(start_time, fully_qualified_names = [])
  timestampz = start_time.strftime("%Y-%m-%d %H:%M:%S%z")

  rows = fully_qualified_names.inject([]) { |memo, name|
    memo + events.where(fully_qualified_name: name).where{ occurred_at < timestampz }
             .reverse_order(:occurred_at, :id).limit(1).all
  }.sort_by { |r| r[:occurred_at] }

  rows.map {|r| r[:serialized_event] = EventStore.unescape_bytea(r[:serialized_event]); r}
end

#snapshot_eventsObject



66
67
68
69
70
71
72
73
74
# File 'lib/event_store/event_stream.rb', line 66

def snapshot_events
  last_checkpoint = last_event_before(Time.now.utc, [checkpoint_event]).first if checkpoint_event

  if last_checkpoint
    events.where{ events__id >= last_checkpoint[:id].to_i }
  else
    events
  end
end