Class: Messaging::Adapters::Postgres::SerializedMessage
- Inherits:
-
ActiveRecord::Base
- Object
- ActiveRecord::Base
- Messaging::Adapters::Postgres::SerializedMessage
- Defined in:
- lib/messaging/adapters/postgres/serialized_message.rb
Instance Attribute Summary collapse
-
#expected_version ⇒ Object
Returns the value of attribute expected_version.
Class Method Summary collapse
- .in_category(categories) ⇒ Object
-
.instantiate(attributes, *_args) ⇒ Object
We override this AR method to make records retreived from scopes etc.
-
.instantiate_instance_of(klas, attributes, column_types = {}, &block) ⇒ Object
Rails version >= 6.
- .newer_than(transaction_id, position) ⇒ Object
- .with_transaction_id_lower_than_any_currently_running_transaction ⇒ Object
Instance Method Summary collapse
- #create_or_update(*args) ⇒ Object
-
#message=(message) ⇒ Object
Virtual setter for message so we can create a serialized message from a message.
-
#readonly? ⇒ Boolean
You should never update a message after creating it.
- #stream ⇒ Object
- #to_message ⇒ Object
Instance Attribute Details
#expected_version ⇒ Object
Returns the value of attribute expected_version.
7 8 9 |
# File 'lib/messaging/adapters/postgres/serialized_message.rb', line 7 def expected_version @expected_version end |
Class Method Details
.in_category(categories) ⇒ Object
9 10 11 |
# File 'lib/messaging/adapters/postgres/serialized_message.rb', line 9 def self.in_category(categories) where(stream_category: Array(categories).map(&:to_s)) end |
.instantiate(attributes, *_args) ⇒ Object
We override this AR method to make records retreived from scopes etc. be message objects of the corresponding message class instead of AR objects
See api.rubyonrails.org/classes/ActiveRecord/Persistence/ClassMethods.html#method-i-instantiate Rails version < 6
31 32 33 34 35 36 37 38 |
# File 'lib/messaging/adapters/postgres/serialized_message.rb', line 31 def self.instantiate(attributes, *_args) attributes['message_type'].constantize.new( JSON.parse(attributes['data']).merge( stream_position: attributes['stream_position'], transaction_id: attributes['transaction_id'], global_position: attributes['id']) ) end |
.instantiate_instance_of(klas, attributes, column_types = {}, &block) ⇒ Object
Rails version >= 6
41 42 43 44 45 46 47 48 |
# File 'lib/messaging/adapters/postgres/serialized_message.rb', line 41 def self.instantiate_instance_of(klas, attributes, column_types = {}, &block) attributes['message_type'].constantize.new( JSON.parse(attributes['data']).merge( stream_position: attributes['stream_position'], transaction_id: attributes['transaction_id'], global_position: attributes['id']) ) end |
.newer_than(transaction_id, position) ⇒ Object
17 18 19 20 21 22 23 |
# File 'lib/messaging/adapters/postgres/serialized_message.rb', line 17 def self.newer_than(transaction_id, position) where([ 'transaction_id = (:transaction_id)::xid8 AND id > :position OR transaction_id > (:transaction_id)::xid8', transaction_id: transaction_id.to_s, position: position.to_i ]) end |
.with_transaction_id_lower_than_any_currently_running_transaction ⇒ Object
13 14 15 |
# File 'lib/messaging/adapters/postgres/serialized_message.rb', line 13 def self.with_transaction_id_lower_than_any_currently_running_transaction where('transaction_id < pg_snapshot_xmin(pg_current_snapshot())') end |
Instance Method Details
#create_or_update(*args) ⇒ Object
74 75 76 77 78 79 80 81 82 |
# File 'lib/messaging/adapters/postgres/serialized_message.rb', line 74 def create_or_update(*args) return super if readonly? with_locked_stream do set_stream_position set_transaction_id validate_expected_version! super end end |
#message=(message) ⇒ Object
Virtual setter for message so we can create a serialized message from a message
51 52 53 54 55 56 57 58 59 |
# File 'lib/messaging/adapters/postgres/serialized_message.rb', line 51 def () self.data = .attributes_as_json self.expected_version = .expected_version self. = . self.stream = .stream_name self.stream_category = .stream_category self.stream_id = .stream_id self.uuid = .uuid end |
#readonly? ⇒ Boolean
You should never update a message after creating it
66 67 68 |
# File 'lib/messaging/adapters/postgres/serialized_message.rb', line 66 def readonly? true unless new_record? end |
#stream ⇒ Object
70 71 72 |
# File 'lib/messaging/adapters/postgres/serialized_message.rb', line 70 def stream @stream ||= Stream.new(attributes['stream']) end |
#to_message ⇒ Object
61 62 63 |
# File 'lib/messaging/adapters/postgres/serialized_message.rb', line 61 def .constantize.new(data.merge(stream_position: stream_position)) end |