Class: EventSourcery::Postgres::EventStore
- Inherits: Object
  - Object
    - EventSourcery::Postgres::EventStore
- Includes: EventStore::EachByRange
- Defined in: lib/event_sourcery/postgres/event_store.rb
Instance Method Summary
- #get_events_for_aggregate_id(aggregate_id) ⇒ Array
  Get the events for a given aggregate id.
- #get_next_from(id, event_types: nil, limit: 1000) ⇒ Array
  Get the next set of events from the given event id.
- #initialize(db_connection, events_table_name: EventSourcery::Postgres.config.events_table_name, lock_table: EventSourcery::Postgres.config.lock_table_to_guarantee_linear_sequence_id_growth, write_events_function_name: EventSourcery::Postgres.config.write_events_function_name, event_builder: EventSourcery.config.event_builder, on_events_recorded: EventSourcery::Postgres.config.on_events_recorded) ⇒ EventStore (constructor)
  A new instance of EventStore.
- #latest_event_id(event_types: nil) ⇒ Object
  Get the latest event id, optionally restricted to the given event types.
- #sink(event_or_events, expected_version: nil) ⇒ Object
  Like water flowing into a sink, eventually it will go down the drain into the goodness of the plumbing system.
- #subscribe(from_id:, event_types: nil, after_listen: nil, subscription_master:, &block) ⇒ Object
  Subscribe to events.
Constructor Details
#initialize(db_connection, events_table_name: EventSourcery::Postgres.config.events_table_name, lock_table: EventSourcery::Postgres.config.lock_table_to_guarantee_linear_sequence_id_growth, write_events_function_name: EventSourcery::Postgres.config.write_events_function_name, event_builder: EventSourcery.config.event_builder, on_events_recorded: EventSourcery::Postgres.config.on_events_recorded) ⇒ EventStore
Returns a new instance of EventStore.
    # File 'lib/event_sourcery/postgres/event_store.rb', line 6
    def initialize(db_connection,
                   events_table_name: EventSourcery::Postgres.config.events_table_name,
                   lock_table: EventSourcery::Postgres.config.lock_table_to_guarantee_linear_sequence_id_growth,
                   write_events_function_name: EventSourcery::Postgres.config.write_events_function_name,
                   event_builder: EventSourcery.config.event_builder,
                   on_events_recorded: EventSourcery::Postgres.config.on_events_recorded)
      @db_connection = db_connection
      @events_table_name = events_table_name
      @write_events_function_name = write_events_function_name
      @lock_table = lock_table
      @event_builder = event_builder
      @on_events_recorded = on_events_recorded
    end
Instance Method Details
#get_events_for_aggregate_id(aggregate_id) ⇒ Array
Get the events for a given aggregate id, ordered by version.
    # File 'lib/event_sourcery/postgres/event_store.rb', line 87
    def get_events_for_aggregate_id(aggregate_id)
      events_table.where(aggregate_id: aggregate_id.to_str).order(:version).map do |event_hash|
        build_event(event_hash)
      end
    end
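A typical use of the returned array is to rebuild an aggregate's state by applying its events in version order. The following is a minimal sketch of that idea, not code from the library: the `CartEvent` class, the event types (`item_added`, `item_removed`) and the `body` keys are all assumptions made for illustration. Only the ordering by `version` comes from the method above.

```ruby
# Hypothetical stand-in for the event objects the store returns.
CartEvent = Struct.new(:aggregate_id, :type, :version, :body, keyword_init: true)

# Rebuild a cart's item list by applying events in version order.
# Event types and body keys are illustrative assumptions.
def replay_cart(events)
  events.sort_by(&:version).each_with_object([]) do |event, items|
    case event.type
    when 'item_added'   then items << event.body['item']
    when 'item_removed' then items.delete(event.body['item'])
    end
  end
end

events = [
  CartEvent.new(aggregate_id: 'cart-1', type: 'item_added',   version: 1, body: { 'item' => 'apple' }),
  CartEvent.new(aggregate_id: 'cart-1', type: 'item_added',   version: 2, body: { 'item' => 'pear' }),
  CartEvent.new(aggregate_id: 'cart-1', type: 'item_removed', version: 3, body: { 'item' => 'apple' })
]

cart_items = replay_cart(events)
```

With a real store, the array passed to `replay_cart` would presumably come from `get_events_for_aggregate_id`, which already returns events in version order.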
#get_next_from(id, event_types: nil, limit: 1000) ⇒ Array
Get the next set of events starting from the given event id (inclusive), ordered by id. You can specify event types and a limit. The default limit is 1000, and by default events of all types are returned.
    # File 'lib/event_sourcery/postgres/event_store.rb', line 57
    def get_next_from(id, event_types: nil, limit: 1000)
      query = events_table.
        order(:id).
        where(Sequel.lit('id >= ?', id)).
        limit(limit)
      query = query.where(type: event_types) if event_types
      query.map { |event_row| build_event(event_row) }
    end
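Because the query uses `id >= ?`, the event with the given id is included in the result, so a caller reading in batches would start the next batch at the last seen id plus one. The sketch below illustrates that paging pattern against a hypothetical in-memory stand-in (`InMemoryEventLog`, `LogEvent` and `read_all` are invented names, not part of the library); it mirrors the query's filtering, ordering and limit but does not touch a database.

```ruby
# Hypothetical event record; the real store builds richer event objects.
LogEvent = Struct.new(:id, :type, keyword_init: true)

# Hypothetical in-memory stand-in mirroring the query above:
# ids >= the given id, ascending order, optional type filter, then limit.
class InMemoryEventLog
  def initialize(events)
    @events = events.sort_by(&:id)
  end

  def get_next_from(id, event_types: nil, limit: 1000)
    matching = @events.select { |event| event.id >= id }
    matching = matching.select { |event| Array(event_types).include?(event.type) } if event_types
    matching.first(limit)
  end
end

# Read everything in batches. The lower bound is inclusive,
# so the next batch starts at the last seen id plus one.
def read_all(log, batch_size:, event_types: nil)
  seen = []
  next_id = 0
  loop do
    batch = log.get_next_from(next_id, event_types: event_types, limit: batch_size)
    break if batch.empty?

    seen.concat(batch)
    next_id = batch.last.id + 1
  end
  seen
end

log = InMemoryEventLog.new(
  (1..5).map { |i| LogEvent.new(id: i, type: i.odd? ? 'item_added' : 'item_removed') }
)

all_ids   = read_all(log, batch_size: 2).map(&:id)
added_ids = read_all(log, batch_size: 2, event_types: ['item_added']).map(&:id)
```

Passing the last seen id unchanged instead of adding one would return that event again at the start of every batch.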
#latest_event_id(event_types: nil) ⇒ Object
Get the latest event id, optionally restricted to the given event types. Returns 0 if there are no matching events.
    # File 'lib/event_sourcery/postgres/event_store.rb', line 71
    def latest_event_id(event_types: nil)
      latest_event = events_table
      latest_event = latest_event.where(type: event_types) if event_types
      latest_event = latest_event.order(:id).last
      if latest_event
        latest_event[:id]
      else
        0
      end
    end
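The behaviour worth noting here is the fallback: when no row matches, the method returns 0 rather than nil. A minimal sketch of the same logic over plain hashes follows. The `latest_id_in` helper and the sample rows are hypothetical and only stand in for the dataset; they are not the library's implementation.

```ruby
# Hypothetical stand-in: rows are plain hashes with :id and :type keys.
def latest_id_in(rows, event_types: nil)
  rows = rows.select { |row| Array(event_types).include?(row[:type]) } if event_types
  latest = rows.max_by { |row| row[:id] }
  latest ? latest[:id] : 0 # 0 signals "no matching events"
end

rows = [
  { id: 1, type: 'item_added' },
  { id: 2, type: 'item_removed' },
  { id: 3, type: 'item_added' }
]

latest_any     = latest_id_in(rows)
latest_removed = latest_id_in(rows, event_types: ['item_removed'])
latest_unknown = latest_id_in(rows, event_types: ['cart_emptied'])
```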
#sink(event_or_events, expected_version: nil) ⇒ Object
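Like water flowing into a sink, eventually it will go down the drain into the goodness of the plumbing system. So too will the events you put in this 'sink', except that the plumbing system is the database events table. This can raise database connection errors. It raises ConcurrencyError when the database reports a concurrency conflict for the given expected_version, and AtomicWriteToMultipleAggregatesNotSupported when the given events belong to more than one aggregate. Returns true on success.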
    # File 'lib/event_sourcery/postgres/event_store.rb', line 31
    def sink(event_or_events, expected_version: nil)
      events = Array(event_or_events)
      aggregate_ids = events.map(&:aggregate_id).uniq
      raise AtomicWriteToMultipleAggregatesNotSupported unless aggregate_ids.count == 1
      sql = write_events_sql(aggregate_ids.first, events, expected_version)
      @db_connection.run(sql)
      log_events_saved(events)
      on_events_recorded.call(events)
      true
    rescue Sequel::DatabaseError => e
      if e.message =~ /Concurrency conflict/
        raise ConcurrencyError, "expected version was not #{expected_version}. Error: #{e.message}"
      else
        raise
      end
    end
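The `expected_version` argument enables optimistic concurrency: a writer states which aggregate version it based its decision on, and the write is rejected if the aggregate has moved on. The sketch below imitates that check in memory so the conflict and retry flow can be seen end to end. `InMemorySink`, `PendingEvent` and `version_of` are hypothetical names, and treating the version as a simple count of stored events is an assumption; in the real class the check happens inside the database.

```ruby
class ConcurrencyError < StandardError; end
class AtomicWriteToMultipleAggregatesNotSupported < StandardError; end

# Hypothetical event object; only aggregate_id matters for this sketch.
class PendingEvent
  attr_reader :aggregate_id, :type

  def initialize(aggregate_id:, type:)
    @aggregate_id = aggregate_id
    @type = type
  end
end

# Hypothetical in-memory stand-in for the version check done by the database.
class InMemorySink
  def initialize
    @versions = Hash.new(0) # aggregate_id => number of events stored so far
  end

  def version_of(aggregate_id)
    @versions[aggregate_id]
  end

  def sink(event_or_events, expected_version: nil)
    events = Array(event_or_events)
    aggregate_ids = events.map(&:aggregate_id).uniq
    raise AtomicWriteToMultipleAggregatesNotSupported unless aggregate_ids.count == 1

    aggregate_id = aggregate_ids.first
    current = @versions[aggregate_id]
    if expected_version && expected_version != current
      raise ConcurrencyError, "expected version was not #{expected_version}"
    end

    @versions[aggregate_id] = current + events.size
    true
  end
end

store = InMemorySink.new
seen_version = store.version_of('cart-1') # two writers both observe version 0

# Writer A succeeds and moves the aggregate to version 1.
store.sink(PendingEvent.new(aggregate_id: 'cart-1', type: 'item_added'),
           expected_version: seen_version)

# Writer B still expects version 0, so its write is rejected.
conflict = begin
  store.sink(PendingEvent.new(aggregate_id: 'cart-1', type: 'item_added'),
             expected_version: seen_version)
  false
rescue ConcurrencyError
  true
end

# Writer B reloads the current version and retries.
retried = store.sink(PendingEvent.new(aggregate_id: 'cart-1', type: 'item_added'),
                     expected_version: store.version_of('cart-1'))
```

In a real application the retry step would usually reload the aggregate's events and re-run the business decision before writing again, since the new events may change the outcome.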
#subscribe(from_id:, event_types: nil, after_listen: nil, subscription_master:, &block) ⇒ Object
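Subscribe to events. Builds an EventSourcery::EventStore::Subscription that starts at from_id, optionally filters by event_types, and passes new events to the given block, then starts it.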
    # File 'lib/event_sourcery/postgres/event_store.rb', line 99
    def subscribe(from_id:, event_types: nil, after_listen: nil, subscription_master:, &block)
      poll_waiter = OptimisedEventPollWaiter.new(db_connection: @db_connection, after_listen: after_listen)
      args = {
        poll_waiter: poll_waiter,
        event_store: self,
        from_event_id: from_id,
        event_types: event_types,
        events_table_name: @events_table_name,
        subscription_master: subscription_master,
        on_new_events: block
      }
      EventSourcery::EventStore::Subscription.new(**args).tap(&:start)
    end
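The listing above only wires a subscription together; how the subscription delivers events is not shown on this page. As a rough illustration of the general idea, the sketch below implements one polling step: fetch whatever is new, pass the batch to the callback, and advance the position. Everything in it is hypothetical (`PollingSubscription`, `ArraySource`, `FeedEvent`, `poll_once`), and it assumes that `from_event_id` is inclusive and that the callback receives an array of events. It is not the library's `Subscription` class.

```ruby
# Hypothetical event record.
FeedEvent = Struct.new(:id, :type, keyword_init: true)

# Hypothetical event source exposing a get_next_from-style read.
class ArraySource
  def initialize(events = [])
    @events = events
  end

  def append(event)
    @events << event
  end

  def get_next_from(id)
    @events.select { |event| event.id >= id }.sort_by(&:id)
  end
end

# Hypothetical sketch of a subscription's polling step.
class PollingSubscription
  def initialize(source:, from_event_id:, on_new_events:)
    @source = source
    @next_event_id = from_event_id # assumed inclusive starting point
    @on_new_events = on_new_events
  end

  # Deliver any new events to the callback and advance the position.
  # Returns the number of events delivered.
  def poll_once
    events = @source.get_next_from(@next_event_id)
    return 0 if events.empty?

    @on_new_events.call(events)
    @next_event_id = events.last.id + 1
    events.size
  end
end

source = ArraySource.new([FeedEvent.new(id: 1, type: 'item_added'),
                          FeedEvent.new(id: 2, type: 'item_added')])
delivered = []
subscription = PollingSubscription.new(
  source: source,
  from_event_id: 1,
  on_new_events: ->(events) { delivered << events.map(&:id) }
)

first_poll  = subscription.poll_once # delivers ids 1 and 2
second_poll = subscription.poll_once # nothing new yet
source.append(FeedEvent.new(id: 3, type: 'item_removed'))
third_poll  = subscription.poll_once # delivers id 3
```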