Class: Messaging::Adapters::Postgres::Store
- Inherits: Object
- Defined in: lib/messaging/adapters/postgres/store.rb
Overview
Message store adapter using Postgres and ActiveRecord. Prefer accessing the message store through Messaging.message_store instead of using it directly.
Instance Attribute Summary
- #streams ⇒ Streams (readonly)
  All the streams in the store.
Instance Method Summary
- #call(message) ⇒ Messaging::Message
  Writes the message to Postgres. Skips messages that have not defined a stream name. We do this to begin with so that PG is opt-in per message.
- #categories ⇒ Categories
  All the stream categories in the store.
- #category(name) ⇒ Category
  Get a specific category by name.
- #initialize ⇒ Store (constructor, private)
  Should not be used directly.
- #messages ⇒ SerializedMessage
  Access to all messages.
- #messages_in_streams(*streams) ⇒ ActiveRecord::Relation
  Access to all messages in the given streams.
- #stream(name) ⇒ Stream
  Get a specific stream by name.
Constructor Details
#initialize ⇒ Store
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Should not be used directly. Access the store through Messaging.message_store or Messaging::Adapters::Store.
# File 'lib/messaging/adapters/postgres/store.rb', line 30
def initialize
  @streams = Streams.new
end
Instance Attribute Details
#streams ⇒ Streams (readonly)
Returns all the streams in the store.
# File 'lib/messaging/adapters/postgres/store.rb', line 25
def streams
  @streams
end
Instance Method Details
#call(message) ⇒ Messaging::Message
Writes the message to Postgres. Skips messages that have not defined a stream name. We do this to begin with so that PG is opt-in per message.
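# File 'lib/messaging/adapters/postgres/store.rb', line 84
def call(message)
  # Skip messages without a stream name so that Postgres is opt-in per message.
  return unless message.stream_name

  # The method name after `create!(...)` is missing from the source; `to_message` is an assumption.
  SerializedMessage.create!(message: message).to_message
rescue ActiveRecord::StatementInvalid => e
  category = message.category
  # Re-raise anything other than a missing-partition error.
  raise e unless e.message.include?('no partition of relation')
  # Only a partitioned category can create the missing partition.
  raise e unless category && category.is_a?(CategoryWithPartitions)

  category.add_partition(date: Date.today)
  retry
end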
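The skip, rescue, and retry flow of #call can be tried out without a database. The following is a minimal sketch, not the adapter's implementation: `StatementInvalid`, `FakePartitionedCategory`, `FakeMessage`, and `FakeStore` are hypothetical stand-ins invented for illustration, and the `today:` keyword exists only to make the example deterministic.

```ruby
require "date"

# Hypothetical stand-in for ActiveRecord::StatementInvalid.
class StatementInvalid < StandardError; end

# Hypothetical category that remembers which daily partitions exist.
class FakePartitionedCategory
  attr_reader :partitions

  def initialize
    @partitions = []
  end

  def add_partition(date:)
    @partitions << date
  end
end

# Hypothetical message with an optional stream name and a category.
FakeMessage = Struct.new(:stream_name, :category)

# Hypothetical store that mimics the skip / rescue / retry flow.
class FakeStore
  attr_reader :written

  def initialize
    @written = []
  end

  def call(message, today: Date.today)
    # Messages without a stream name are skipped, so persistence is opt-in.
    return unless message.stream_name

    insert(message, today)
    message
  rescue StatementInvalid => e
    category = message.category
    # Re-raise anything that is not a missing-partition error.
    raise e unless e.message.include?("no partition of relation")
    # Only a partitioned category can create the missing partition.
    raise e unless category.is_a?(FakePartitionedCategory)

    category.add_partition(date: today)
    retry # re-runs the method body, now that the partition exists
  end

  private

  # Fails the way a partitioned table would when no partition covers the row.
  def insert(message, today)
    unless message.category&.partitions&.include?(today)
      raise StatementInvalid, 'no partition of relation "messages" found for row'
    end

    @written << message
  end
end

day = Date.new(2024, 1, 1)
category = FakePartitionedCategory.new
store = FakeStore.new

skipped = store.call(FakeMessage.new(nil, category), today: day)
stored = store.call(FakeMessage.new("customer-1", category), today: day)
```

In this sketch the first write fails, the partition for the given day is added, and the retried write succeeds, while the message without a stream name never reaches the insert.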
#categories ⇒ Categories
Returns all the stream categories in the store.
# File 'lib/messaging/adapters/postgres/store.rb', line 43
def categories
  Categories.new
end
#category(name) ⇒ Category
Get a specific category by name
# File 'lib/messaging/adapters/postgres/store.rb', line 50
def category(name)
  categories[name]
end
#messages ⇒ SerializedMessage
Access to all messages. Use with caution in production: the store probably holds a large number of messages, so unscoped queries could take a long time or time out.
# File 'lib/messaging/adapters/postgres/store.rb', line 64
def messages
  SerializedMessage
end
#messages_in_streams(*streams) ⇒ ActiveRecord::Relation
Access to all messages in the given streams
# File 'lib/messaging/adapters/postgres/store.rb', line 73
def messages_in_streams(*streams)
  SerializedMessage.where(stream: streams.flatten.map(&:to_s)).order(:id)
end
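Because the splat argument is flattened and each entry is converted with `to_s`, the method should accept stream names as separate arguments, as a single array, or as a mix of both. A minimal sketch of just that normalization step, assuming a hypothetical helper `normalize_stream_names` that is not part of the library:

```ruby
# Hypothetical helper mirroring `streams.flatten.map(&:to_s)` from the listing.
def normalize_stream_names(*streams)
  # Flatten nested arrays, then convert every entry to a string.
  streams.flatten.map(&:to_s)
end

separate = normalize_stream_names("customer-1", "customer-2")
as_array = normalize_stream_names(["customer-1", "customer-2"])
mixed    = normalize_stream_names(:"customer-1", ["customer-2"])
```

All three calls produce the same list of names, so callers do not need to unpack an array before passing it in.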
#stream(name) ⇒ Stream
Get a specific stream by name
# File 'lib/messaging/adapters/postgres/store.rb', line 37
def stream(name)
  streams[name]
end