Class: AggregateStreams::Aggregation
- Inherits:
-
Object
- Object
- AggregateStreams::Aggregation
- Includes:
- Schema::DataStructure
- Defined in:
- lib/aggregate_streams/aggregation.rb
Defined Under Namespace
Modules: Transform
Instance Method Summary collapse
- #processed?(message) ⇒ Boolean
- #record_processed(message) ⇒ Object
- #sequence(category) ⇒ Object
- #set_sequence(category, sequence) ⇒ Object
Instance Method Details
#processed?(message) ⇒ Boolean
24 25 26 27 28 29 30 31 |
# File 'lib/aggregate_streams/aggregation.rb', line 24 def processed?() = Messaging::StreamName.get_category(.stream_name) sequence = sequence() return false if sequence.nil? sequence >= .global_position end |
#record_processed(message) ⇒ Object
15 16 17 18 19 20 21 22 |
# File 'lib/aggregate_streams/aggregation.rb', line 15 def record_processed() causation_stream_name = ..fetch(:causation_message_stream_name) causation_global_position = ..fetch(:causation_message_global_position) causation_category = Messaging::StreamName.get_category(causation_stream_name) set_sequence(causation_category, causation_global_position) end |
#sequence(category) ⇒ Object
11 12 13 |
# File 'lib/aggregate_streams/aggregation.rb', line 11 def sequence(category) sequences[category] end |
#set_sequence(category, sequence) ⇒ Object
7 8 9 |
# File 'lib/aggregate_streams/aggregation.rb', line 7 def set_sequence(category, sequence) sequences[category] = sequence end |