Class: Messaging::Adapters::Postgres::Store

Inherits:
Object
Defined in:
lib/messaging/adapters/postgres/store.rb

Overview

Message store adapter using Postgres and ActiveRecord. Prefer accessing the message store through Messaging.message_store instead of using it directly.

Examples:

Using Postgres as the default message store adapter:

# Put this in an initializer
Messaging.setup do |config|
  config.message_store.adapter = :postgres
end

Constructor Details

#initialize ⇒ Store

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Should not be used directly. Access the store through Messaging.message_store or Messaging::Adapters::Store.



# File 'lib/messaging/adapters/postgres/store.rb', line 30

def initialize
  @streams = Streams.new
end

Instance Attribute Details

#streams ⇒ Streams (readonly)

Returns all the streams in the store.

Returns:

  • (Streams)

    all the streams in the store

# File 'lib/messaging/adapters/postgres/store.rb', line 25

def streams
  @streams
end

Instance Method Details

#call(message) ⇒ Messaging::Message

Writes the message to Postgres. Messages that have not defined a stream name are skipped. This is the initial behavior so that persisting to Postgres is opt-in per message.

Parameters:

  • message (Messaging::Message)

Returns:

  • (Messaging::Message)

    A new copy of the message with the stream position added (if persisted)



# File 'lib/messaging/adapters/postgres/store.rb', line 84

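def call(message)
  # Messages without a stream name are returned unchanged and not persisted
  return message unless message.stream_name

  SerializedMessage.create!(message: message).to_message
rescue ActiveRecord::StatementInvalid => e
  category = message.category
  # Only recover when the failure is a missing partition on a partitioned category
  raise e unless e.message.include?('no partition of relation')
  raise e unless category && category.is_a?(CategoryWithPartitions)

  # Create the missing partition, then run the method body again
  category.add_partition(date: Date.today)
  retry
end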
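The rescue-and-retry flow in #call can be tried out without a database. The following is a minimal sketch, not the adapter's implementation: MissingPartitionError, FakePartitionedCategory, FakeWriter and write_with_partition_retry are hypothetical stand-ins invented for illustration, and the extra guard against retrying when the partition already exists is an assumption that is not present in the source above.

```ruby
require 'date'

# Hypothetical stand-ins; none of these names come from the Messaging gem.
class MissingPartitionError < StandardError; end

class FakePartitionedCategory
  attr_reader :partitions

  def initialize
    @partitions = []
  end

  def add_partition(date:)
    @partitions << date
  end

  def partition_for?(date)
    @partitions.include?(date)
  end
end

class FakeWriter
  attr_reader :attempts

  def initialize(category)
    @category = category
    @attempts = 0
  end

  # Raises until a partition for the given date exists, then returns the payload.
  def write(payload, date:)
    @attempts += 1
    unless @category.partition_for?(date)
      raise MissingPartitionError, 'no partition of relation "messages" found for row'
    end

    payload
  end
end

# Same shape as #call: attempt the write, create the partition on a
# matching failure, then rerun the method body with `retry`.
def write_with_partition_retry(writer, category, payload, date:)
  writer.write(payload, date: date)
rescue MissingPartitionError => e
  raise e unless e.message.include?('no partition of relation')
  # Assumption: if the partition already exists, retrying cannot help.
  raise e if category.partition_for?(date)

  category.add_partition(date: date)
  retry
end

demo_category = FakePartitionedCategory.new
demo_writer = FakeWriter.new(demo_category)
write_with_partition_retry(demo_writer, demo_category, { id: 1 }, date: Date.new(2024, 1, 1))
```

In this sketch the first write fails, the partition is added, and the second attempt succeeds, so the writer records two attempts.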

#categories ⇒ Categories

Returns all the stream categories in the store.

Returns:

  • (Categories)

    all the stream categories in the store

# File 'lib/messaging/adapters/postgres/store.rb', line 43

def categories
  Categories.new
end

#category(name) ⇒ Category

Get a specific category by name

Returns:

  • (Category)

# File 'lib/messaging/adapters/postgres/store.rb', line 50

def category(name)
  categories[name]
end

#messages ⇒ SerializedMessage

Provides access to all messages. Use with caution in production: the store probably holds a large number of messages, so unscoped queries can take a long time or time out.

Examples:

Check that a message has been added to the store with RSpec:

expect do
  # Your code that should add a message to the store
end.to change { Messaging.message_store.messages.count }.from(0).to(1)

Returns:

  • (SerializedMessage)



# File 'lib/messaging/adapters/postgres/store.rb', line 64

def messages
  SerializedMessage
end

#messages_in_streams(*streams) ⇒ ActiveRecord::Relation

Access to all messages in the given streams, ordered by id.

Parameters:

  • streams (Array<String, Stream>)

    List of one or more streams to get messages from

Returns:

  • (ActiveRecord::Relation)

# File 'lib/messaging/adapters/postgres/store.rb', line 73

def messages_in_streams(*streams)
  SerializedMessage.where(stream: streams.flatten.map(&:to_s)).order(:id)
end
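The argument handling in #messages_in_streams (flatten, then map(&:to_s)) means strings, stream objects and nested arrays can be mixed freely. A minimal sketch of just that normalization step, assuming a stream object's to_s returns its name; FakeStream, normalize_stream_names and the stream names used here are hypothetical and only serve the illustration.

```ruby
# Hypothetical stand-in for a stream object; only #to_s matters here.
FakeStream = Struct.new(:name) do
  def to_s
    name
  end
end

# Same argument handling as messages_in_streams: flatten nested arrays,
# then convert every entry to its string name.
def normalize_stream_names(*streams)
  streams.flatten.map(&:to_s)
end

stream_names = normalize_stream_names('orders-1', [FakeStream.new('orders-2')])
# stream_names is ["orders-1", "orders-2"]
```

The resulting array of names is what the method passes to the where clause, so a single stream, several streams, or an array of streams should all produce the same kind of query.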

#stream(name) ⇒ Stream

Get a specific stream by name

Returns:

  • (Stream)

# File 'lib/messaging/adapters/postgres/store.rb', line 37

def stream(name)
  streams[name]
end