Class: Akasha::Storage::MemoryEventStore::Stream

Inherits:
Object
  • Object
show all
Defined in:
lib/akasha/storage/memory_event_store/stream.rb

Overview

Memory-based event stream.

Instance Method Summary collapse

Constructor Details

#initialize(&before_write) ⇒ Stream

Creates a new event stream. Accepts an optional block, allowing for filtering new events and triggering side-effects, before new events are appended to the stream,



9
10
11
12
13
14
# File 'lib/akasha/storage/memory_event_store/stream.rb', line 9

def initialize(&before_write)
  @before_write = before_write || identity
  @events = []
  @metadata = {}
  @monitor = Monitor.new
end

Instance Method Details

#metadataObject



37
38
39
40
41
# File 'lib/akasha/storage/memory_event_store/stream.rb', line 37

def 
  @monitor.synchronize do
    @metadata
  end
end

#metadata=(metadata) ⇒ Object



43
44
45
46
47
# File 'lib/akasha/storage/memory_event_store/stream.rb', line 43

def metadata=()
  @monitor.synchronize do
    @metadata = 
  end
end

#read_events(start, page_size, **_options, &block) ⇒ Object

Reads events from the stream starting from ‘start` inclusive. If block given, reads all events from the start in pages of `page_size`. If block not given, reads `page_size` events from the start.



27
28
29
30
31
32
33
34
35
# File 'lib/akasha/storage/memory_event_store/stream.rb', line 27

def read_events(start, page_size, **_options, &block)
  @monitor.synchronize do
    if block_given?
      @events.drop(start).each_slice(page_size, &block)
    else
      @events[start..start + page_size]
    end
  end
end

#write_events(events, revision: nil) ⇒ Object

Appends events to the stream.



17
18
19
20
21
22
# File 'lib/akasha/storage/memory_event_store/stream.rb', line 17

def write_events(events, revision: nil)
  @monitor.synchronize do
    check_revision!(revision)
    @events += to_recorded_events(@events.size, @before_write.call(events))
  end
end