Class: EventStore::Snapshot

Inherits:
Object
  • Object
show all
Includes:
Enumerable
Defined in:
lib/event_store/snapshot.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(aggregate) ⇒ Snapshot

Returns a new instance of Snapshot.



9
10
11
12
13
14
# File 'lib/event_store/snapshot.rb', line 9

# Builds a snapshot handle for one aggregate.
#
# Keeps a reference to the aggregate, asks EventStore.redis for the client
# associated with the aggregate's id, and derives the two key names this
# object works with from the aggregate's type and id.
#
# @param aggregate [#id, #type] the aggregate whose snapshot is managed
def initialize(aggregate)
  @aggregate = aggregate
  @redis = EventStore.redis(aggregate.id)

  # Both key names share the "<type>_<kind>_for_<id>" layout.
  @snapshot_table, @snapshot_event_id_table = %w[snapshots snapshot_event_ids].map do |kind|
    "#{@aggregate.type}_#{kind}_for_#{@aggregate.id}"
  end
end

Instance Attribute Details

#snapshot_event_id_table ⇒ Object (readonly)

Returns the value of attribute snapshot_event_id_table.



7
8
9
# File 'lib/event_store/snapshot.rb', line 7

# Read-only accessor for the key name that holds per-event ids.
#
# @return [String] the value assigned in #initialize, of the form
#   "<type>_snapshot_event_ids_for_<id>"
def snapshot_event_id_table
  @snapshot_event_id_table
end

#snapshot_table ⇒ Object (readonly)

Returns the value of attribute snapshot_table.



7
8
9
# File 'lib/event_store/snapshot.rb', line 7

# Read-only accessor for the key name that holds the snapshot events.
#
# @return [String] the value assigned in #initialize, of the form
#   "<type>_snapshots_for_<id>"
def snapshot_table
  @snapshot_table
end

Instance Method Details

#count(logger = default_logger) ⇒ Object



32
33
34
# File 'lib/event_store/snapshot.rb', line 32

# Counts the entries currently held in the snapshot.
#
# Note that the only argument is a logger, so this does not accept the
# item/block forms of Enumerable#count.
#
# @param logger [Object] handed straight to read_with_rebuild
# @return [Integer] result of calling #count on what read_with_rebuild returns
def count(logger = default_logger)
  snapshot_entries = read_with_rebuild(logger)
  snapshot_entries.count
end

#delete_snapshot! ⇒ Object



67
68
69
# File 'lib/event_store/snapshot.rb', line 67

# Removes both snapshot keys (events and event ids) with a single DEL call.
#
# @return [Object] whatever the client's #del returns
def delete_snapshot!
  keys_to_remove = [snapshot_table, snapshot_event_id_table]
  @redis.del(keys_to_remove)
end

#each(logger = default_logger) ⇒ Object



36
37
38
39
40
41
42
43
44
45
46
47
48
# File 'lib/event_store/snapshot.rb', line 36

# Yields every event of the snapshot in ascending event_id order.
#
# The raw snapshot is obtained through read_with_rebuild, each key/value
# pair is turned into an event via serialized_event_from_snapshot_event,
# and the resulting list is sorted by the events' event_id before yielding.
# Timing of both phases is reported on the logger at debug level.
#
# @param logger [#info, #debug] receives block-form log messages
# @yieldparam event [#event_id] one serialized event
# @return [Array] the sorted events when a block is given
# @return [Enumerator] when called without a block
def each(logger = default_logger)
  # Follow the Enumerable convention instead of failing with LocalJumpError
  # after all the work has already been done.
  return enum_for(:each, logger) unless block_given?

  logger.info { "#{self.class.name}#each for #{@aggregate.id}" }
  t = Time.now
  events_hash = read_with_rebuild(logger)
  logger.debug { "#{self.class.name}#auto_rebuild_snapshot took #{Time.now - t} seconds for #{@aggregate.id}" }

  t = Time.now
  # map builds the list in one pass; the previous inject/+ form copied the
  # accumulator on every step.
  events = events_hash.map do |key, value|
    serialized_event_from_snapshot_event(key, value)
  end
  logger.debug { "#{self.class.name} serializing events took #{Time.now - t} seconds" }
  events.sort_by(&:event_id).each { |e| yield e }
end

#event_id(snapshot_key = :current_event_id) ⇒ Object



24
25
26
# File 'lib/event_store/snapshot.rb', line 24

# Looks up the event id stored under a key of the event-id hash.
#
# @param snapshot_key [Symbol, String] hash field to read; defaults to
#   :current_event_id
# @return [Integer] the stored value converted with #to_i, or -1 when the
#   lookup yields nothing
def event_id(snapshot_key = :current_event_id)
  stored_id = @redis.hget(snapshot_event_id_table, snapshot_key)
  stored_id ? stored_id.to_i : -1
end

#event_id_for(fully_qualified_name, sub_key = nil) ⇒ Object



28
29
30
# File 'lib/event_store/snapshot.rb', line 28

# Resolves the stored event id for a fully qualified name and optional sub key.
#
# The two values are turned into a snapshot key via snapshot_key and the
# lookup itself is delegated to #event_id.
#
# @param fully_qualified_name [Object] forwarded as :fully_qualified_name
# @param sub_key [Object, nil] forwarded as :sub_key
# @return [Object] whatever #event_id returns for the derived key
def event_id_for(fully_qualified_name, sub_key = nil)
  lookup_key = snapshot_key(fully_qualified_name: fully_qualified_name, sub_key: sub_key)
  event_id(lookup_key)
end

#exists? ⇒ Boolean

Returns:

  • (Boolean)


16
17
18
# File 'lib/event_store/snapshot.rb', line 16

# Tells whether the snapshot key is present.
#
# Depending on the client version, #exists may answer with true/false or
# with an Integer count of matching keys. Because 0 is truthy in Ruby, an
# Integer reply is compared against zero so the predicate stays a real
# boolean in both cases.
#
# @return [Boolean] true when the snapshot key exists
def exists?
  reply = @redis.exists(snapshot_table)
  reply.is_a?(Integer) ? reply > 0 : !!reply
end

#last_event ⇒ Object



20
21
22
# File 'lib/event_store/snapshot.rb', line 20

# Returns the final element of #to_a.
#
# @return [Object, nil] the last element, or nil when #to_a is empty
def last_event
  ordered_events = to_a
  ordered_events.last
end

#rebuild_snapshot!(logger = default_logger) ⇒ Object



50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
# File 'lib/event_store/snapshot.rb', line 50

# Throws the current snapshot away and rebuilds it from the aggregate's
# snapshot events.
#
# Steps, each timed and reported at debug level:
# 1. delete the existing snapshot keys,
# 2. load all snapshot events of the aggregate,
# 3. rewrite each event's :occurred_at through
#    TimeHacker.translate_occurred_at_from_local_to_gmt (events are modified
#    in place),
# 4. store the corrected events as the new snapshot.
#
# @param logger [#info, #debug] receives block-form log messages; also
#   forwarded to store_snapshot
# @return [Object] result of the final logger.debug call
def rebuild_snapshot!(logger = default_logger)
  logger.info { "#{self.class.name}#rebuild_snapshot!" }
  t = Time.now
  delete_snapshot!
  logger.debug { "Deleting snapshot took #{Time.now - t} seconds" }

  t = Time.now
  all_events = @aggregate.snapshot_events.all
  logger.debug { "getting #{all_events.count} events" }
  logger.debug { "getting all events took #{Time.now - t} seconds" }

  t = Time.now
  corrected_events = all_events.map do |event|
    event[:occurred_at] = TimeHacker.translate_occurred_at_from_local_to_gmt(event[:occurred_at])
    event
  end
  logger.debug { "correcting occurred_at on all events took #{Time.now - t} seconds" }

  t = Time.now
  # Hand the caller's logger on; previously store_snapshot fell back to
  # default_logger and ignored the one given here.
  store_snapshot(corrected_events, logger)
  logger.debug { "storing new snapshot took #{Time.now - t} seconds" }
end

#reject!(logger = default_logger) ⇒ Object



109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
# File 'lib/event_store/snapshot.rb', line 109

# Drops every snapshot event for which the given block returns a truthy value.
#
# Each raw snapshot entry is converted with
# serialized_event_from_snapshot_event before being yielded. Entries the
# block rejects are removed from both the snapshot events and the event-id
# mapping; both collections are then flattened and handed to
# replace_snapshot_tables.
#
# @param logger [Object] handed to read_with_rebuild
# @yieldparam event [Object] the serialized event
# @yieldreturn [Boolean] truthy to drop the event
# @return [Object] whatever replace_snapshot_tables returns
def reject!(logger = default_logger)
  kept_events = read_with_rebuild(logger)
  kept_event_ids = current_event_id_numbers

  # Decide first, delete afterwards, so the hash is never changed while it
  # is being walked.
  rejected_keys = kept_events.each_with_object([]) do |(snapshot_key, snapshot_event), keys|
    serialized_event = serialized_event_from_snapshot_event(snapshot_key, snapshot_event)
    keys << snapshot_key if yield(serialized_event)
  end

  rejected_keys.each do |snapshot_key|
    kept_events.delete(snapshot_key)
    kept_event_ids.delete(snapshot_key)
  end

  replace_snapshot_tables(kept_event_ids.flatten, kept_events.flatten)
end

#store_snapshot(prepared_events, logger = default_logger) ⇒ Object



71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
# File 'lib/event_store/snapshot.rb', line 71

# Persists those prepared events that are newer than what the snapshot
# already holds for their key.
#
# For every event the snapshot key is derived with snapshot_key and the
# event's :id is compared (as Integer) with the id currently recorded for
# that key; a missing record counts as 0. Newer events contribute the
# results of snapshot_event and snapshot_event_id to two flat lists. When at
# least one event qualified, :current_event_id is appended using the last
# collected id and both lists are passed to update_snapshot_tables.
#
# @param prepared_events [Enumerable<Hash>] events with at least an :id entry
# @param logger [#debug] receives block-form debug messages
# @return [Object, nil] result of update_snapshot_tables, or nil when no
#   event was newer
def store_snapshot(prepared_events, logger = default_logger)
  valid_snapshot_events = []
  valid_snapshot_event_ids = []
  # Read once: nothing in this loop writes the ids back, so re-reading them
  # for every event only repeated the same lookup.
  current_ids = current_event_id_numbers

  prepared_events.each do |event_hash|
    key = snapshot_key(event_hash)
    current_id = current_ids[key].to_i

    logger.debug { "Snapshot#store_snapshot: snapshot_key: #{key} prepared id: #{event_hash[:id]}, current id: #{current_id}" }
    next unless event_hash[:id].to_i > current_id

    logger.debug { "prepared event is newer, storing" }
    # concat appends in place instead of allocating a new array per event.
    valid_snapshot_events.concat(snapshot_event(event_hash))
    valid_snapshot_event_ids.concat(snapshot_event_id(event_hash))
  end

  logger.debug { "valid_snapshot_event_ids: #{valid_snapshot_event_ids.inspect}" }
  return unless valid_snapshot_event_ids.any?

  logger.debug { "there are valid_snapshot_event_ids, persisting to redis" }
  valid_snapshot_event_ids.push(:current_event_id, valid_snapshot_event_ids.last.to_i)

  update_snapshot_tables(valid_snapshot_event_ids, valid_snapshot_events)
end

#update_fqns!(logger = default_logger) ⇒ Object



96
97
98
99
100
101
102
103
104
105
106
107
# File 'lib/event_store/snapshot.rb', line 96

# Rewrites the fully qualified names used as keys in the snapshot.
#
# The given block maps an old name to its replacement. It is applied, via
# replace_fqns_in_snapshot_hash, to the snapshot events and to the event-id
# mapping; in the latter the 'current_event_id' entry is passed through
# untouched. Both results are then handed to replace_snapshot_tables.
#
# @param logger [#debug] receives progress messages
# @yieldparam fqn [String] an existing name
# @yieldreturn [String] the name to use instead
# @return [Object] whatever replace_snapshot_tables returns
def update_fqns!(logger = default_logger)
  logger.debug { "Replacing FQNs in snapshot events" }
  snapshot_events = read_with_rebuild(logger)
  updated_events = replace_fqns_in_snapshot_hash(snapshot_events) { |fqn| yield fqn }

  logger.debug { "Replacing FQNs in snapshot event ids" }
  updated_event_ids = replace_fqns_in_snapshot_hash(current_event_id_numbers) do |fqn|
    # The bookkeeping entry is not a name and must keep its key.
    if fqn == 'current_event_id'
      fqn
    else
      yield fqn
    end
  end

  logger.debug "Updating snapshot tables in redis"
  replace_snapshot_tables(updated_event_ids, updated_events)
end