Class: Eventus::Stream
- Inherits:
-
Object
- Object
- Eventus::Stream
- Defined in:
- lib/eventus/stream.rb
Instance Attribute Summary collapse
-
#committed_events ⇒ Object
readonly
Returns the value of attribute committed_events.
-
#id ⇒ Object
readonly
Returns the value of attribute id.
-
#uncommitted_events ⇒ Object
readonly
Returns the value of attribute uncommitted_events.
Instance Method Summary collapse
- #add(name, body = {}) ⇒ Object
- #commit ⇒ Object
-
#initialize(id, persistence, dispatcher) ⇒ Stream
constructor
A new instance of Stream.
- #version ⇒ Object
Constructor Details
#initialize(id, persistence, dispatcher) ⇒ Stream
Returns a new instance of Stream.
8 9 10 11 12 13 14 15 |
# File 'lib/eventus/stream.rb', line 8 def initialize(id, persistence, dispatcher) @id = id @persistence = persistence @committed_events = [] @uncommitted_events = [] @dispatcher = dispatcher load_events @persistence.load(id) end |
Instance Attribute Details
#committed_events ⇒ Object (readonly)
Returns the value of attribute committed_events.
6 7 8 |
# File 'lib/eventus/stream.rb', line 6 def committed_events @committed_events end |
#id ⇒ Object (readonly)
Returns the value of attribute id.
6 7 8 |
# File 'lib/eventus/stream.rb', line 6 def id @id end |
#uncommitted_events ⇒ Object (readonly)
Returns the value of attribute uncommitted_events.
6 7 8 |
# File 'lib/eventus/stream.rb', line 6 def uncommitted_events @uncommitted_events end |
Instance Method Details
#add(name, body = {}) ⇒ Object
17 18 19 |
# File 'lib/eventus/stream.rb', line 17 def add(name, body={}) @uncommitted_events << {'name' => name, 'body' => body} end |
#commit ⇒ Object
21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 |
# File 'lib/eventus/stream.rb', line 21 def commit time = Time.now.utc.iso8601 @uncommitted_events.each.with_index(version) do |e, i| e['time'] = time e['sid'] = @id e['sequence'] = i end Eventus::logger.debug "Committing #{@uncommitted_events.length} events to #{@id}" return if @uncommitted_events.empty? payload = @persistence.commit @uncommitted_events load_events @uncommitted_events @dispatcher.dispatch(payload) if @dispatcher @uncommitted_events.clear rescue ConcurrencyError => e Eventus.logger.info "ConcurrencyError, loading new events for: #{id}" load_events @persistence.load(id, version) raise e end |
#version ⇒ Object
40 41 42 |
# File 'lib/eventus/stream.rb', line 40 def version @committed_events.length end |