Class: EventSourcery::Postgres::EventStore

Inherits:
Object
  • Object
show all
Includes:
EventStore::EachByRange
Defined in:
lib/event_sourcery/postgres/event_store.rb

Instance Method Summary collapse

Constructor Details

#initialize(db_connection, events_table_name: EventSourcery::Postgres.config.events_table_name, lock_table: EventSourcery::Postgres.config.lock_table_to_guarantee_linear_sequence_id_growth, write_events_function_name: EventSourcery::Postgres.config.write_events_function_name, event_builder: EventSourcery.config.event_builder, on_events_recorded: EventSourcery::Postgres.config.on_events_recorded) ⇒ EventStore

Returns a new instance of EventStore.



6
7
8
9
10
11
12
13
14
15
16
17
18
# File 'lib/event_sourcery/postgres/event_store.rb', line 6

def initialize(db_connection,
               events_table_name: EventSourcery::Postgres.config.events_table_name,
               lock_table: EventSourcery::Postgres.config.lock_table_to_guarantee_linear_sequence_id_growth,
               write_events_function_name: EventSourcery::Postgres.config.write_events_function_name,
               event_builder: EventSourcery.config.event_builder,
               on_events_recorded: EventSourcery::Postgres.config.on_events_recorded)
  @db_connection = db_connection
  @events_table_name = events_table_name
  @write_events_function_name = write_events_function_name
  @lock_table = lock_table
  @event_builder = event_builder
  @on_events_recorded = on_events_recorded
end

Instance Method Details

#get_events_for_aggregate_id(aggregate_id) ⇒ Array

Get the events for a given aggregate id.

Parameters:

  • aggregate_id

    the aggregate id to filter for

Returns:

  • (Array)

    of found events



87
88
89
90
91
# File 'lib/event_sourcery/postgres/event_store.rb', line 87

def get_events_for_aggregate_id(aggregate_id)
  events_table.where(aggregate_id: aggregate_id.to_str).order(:version).map do |event_hash|
    build_event(event_hash)
  end
end

#get_next_from(id, event_types: nil, limit: 1000) ⇒ Array

Get the next set of events from the given event id. You can specify event types and a limit. Default limit is 1000 and the default event types will be all.

Parameters:

  • id

    the event id to get next events from

  • event_types (defaults to: nil)

    the event types to filter, default nil = all

  • limit (defaults to: 1000)

    the limit to the results, default 1000

Returns:

  • (Array)

    array of found events



57
58
59
60
61
62
63
64
# File 'lib/event_sourcery/postgres/event_store.rb', line 57

def get_next_from(id, event_types: nil, limit: 1000)
  query = events_table.
    order(:id).
    where(Sequel.lit('id >= ?', id)).
    limit(limit)
  query = query.where(type: event_types) if event_types
  query.map { |event_row| build_event(event_row) }
end

#latest_event_id(event_types: nil) ⇒ Object

Get last event id for a given event types.

Parameters:

  • event_types (defaults to: nil)

    the type of event(s) to filter

Returns:

  • the latest event id



71
72
73
74
75
76
77
78
79
80
# File 'lib/event_sourcery/postgres/event_store.rb', line 71

def latest_event_id(event_types: nil)
  latest_event = events_table
  latest_event = latest_event.where(type: event_types) if event_types
  latest_event = latest_event.order(:id).last
  if latest_event
    latest_event[:id]
  else
    0
  end
end

#sink(event_or_events, expected_version: nil) ⇒ Object

Like water flowing into a sink eventually it will go down the drain into the goodness of the plumbing system. So to will the given events you put in this ‘sink’. Except the plumbing system is the data base events table. This can raise db connection errors.

Parameters:

  • event_or_events

    the event or events to save

  • expected_version (defaults to: nil)

    the version to save with the event, default nil

Raises:

  • (DatabaseError)

    if something goes wrong with the database

  • (ConcurrencyError)

    if there was a concurrency conflict



31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
# File 'lib/event_sourcery/postgres/event_store.rb', line 31

def sink(event_or_events, expected_version: nil)
  events = Array(event_or_events)
  aggregate_ids = events.map(&:aggregate_id).uniq
  raise AtomicWriteToMultipleAggregatesNotSupported unless aggregate_ids.count == 1
  sql = write_events_sql(aggregate_ids.first, events, expected_version)
  @db_connection.run(sql)
  log_events_saved(events)
  on_events_recorded.call(events)
  true
rescue Sequel::DatabaseError => e
  if e.message =~ /Concurrency conflict/
    raise ConcurrencyError, "expected version was not #{expected_version}. Error: #{e.message}"
  else
    raise
  end
end

#subscribe(from_id:, event_types: nil, after_listen: nil, subscription_master:, &block) ⇒ Object

Subscribe to events.

Parameters:

  • from_id

    subscribe from a starting event id. default will be from the start.

  • event_types (defaults to: nil)

    the event_types to subscribe to, default all.

  • after_listen (defaults to: nil)

    the after listen call back block. default nil.

  • subscription_master

    the subscription master block



99
100
101
102
103
104
105
106
107
108
109
110
111
# File 'lib/event_sourcery/postgres/event_store.rb', line 99

def subscribe(from_id:, event_types: nil, after_listen: nil, subscription_master:, &block)
  poll_waiter = OptimisedEventPollWaiter.new(db_connection: @db_connection, after_listen: after_listen)
  args = {
    poll_waiter: poll_waiter,
    event_store: self,
    from_event_id: from_id,
    event_types: event_types,
    events_table_name: @events_table_name,
    subscription_master: subscription_master,
    on_new_events: block
  }
  EventSourcery::EventStore::Subscription.new(**args).tap(&:start)
end