Class: Eventus::Persistence::InMemory
- Inherits: Object
  - Object
    - Eventus::Persistence::InMemory
- Defined in:
- lib/eventus/persistence/in_memory.rb
Instance Method Summary
- #build_key(id, index) ⇒ Object
- #commit(events) ⇒ Object
- #initialize(options = {}) ⇒ InMemory (constructor): a new instance of InMemory.
- #load(id, min = nil) ⇒ Object
- #load_undispatched ⇒ Object
- #mark_dispatched(stream_id, sequence) ⇒ Object
Constructor Details
#initialize(options = {}) ⇒ InMemory
Returns a new instance of InMemory.
5 6 7 8 9 |
# File 'lib/eventus/persistence/in_memory.rb', line 5
#
# Builds an empty in-memory event store.
#
# @param options [Hash] construction options
# @option options [#serialize, #deserialize] :serializer object used to encode
#   events before they are stored and to decode them when they are read back;
#   defaults to Eventus::Serializers::Marshal
def initialize(options = {})
  # Maps a storage key (see #build_key) to the serialized event.
  @store = {}
  # The block form of fetch only touches the default constant when no
  # :serializer was supplied.
  @serializer = options.fetch(:serializer) { Eventus::Serializers::Marshal }
  # Guards access to @store.
  @mutex = Mutex.new
end
Instance Method Details
#build_key(id, index) ⇒ Object
54 55 56 |
# File 'lib/eventus/persistence/in_memory.rb', line 54
#
# Builds the storage key for one event of a stream: the stream id, an
# underscore, and the sequence zero-padded to at least seven digits,
# e.g. build_key('abc', 5) # => "abc_0000005".
#
# Kernel#format is used instead of String#% because String#% treats an Array
# on its right-hand side as an argument list, which would silently accept a
# call such as build_key(id, [5, 6]).
#
# @param id [String] stream identifier
# @param index [Integer] sequence number of the event within the stream
# @return [String] the storage key
def build_key(id, index)
  id + format('_%07d', index)
end
#commit(events) ⇒ Object
11 12 13 14 15 16 17 18 19 20 21 22 |
# File 'lib/eventus/persistence/in_memory.rb', line 11
#
# Stores a batch of events as a unit: the batch is staged first and only
# merged into the store once every event has passed the conflict check, so a
# rejected batch leaves the store untouched.
#
# @param events [Enumerable<Hash>] events; each is read through its 'sid'
#   (stream id) and 'sequence' entries to build the storage key
# @return [Hash] the result of Hash#merge! on the internal store
# @raise [Eventus::ConcurrencyError] if an event's key is already stored, or
#   if the same key occurs twice within the batch
def commit(events)
  @mutex.synchronize do
    pending = events.each_with_object({}) do |event, batch|
      key = build_key(event['sid'], event['sequence'])
      # A duplicate inside the batch would otherwise overwrite its sibling
      # without any error, losing an event.
      raise Eventus::ConcurrencyError if @store.key?(key) || batch.key?(key)

      batch[key] = @serializer.serialize(event)
    end
    @store.merge!(pending)
  end
end
#load(id, min = nil) ⇒ Object
24 25 26 27 28 29 30 31 32 33 34 35 |
# File 'lib/eventus/persistence/in_memory.rb', line 24
#
# Loads the events of one stream in ascending sequence order.
#
# Keys are matched on the full "<id>_<digits>" shape rather than on a bare
# id prefix, so loading stream "a" does not pick up events of stream "ab".
# Sequences are compared numerically, so a missing +min+ sequence still
# yields every later event, and ordering stays correct past seven digits.
#
# @param id [String] stream identifier
# @param min [Integer, nil] lowest sequence to include; nil loads everything
# @return [Array] the deserialized events, oldest first
def load(id, min = nil)
  prefix = "#{id}_"
  floor = min && Integer(min)
  @mutex.synchronize do
    matches = @store.each_with_object([]) do |(key, raw), found|
      next unless key.start_with?(prefix)

      # Anything other than pure digits after the prefix belongs to a
      # different stream whose id merely begins with this one.
      suffix = key[prefix.length..-1]
      next unless suffix =~ /\A\d+\z/

      sequence = suffix.to_i
      found << [sequence, raw] unless floor && sequence < floor
    end
    matches.sort_by(&:first).map { |_, raw| @serializer.deserialize(raw) }
  end
end
#load_undispatched ⇒ Object
37 38 39 40 41 42 |
# File 'lib/eventus/persistence/in_memory.rb', line 37
#
# Returns every stored event, across all streams, that has not been flagged
# as dispatched. An event counts as dispatched when either its 'dispatched'
# (String) or :dispatched (Symbol) entry is truthy.
#
# @return [Array] the deserialized undispatched events, in the store's
#   iteration order
def load_undispatched
  @mutex.synchronize do
    # Only the values are needed, so iterate them directly instead of
    # yielding key/value pairs and ignoring the key.
    @store.each_value
          .map { |raw| @serializer.deserialize(raw) }
          .reject { |event| event['dispatched'] || event[:dispatched] }
  end
end
#mark_dispatched(stream_id, sequence) ⇒ Object
44 45 46 47 48 49 50 51 52 |
# File 'lib/eventus/persistence/in_memory.rb', line 44
#
# Flags one stored event as dispatched by setting its 'dispatched' entry to
# true and writing the re-serialized event back under the same key.
#
# The read-modify-write runs under @mutex, like every other access to the
# store in this class, so it cannot interleave with #commit or the loaders.
#
# @param stream_id [String] stream identifier of the event
# @param sequence [Integer] sequence number of the event within the stream
# @return [Object, nil] the re-serialized event, or nil when no event is
#   stored under that stream id and sequence
def mark_dispatched(stream_id, sequence)
  key = build_key(stream_id, sequence)
  @mutex.synchronize do
    stored = @store[key]
    # Unknown event: nothing to update, the method returns nil.
    next unless stored

    event = @serializer.deserialize(stored)
    event['dispatched'] = true
    @store[key] = @serializer.serialize(event)
  end
end