Class: EventStore::Snapshot
- Inherits:
-
Object
- Object
- EventStore::Snapshot
- Includes:
- Enumerable
- Defined in:
- lib/event_store/snapshot.rb
Instance Attribute Summary collapse
-
#snapshot_event_id_table ⇒ Object
readonly
Returns the value of attribute snapshot_event_id_table.
-
#snapshot_table ⇒ Object
readonly
Returns the value of attribute snapshot_table.
Instance Method Summary collapse
- #count(logger = default_logger) ⇒ Object
- #delete_snapshot! ⇒ Object
- #each(logger = default_logger) ⇒ Object
- #event_id(snapshot_key = :current_event_id) ⇒ Object
- #event_id_for(fully_qualified_name, sub_key = nil) ⇒ Object
- #exists? ⇒ Boolean
-
#initialize(aggregate) ⇒ Snapshot
constructor
A new instance of Snapshot.
- #last_event ⇒ Object
- #rebuild_snapshot!(logger = default_logger) ⇒ Object
- #store_snapshot(prepared_events, logger = default_logger) ⇒ Object
- #update_fqns!(logger = default_logger) ⇒ Object
Constructor Details
#initialize(aggregate) ⇒ Snapshot
Returns a new instance of Snapshot.
9 10 11 12 13 14 |
# File 'lib/event_store/snapshot.rb', line 9 def initialize aggregate @aggregate = aggregate @redis = EventStore.redis @snapshot_table = "#{@aggregate.type}_snapshots_for_#{@aggregate.id}" @snapshot_event_id_table = "#{@aggregate.type}_snapshot_event_ids_for_#{@aggregate.id}" end |
Instance Attribute Details
#snapshot_event_id_table ⇒ Object (readonly)
Returns the value of attribute snapshot_event_id_table.
7 8 9 |
# File 'lib/event_store/snapshot.rb', line 7 def snapshot_event_id_table @snapshot_event_id_table end |
#snapshot_table ⇒ Object (readonly)
Returns the value of attribute snapshot_table.
7 8 9 |
# File 'lib/event_store/snapshot.rb', line 7 def snapshot_table @snapshot_table end |
Instance Method Details
#count(logger = default_logger) ⇒ Object
32 33 34 |
# File 'lib/event_store/snapshot.rb', line 32 def count(logger = default_logger) read_with_rebuild(logger).count end |
#delete_snapshot! ⇒ Object
72 73 74 |
# File 'lib/event_store/snapshot.rb', line 72 def delete_snapshot! EventStore.redis.del [snapshot_table, snapshot_event_id_table] end |
#each(logger = default_logger) ⇒ Object
36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 |
# File 'lib/event_store/snapshot.rb', line 36 def each(logger=default_logger) logger.info { "#{self.class.name}#each for #{@aggregate.id}" } t = Time.now events_hash = read_with_rebuild(logger) logger.debug { "#{self.class.name}#auto_rebuild_snapshot took #{Time.now - t} seconds for #{@aggregate.id}" } t = Time.now result_hash = events_hash.inject([]) do |snapshot, (key, value)| fully_qualified_name, _ = key.split(EventStore::SNAPSHOT_KEY_DELIMITER) raw_event = value.split(EventStore::SNAPSHOT_DELIMITER) event_id = raw_event.first.to_i serialized_event = EventStore.unescape_bytea(raw_event[1]) occurred_at = Time.parse(raw_event.last) snapshot + [SerializedEvent.new(fully_qualified_name, serialized_event, event_id, occurred_at)] end logger.debug { "#{self.class.name} serializing events took #{Time.now - t} seconds" } result_hash.sort_by(&:event_id).each { |e| yield e } end |
#event_id(snapshot_key = :current_event_id) ⇒ Object
24 25 26 |
# File 'lib/event_store/snapshot.rb', line 24 def event_id(snapshot_key =:current_event_id) (@redis.hget(snapshot_event_id_table, snapshot_key) || -1).to_i end |
#event_id_for(fully_qualified_name, sub_key = nil) ⇒ Object
28 29 30 |
# File 'lib/event_store/snapshot.rb', line 28 def event_id_for(fully_qualified_name, sub_key = nil) event_id(snapshot_key(fully_qualified_name: fully_qualified_name, sub_key: sub_key)) end |
#exists? ⇒ Boolean
16 17 18 |
# File 'lib/event_store/snapshot.rb', line 16 def exists? @redis.exists(snapshot_table) end |
#last_event ⇒ Object
20 21 22 |
# File 'lib/event_store/snapshot.rb', line 20 def last_event to_a.last end |
#rebuild_snapshot!(logger = default_logger) ⇒ Object
55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 |
# File 'lib/event_store/snapshot.rb', line 55 def rebuild_snapshot!(logger=default_logger) logger.info { "#{self.class.name}#rebuild_snapshot!" } t = Time.now delete_snapshot! logger.debug { "Deleting snapshot took #{Time.now - t} seconds" } t = Time.now all_events = @aggregate.snapshot_events.all logger.debug { "getting #{all_events.count} events" } logger.debug { "getting all events took #{Time.now - t} seconds" } t = Time.now corrected_events = all_events.map{|e| e[:occurred_at] = TimeHacker.translate_occurred_at_from_local_to_gmt(e[:occurred_at]); e} logger.debug { "correcting occurred_at on all events took #{Time.now - t} seconds" } t = Time.now store_snapshot(corrected_events) logger.debug { "storing new snapshot took #{Time.now - t} seconds" } end |
#store_snapshot(prepared_events, logger = default_logger) ⇒ Object
76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 |
# File 'lib/event_store/snapshot.rb', line 76 def store_snapshot(prepared_events, logger=default_logger) valid_snapshot_events = [] valid_snapshot_event_ids = [] prepared_events.each do |event_hash| key = snapshot_key(event_hash) current_id = current_event_id_numbers[key].to_i logger.debug("Snapshot#store_snapshot: snapshot_key: #{key} prepared id: #{event_hash[:id]}, current id: #{current_id}") if event_hash[:id].to_i > current_id logger.debug("prepared event is newer, storing") valid_snapshot_events += snapshot_event(event_hash) valid_snapshot_event_ids += snapshot_event_id(event_hash) end end logger.debug("valid_snapshot_event_ids: #{valid_snapshot_event_ids.inspect}") if valid_snapshot_event_ids.any? logger.debug("there are valid_snapshot_event_ids, persisting to redis") valid_snapshot_event_ids += [:current_event_id, valid_snapshot_event_ids.last.to_i] update_snapshot_tables(valid_snapshot_event_ids, valid_snapshot_events) end end |
#update_fqns!(logger = default_logger) ⇒ Object
101 102 103 104 105 106 107 108 |
# File 'lib/event_store/snapshot.rb', line 101 def update_fqns!(logger = default_logger) updated_events = replace_fqns_in_snapshot_hash(read_with_rebuild(logger)) { |fqn| yield fqn } updated_event_ids = replace_fqns_in_snapshot_hash(current_event_id_numbers) { |fqn| fqn == 'current_event_id' ? fqn : (yield fqn) } update_snapshot_tables(updated_event_ids, updated_events, replace: true) end |