Class: Messaging::Adapters::Postgres

Inherits:
Object
  • Object
show all
Defined in:
lib/messaging/adapters/postgres.rb,
lib/messaging/adapters/postgres/store.rb,
lib/messaging/adapters/postgres/stream.rb,
lib/messaging/adapters/postgres/streams.rb,
lib/messaging/adapters/postgres/category.rb,
lib/messaging/adapters/postgres/consumer.rb,
lib/messaging/adapters/postgres/categories.rb,
lib/messaging/adapters/postgres/create_lock.rb,
lib/messaging/adapters/postgres/release_lock.rb,
lib/messaging/adapters/postgres/categories/row.rb,
lib/messaging/adapters/postgres/serialized_message.rb,
lib/messaging/adapters/postgres/category_with_partitions.rb,
lib/messaging/adapters/postgres/advisory_transaction_lock.rb

Overview

Adapter for using Postgres and Active Record as a message store. See the classes listed below for the capabilities provided by this adapter.

Defined Under Namespace

Classes: AdvisoryTransactionLock, Categories, Category, CategoryWithPartitions, Consumer, CreateLock, ReleaseLock, SerializedMessage, Store, Stream, Streams

Instance Method Summary collapse

Instance Method Details

#create_consumer(name, **options) ⇒ Object



28
29
30
31
# File 'lib/messaging/adapters/postgres.rb', line 28

# Returns the consumer stored under the configured app name and the given
# name, creating it when the lookup finds nothing.
#
# @param name [#to_s] consumer name; converted with +to_s+ before use
# @param options [Hash] accepted but not read by this method
# @return [Object] the first matching Consumer, otherwise the result of
#   +Consumer.create+
def create_consumer(name, **options)
  attributes = { app: Messaging.config.app_name, name: name.to_s }

  existing = Consumer.where(attributes).first
  return existing if existing

  Consumer.create(attributes)
end

#create_messages_table ⇒ Object



33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
# File 'lib/messaging/adapters/postgres.rb', line 33

# Sends the DDL for the message store to the database in a single
# +connection.execute+ call.
#
# The statements, in order:
# * create the +messaging+ schema and the +messaging.messages_id_seq+
#   sequence, each guarded by IF NOT EXISTS;
# * create +messaging.messages+, partitioned BY LIST on +stream_category+;
# * create three btree indexes declared ON ONLY the parent table.
#
# Unlike the schema and sequence, the table and index statements carry no
# IF NOT EXISTS guard.
#
# @return [Object] whatever +connection.execute+ returns
def create_messages_table
  connection.execute(<<~SQL)
    CREATE SCHEMA IF NOT EXISTS messaging;
    CREATE SEQUENCE IF NOT EXISTS messaging.messages_id_seq;

    CREATE TABLE messaging.messages (
      id bigint DEFAULT nextval('messaging.messages_id_seq'::regclass) NOT NULL,
      transaction_id xid8 DEFAULT pg_current_xact_id() NOT NULL,
      uuid uuid NOT NULL,
      stream character varying NOT NULL,
      stream_position bigint NOT NULL,
      message_type character varying NOT NULL,
      data jsonb,
      created_at timestamp without time zone NOT NULL,
      updated_at timestamp without time zone NOT NULL,
      stream_category character varying,
      stream_id character varying
    )
    PARTITION BY LIST (stream_category);

    CREATE INDEX messages_id_idx ON ONLY messaging.messages USING btree (id);
    CREATE INDEX messages_stream_category_id_idx ON ONLY messaging.messages USING btree (stream_category, id);
    CREATE INDEX messages_stream_category_stream_id_stream_position_idx ON ONLY messaging.messages USING btree (stream_category, stream_id, stream_position);
  SQL
end