Class: EventStore::Snapshot

Inherits:
Object
  • Object
show all
Includes:
Enumerable
Defined in:
lib/event_store/snapshot.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(aggregate) ⇒ Snapshot

Returns a new instance of Snapshot.


9
10
11
12
13
14
# File 'lib/event_store/snapshot.rb', line 9

def initialize aggregate
  @aggregate = aggregate
  @redis = EventStore.redis
  @snapshot_table = "#{@aggregate.type}_snapshots_for_#{@aggregate.id}"
  @snapshot_event_id_table = "#{@aggregate.type}_snapshot_event_ids_for_#{@aggregate.id}"
end

Instance Attribute Details

#snapshot_event_id_tableObject (readonly)

Returns the value of attribute snapshot_event_id_table.


7
8
9
# File 'lib/event_store/snapshot.rb', line 7

def snapshot_event_id_table
  @snapshot_event_id_table
end

#snapshot_tableObject (readonly)

Returns the value of attribute snapshot_table.


7
8
9
# File 'lib/event_store/snapshot.rb', line 7

def snapshot_table
  @snapshot_table
end

Instance Method Details

#count(logger = default_logger) ⇒ Object


32
33
34
# File 'lib/event_store/snapshot.rb', line 32

def count(logger = default_logger)
  read_with_rebuild(logger).count
end

#delete_snapshot!Object


72
73
74
# File 'lib/event_store/snapshot.rb', line 72

def delete_snapshot!
  EventStore.redis.del [snapshot_table, snapshot_event_id_table]
end

#each(logger = default_logger) ⇒ Object


36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
# File 'lib/event_store/snapshot.rb', line 36

def each(logger=default_logger)
  logger.info { "#{self.class.name}#each for #{@aggregate.id}" }
  t = Time.now
  events_hash = read_with_rebuild(logger)
  logger.debug { "#{self.class.name}#auto_rebuild_snapshot took #{Time.now - t} seconds for #{@aggregate.id}" }

  t = Time.now
  result_hash = events_hash.inject([]) do |snapshot, (key, value)|
    fully_qualified_name, _ = key.split(EventStore::SNAPSHOT_KEY_DELIMITER)
    raw_event               = value.split(EventStore::SNAPSHOT_DELIMITER)
    event_id                = raw_event.first.to_i
    serialized_event        = EventStore.unescape_bytea(raw_event[1])
    occurred_at             = Time.parse(raw_event.last)
    snapshot + [SerializedEvent.new(fully_qualified_name, serialized_event, event_id, occurred_at)]
  end
  logger.debug { "#{self.class.name} serializing events took #{Time.now - t} seconds" }
  result_hash.sort_by(&:event_id).each { |e| yield e }
end

#event_id(snapshot_key = :current_event_id) ⇒ Object


24
25
26
# File 'lib/event_store/snapshot.rb', line 24

def event_id(snapshot_key =:current_event_id)
  (@redis.hget(snapshot_event_id_table, snapshot_key) || -1).to_i
end

#event_id_for(fully_qualified_name, sub_key = nil) ⇒ Object


28
29
30
# File 'lib/event_store/snapshot.rb', line 28

def event_id_for(fully_qualified_name, sub_key = nil)
  event_id(snapshot_key(fully_qualified_name: fully_qualified_name, sub_key: sub_key))
end

#exists?Boolean

Returns:

  • (Boolean)

16
17
18
# File 'lib/event_store/snapshot.rb', line 16

def exists?
  @redis.exists(snapshot_table)
end

#last_eventObject


20
21
22
# File 'lib/event_store/snapshot.rb', line 20

def last_event
  to_a.last
end

#rebuild_snapshot!(logger = default_logger) ⇒ Object


55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
# File 'lib/event_store/snapshot.rb', line 55

def rebuild_snapshot!(logger=default_logger)
  logger.info { "#{self.class.name}#rebuild_snapshot!" }
  t = Time.now
  delete_snapshot!
  logger.debug { "Deleting snapshot took #{Time.now - t} seconds" }
  t = Time.now
  all_events = @aggregate.snapshot_events.all
  logger.debug { "getting #{all_events.count} events" }
  logger.debug { "getting all events took #{Time.now - t} seconds" }
  t = Time.now
  corrected_events = all_events.map{|e| e[:occurred_at] = TimeHacker.translate_occurred_at_from_local_to_gmt(e[:occurred_at]); e}
  logger.debug { "correcting occurred_at on all events took #{Time.now - t} seconds" }
  t = Time.now
  store_snapshot(corrected_events)
  logger.debug { "storing new snapshot took #{Time.now - t} seconds" }
end

#store_snapshot(prepared_events, logger = default_logger) ⇒ Object


76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
# File 'lib/event_store/snapshot.rb', line 76

def store_snapshot(prepared_events, logger=default_logger)
  valid_snapshot_events = []
  valid_snapshot_event_ids = []

  prepared_events.each do |event_hash|
    key = snapshot_key(event_hash)
    current_id = current_event_id_numbers[key].to_i

    logger.debug("Snapshot#store_snapshot: snapshot_key: #{key} prepared id: #{event_hash[:id]}, current id: #{current_id}")
    if event_hash[:id].to_i > current_id
      logger.debug("prepared event is newer, storing")
      valid_snapshot_events    += snapshot_event(event_hash)
      valid_snapshot_event_ids += snapshot_event_id(event_hash)
    end
  end

  logger.debug("valid_snapshot_event_ids: #{valid_snapshot_event_ids.inspect}")
  if valid_snapshot_event_ids.any?
    logger.debug("there are valid_snapshot_event_ids, persisting to redis")
    valid_snapshot_event_ids += [:current_event_id, valid_snapshot_event_ids.last.to_i]

    update_snapshot_tables(valid_snapshot_event_ids, valid_snapshot_events)
  end
end

#update_fqns!(logger = default_logger) ⇒ Object


101
102
103
104
105
106
107
108
# File 'lib/event_store/snapshot.rb', line 101

def update_fqns!(logger = default_logger)
  updated_events    = replace_fqns_in_snapshot_hash(read_with_rebuild(logger)) { |fqn| yield fqn }
  updated_event_ids = replace_fqns_in_snapshot_hash(current_event_id_numbers)  { |fqn|
    fqn == 'current_event_id' ? fqn : (yield fqn)
  }

  update_snapshot_tables(updated_event_ids, updated_events, replace: true)
end