Class: EventStore::EventStream
- Inherits:
-
Object
- Object
- EventStore::EventStream
- Includes:
- Enumerable
- Defined in:
- lib/event_store/event_stream.rb
Instance Attribute Summary collapse
-
#checkpoint_event ⇒ Object
readonly
Returns the value of attribute checkpoint_event.
-
#event_table ⇒ Object
readonly
Returns the value of attribute event_table.
Instance Method Summary collapse
- #append(raw_events, logger) {|prepared_events| ... } ⇒ Object
- #delete_events! ⇒ Object
- #each ⇒ Object
- #empty? ⇒ Boolean
- #event_stream_between(start_time, end_time, fully_qualified_names = []) ⇒ Object
- #events ⇒ Object
- #events_from(event_id, max = nil) ⇒ Object
- #fully_qualified_names ⇒ Object
-
#initialize(aggregate) ⇒ EventStream
constructor
A new instance of EventStream.
- #insert_table(occurred_at) ⇒ Object
- #insert_table_name(date) ⇒ Object
- #last ⇒ Object
- #last_event_before(start_time, fully_qualified_names = []) ⇒ Object
- #snapshot_events ⇒ Object
Constructor Details
#initialize(aggregate) ⇒ EventStream
Returns a new instance of EventStream.
7 8 9 10 11 12 13 14 15 |
# File 'lib/event_store/event_stream.rb', line 7 def initialize aggregate @aggregate = aggregate @id = @aggregate.id @checkpoint_event = aggregate.checkpoint_event @event_table_alias = "events" @event_table = "#{EventStore.schema}__#{EventStore.table_name}".to_sym @aliased_event_table = "#{event_table}___#{@event_table_alias}".to_sym @names_table = EventStore.fully_qualified_names_table end |
Instance Attribute Details
#checkpoint_event ⇒ Object (readonly)
Returns the value of attribute checkpoint_event.
5 6 7 |
# File 'lib/event_store/event_stream.rb', line 5 def checkpoint_event @checkpoint_event end |
#event_table ⇒ Object (readonly)
Returns the value of attribute event_table.
5 6 7 |
# File 'lib/event_store/event_stream.rb', line 5 def event_table @event_table end |
Instance Method Details
#append(raw_events, logger) {|prepared_events| ... } ⇒ Object
17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 |
# File 'lib/event_store/event_stream.rb', line 17 def append(raw_events, logger) prepared_events = raw_events.map do |raw_event| event = prepare_event(raw_event) ensure_all_attributes_have_values!(event) event end prepared_events.each do |event| event_hash = event.dup.reject! { |k,v| k == :fully_qualified_name } event_table = insert_table(Time.now) begin id = event_table.insert(event_hash) rescue Sequel::NotNullConstraintViolation fully_qualified_names.insert(fully_qualified_name: event[:fully_qualified_name]) id = event_table.insert(event_hash) end logger.debug("EventStream#append, setting id #{id} for #{event_hash.inspect}") event[:id] = id end yield(prepared_events) if block_given? end |
#delete_events! ⇒ Object
116 117 118 |
# File 'lib/event_store/event_stream.rb', line 116 def delete_events! EventStore.db.from(@event_table).where(:aggregate_id => @id.to_s).delete end |
#each ⇒ Object
109 110 111 112 113 114 |
# File 'lib/event_store/event_stream.rb', line 109 def each events.all.each do |e| e[:serialized_event] = EventStore.unescape_bytea(e[:serialized_event]) yield e end end |
#empty? ⇒ Boolean
105 106 107 |
# File 'lib/event_store/event_stream.rb', line 105 def empty? events.empty? end |
#event_stream_between(start_time, end_time, fully_qualified_names = []) ⇒ Object
95 96 97 98 99 |
# File 'lib/event_store/event_stream.rb', line 95 def event_stream_between(start_time, end_time, fully_qualified_names = []) query = events.where(occurred_at: start_time..end_time) query = query.where(fully_qualified_name: fully_qualified_names) if fully_qualified_names && fully_qualified_names.any? query.all.map {|e| e[:serialized_event] = EventStore.unescape_bytea(e[:serialized_event]); e} end |
#events ⇒ Object
55 56 57 58 59 60 61 62 63 64 |
# File 'lib/event_store/event_stream.rb', line 55 def events @events_query ||= begin query = EventStore.db.from(@aliased_event_table).where(:aggregate_id => @id.to_s) query = query.join(@names_table, id: :fully_qualified_name_id) if EventStore.use_names_table? query = query.order("#{@event_table_alias}__id".to_sym).select_all(:events) query = query.select_append(:fully_qualified_name) if EventStore.use_names_table? query end end |
#events_from(event_id, max = nil) ⇒ Object
76 77 78 79 80 81 82 |
# File 'lib/event_store/event_stream.rb', line 76 def events_from(event_id, max = nil) # note: this depends on the events table being aliased to "events" above. events.limit(max).where{events__id >= event_id.to_i }.all.map do |event| event[:serialized_event] = EventStore.unescape_bytea(event[:serialized_event]) event end end |
#fully_qualified_names ⇒ Object
51 52 53 |
# File 'lib/event_store/event_stream.rb', line 51 def fully_qualified_names @fully_qualified_name_query ||= EventStore.db.from(@names_table) end |
#insert_table(occurred_at) ⇒ Object
43 44 45 |
# File 'lib/event_store/event_stream.rb', line 43 def insert_table(occurred_at) EventStore.db.from(insert_table_name(occurred_at)) end |
#insert_table_name(date) ⇒ Object
47 48 49 |
# File 'lib/event_store/event_stream.rb', line 47 def insert_table_name(date) EventStore.insert_table_name(date) end |
#last ⇒ Object
101 102 103 |
# File 'lib/event_store/event_stream.rb', line 101 def last to_a.last end |
#last_event_before(start_time, fully_qualified_names = []) ⇒ Object
84 85 86 87 88 89 90 91 92 93 |
# File 'lib/event_store/event_stream.rb', line 84 def last_event_before(start_time, fully_qualified_names = []) = start_time.strftime("%Y-%m-%d %H:%M:%S%z") rows = fully_qualified_names.inject([]) { |memo, name| memo + events.where(fully_qualified_name: name).where{ occurred_at < } .reverse_order(:occurred_at, :id).limit(1).all }.sort_by { |r| r[:occurred_at] } rows.map {|r| r[:serialized_event] = EventStore.unescape_bytea(r[:serialized_event]); r} end |
#snapshot_events ⇒ Object
66 67 68 69 70 71 72 73 74 |
# File 'lib/event_store/event_stream.rb', line 66 def snapshot_events last_checkpoint = last_event_before(Time.now.utc, [checkpoint_event]).first if checkpoint_event if last_checkpoint events.where{ events__id >= last_checkpoint[:id].to_i } else events end end |