Class: AggregateStreams::Aggregation

Inherits:
Object
  • Object
show all
Includes:
Schema::DataStructure
Defined in:
lib/aggregate_streams/aggregation.rb

Defined Under Namespace

Modules: Transform

Instance Method Summary

Instance Method Details

#processed?(message) ⇒ Boolean

Returns:

  • (Boolean)


24
25
26
27
28
29
30
31
# File 'lib/aggregate_streams/aggregation.rb', line 24

# Reports whether the given message is already covered by the sequence
# recorded for its category.
#
# The category is derived from the message's stream name via
# Messaging::StreamName.get_category.
#
# @param message [#stream_name, #global_position] the message to check
# @return [Boolean] false when no sequence is recorded for the category;
#   otherwise whether the recorded sequence is greater than or equal to
#   the message's global position
def processed?(message)
  category = Messaging::StreamName.get_category(message.stream_name)
  recorded_position = sequence(category)

  if recorded_position.nil?
    # Nothing has been recorded for this category yet.
    false
  else
    recorded_position >= message.global_position
  end
end

#record_processed(message) ⇒ Object



15
16
17
18
19
20
21
22
# File 'lib/aggregate_streams/aggregation.rb', line 15

# Records the causation position carried by the given message as the
# sequence for the causation stream's category.
#
# Both values are read from the message's metadata with `fetch`, so a
# missing key is an error rather than a silent nil (for a Hash, `fetch`
# raises KeyError).
#
# NOTE(review): the receiver `metadata` was restored from a garbled
# `message..fetch(...)`; confirm against the original source file.
#
# @param message [#metadata] message whose metadata responds to `fetch` and
#   holds :causation_message_stream_name and
#   :causation_message_global_position
# @return [Object] the result of set_sequence
def record_processed(message)
  metadata = message.metadata

  causation_stream_name = metadata.fetch(:causation_message_stream_name)
  causation_global_position = metadata.fetch(:causation_message_global_position)

  causation_category = Messaging::StreamName.get_category(causation_stream_name)

  set_sequence(causation_category, causation_global_position)
end

#sequence(category) ⇒ Object



11
12
13
# File 'lib/aggregate_streams/aggregation.rb', line 11

# Looks up the sequence stored for a category.
#
# @param category [Object] key to look up in `sequences`
# @return [Object] whatever `sequences[category]` yields
def sequence(category)
  recorded = sequences
  recorded[category]
end

#set_sequence(category, sequence) ⇒ Object



7
8
9
# File 'lib/aggregate_streams/aggregation.rb', line 7

# Stores a sequence for a category, replacing any value already held
# under that key in `sequences`.
#
# @param category [Object] key to store under
# @param sequence [Object] value to store
# @return [Object] the sequence that was assigned
def set_sequence(category, sequence)
  recorded = sequences
  recorded[category] = sequence
end