Class: AggregateStreams::PositionStore
- Inherits:
-
Object
- Object
- AggregateStreams::PositionStore
- Includes:
- Consumer::PositionStore, Initializer, Log::Dependency
- Defined in:
- lib/aggregate_streams/position_store.rb
Class Method Summary collapse
Instance Method Summary collapse
Class Method Details
.build(input_category, output_category, session: nil) ⇒ Object
11 12 13 14 15 |
# File 'lib/aggregate_streams/position_store.rb', line 11 def self.build(input_category, output_category, session: nil) instance = new(input_category, output_category) MessageStore::Postgres::Session.configure(instance, session: session) instance end |
.get_sql_command ⇒ Object
39 40 41 42 43 44 45 46 47 48 49 |
# File 'lib/aggregate_streams/position_store.rb', line 39 def self.get_sql_command %{ SELECT metadata->>'causationMessageGlobalPosition' AS position FROM messages WHERE category(stream_name) = $1 AND category(metadata->>'causationMessageStreamName') = $2 ORDER BY global_position DESC LIMIT 1 } end |
Instance Method Details
#get ⇒ Object
17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 |
# File 'lib/aggregate_streams/position_store.rb', line 17 def get logger.trace { "Get position (Output Category: #{output_category.inspect}, Input Category: #{input_category.inspect})" } sql_command = self.class.get_sql_command parameter_values = [output_category, input_category] result = session.execute(sql_command, parameter_values) if result.ntuples.zero? position = nil else record = result[0] position = record['position'].to_i + 1 end logger.info { "Get position done (Position: #{position || '(none)'}, Output Category: #{output_category.inspect}, Input Category: #{input_category.inspect})" } position end |