Class: AggregateStreams::PositionStore

Inherits:
Object
  • Object
show all
Includes:
Consumer::PositionStore, Initializer, Log::Dependency
Defined in:
lib/aggregate_streams/position_store.rb

Class Method Summary collapse

Instance Method Summary collapse

Class Method Details

.build(input_category, output_category, session: nil) ⇒ Object



11
12
13
14
15
# File 'lib/aggregate_streams/position_store.rb', line 11

# Builds a position store for the given pair of categories.
#
# The new instance is handed to MessageStore::Postgres::Session.configure
# together with the optional session before being returned.
#
# @param input_category passed as the first argument to +new+
# @param output_category passed as the second argument to +new+
# @param session forwarded to Session.configure; defaults to nil
# @return the configured instance
def self.build(input_category, output_category, session: nil)
  new(input_category, output_category).tap do |store|
    MessageStore::Postgres::Session.configure(store, session: session)
  end
end

.get_sql_command ⇒ Object



39
40
41
42
43
44
45
46
47
48
49
# File 'lib/aggregate_streams/position_store.rb', line 39

# SQL text used to look up the most recent recorded position.
#
# The statement selects metadata->>'causationMessageGlobalPosition' (aliased
# as +position+) from +messages+, filtered by two bind parameters:
#   $1 - compared against category(stream_name)
#   $2 - compared against category(metadata->>'causationMessageStreamName')
# Rows are ordered by global_position descending and limited to one.
#
# @return [String] the SQL statement
def self.get_sql_command
  # Non-interpolating literal: the text contains no escapes or interpolation.
  %q{
    SELECT
      metadata->>'causationMessageGlobalPosition' AS position
    FROM messages
    WHERE
      category(stream_name) = $1 AND
      category(metadata->>'causationMessageStreamName') = $2
    ORDER BY global_position DESC LIMIT 1
  }
end

Instance Method Details

#get ⇒ Object



17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
# File 'lib/aggregate_streams/position_store.rb', line 17

# Looks up the position derived from the latest matching message.
#
# Executes the class-level SQL command through +session+, binding the output
# category as the first parameter and the input category as the second.
#
# @return [Integer, nil] nil when the query yields no rows; otherwise the
#   +position+ column of the first row converted with +to_i+, plus one
def get
  logger.trace { "Get position (Output Category: #{output_category.inspect}, Input Category: #{input_category.inspect})" }

  statement = self.class.get_sql_command
  bind_values = [output_category, input_category]
  query_result = session.execute(statement, bind_values)

  next_position =
    if query_result.ntuples.zero?
      nil
    else
      # The column value is converted with to_i and advanced by one.
      query_result[0]['position'].to_i + 1
    end

  logger.info { "Get position done (Position: #{next_position || '(none)'}, Output Category: #{output_category.inspect}, Input Category: #{input_category.inspect})" }

  next_position
end