Class: Messaging::Adapters::Postgres::SerializedMessage

Inherits:
ActiveRecord::Base
  • Object
show all
Defined in:
lib/messaging/adapters/postgres/serialized_message.rb

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Instance Attribute Details

#expected_version ⇒ Object

Returns the value of attribute expected_version.



7
8
9
# File 'lib/messaging/adapters/postgres/serialized_message.rb', line 7

# Reads the +@expected_version+ instance variable.
#
# @return [Object, nil] the stored value, or nil when it has never been assigned
def expected_version
  instance_variable_get(:@expected_version)
end

Class Method Details

.in_category(categories) ⇒ Object



9
10
11
# File 'lib/messaging/adapters/postgres/serialized_message.rb', line 9

# Filters on the +stream_category+ column.
#
# @param categories [Object, Array, nil] one category or a list of them;
#   the argument is wrapped with +Array()+ and every entry is converted
#   with +to_s+, so nil produces an empty list
# @return [Object] whatever +where+ returns for the built condition
def self.in_category(categories)
  category_names = Array(categories).map { |category| category.to_s }
  where(stream_category: category_names)
end

.instantiate(attributes, *_args) ⇒ Object

We override this ActiveRecord method so that records retrieved from scopes etc. are message objects of the corresponding message class instead of ActiveRecord objects.

See api.rubyonrails.org/classes/ActiveRecord/Persistence/ClassMethods.html#method-i-instantiate. This override applies to Rails versions < 6.



31
32
33
34
35
36
37
38
# File 'lib/messaging/adapters/postgres/serialized_message.rb', line 31

# Builds a message object from a raw attribute hash instead of a record.
#
# The class named in +attributes['message_type']+ is resolved with
# +constantize+ and instantiated with the JSON-decoded +attributes['data']+
# merged with the stream position, transaction id and global position
# (the latter taken from +attributes['id']+).
#
# @param attributes [Hash] raw attributes keyed by string
# @param _args [Array] ignored
# @return [Object] a new instance of the resolved message class
def self.instantiate(attributes, *_args)
  message_class = attributes['message_type'].constantize
  payload = JSON.parse(attributes['data'])
  message_class.new(
    payload.merge(
      stream_position: attributes['stream_position'],
      transaction_id: attributes['transaction_id'],
      global_position: attributes['id']
    )
  )
end

.instantiate_instance_of(klas, attributes, column_types = {}, &block) ⇒ Object

This override applies to Rails versions >= 6.



41
42
43
44
45
46
47
48
# File 'lib/messaging/adapters/postgres/serialized_message.rb', line 41

# Builds a message object from a raw attribute hash instead of a record.
#
# Only +attributes+ is read; +klas+, +column_types+ and +block+ are accepted
# but not used by this body.
#
# @param klas [Object] unused
# @param attributes [Hash] raw attributes keyed by string
# @param column_types [Hash] unused
# @return [Object] a new instance of the class named in
#   +attributes['message_type']+
def self.instantiate_instance_of(klas, attributes, column_types = {}, &block)
  message_class = attributes['message_type'].constantize
  decoded = JSON.parse(attributes['data'])
  # Position data comes from the row itself and is merged into the decoded payload.
  positions = {
    stream_position: attributes['stream_position'],
    transaction_id: attributes['transaction_id'],
    global_position: attributes['id']
  }
  message_class.new(decoded.merge(positions))
end

.newer_than(transaction_id, position) ⇒ Object



17
18
19
20
21
22
23
# File 'lib/messaging/adapters/postgres/serialized_message.rb', line 17

# Filters to rows positioned after the given (transaction_id, position) pair.
#
# A row matches when its +transaction_id+ equals the given one and its +id+
# is greater than +position+, or when its +transaction_id+ is greater than
# the given one (SQL +AND+ binds tighter than +OR+).
#
# @param transaction_id [#to_s] bound as a string and cast to +xid8+ in SQL
# @param position [#to_i] bound as an integer
# @return [Object] whatever +where+ returns for the built condition
def self.newer_than(transaction_id, position)
  bind_values = {
    transaction_id: transaction_id.to_s,
    position: position.to_i
  }
  condition =
    'transaction_id = (:transaction_id)::xid8 AND id > :position OR transaction_id > (:transaction_id)::xid8'
  where([condition, bind_values])
end

.with_transaction_id_lower_than_any_currently_running_transaction ⇒ Object



13
14
15
# File 'lib/messaging/adapters/postgres/serialized_message.rb', line 13

# Filters to rows whose +transaction_id+ is below the +xmin+ of the current
# PostgreSQL snapshot, as computed by +pg_snapshot_xmin(pg_current_snapshot())+.
#
# @return [Object] whatever +where+ returns for the SQL condition
def self.with_transaction_id_lower_than_any_currently_running_transaction
  below_snapshot_xmin = 'transaction_id < pg_snapshot_xmin(pg_current_snapshot())'
  where(below_snapshot_xmin)
end

Instance Method Details

#create_or_update(*args) ⇒ Object



74
75
76
77
78
79
80
81
82
# File 'lib/messaging/adapters/postgres/serialized_message.rb', line 74

# Persists the record, preparing stream bookkeeping first.
#
# When +readonly?+ is true the call is handed straight to the superclass
# implementation. Otherwise, inside +with_locked_stream+, the stream position
# and transaction id are set and the expected version is validated before the
# superclass implementation runs. The helper methods used here are defined
# outside this snippet.
#
# @param args [Array] forwarded unchanged to the superclass implementation
# @return [Object] the result of the superclass call (or of
#   +with_locked_stream+ wrapping it)
def create_or_update(*args)
  if readonly?
    super
  else
    with_locked_stream do
      # Order matters: position and transaction id are assigned before the
      # version check, and the actual write happens last.
      set_stream_position
      set_transaction_id
      validate_expected_version!
      super
    end
  end
end

#message=(message) ⇒ Object

Virtual setter for message so we can create a serialized message from a message



51
52
53
54
55
56
57
58
59
# File 'lib/messaging/adapters/postgres/serialized_message.rb', line 51

# Virtual setter that copies a message's values onto this record.
#
# Each writer on this object is fed from the reader of the given message
# listed next to it; assignments happen in the listed order.
#
# @param message [Object] must respond to +attributes_as_json+,
#   +expected_version+, +message_type+, +stream_name+, +stream_category+,
#   +stream_id+ and +uuid+
def message=(message)
  # writer on self => reader on message
  readers_by_attribute = {
    data: :attributes_as_json,
    expected_version: :expected_version,
    message_type: :message_type,
    stream: :stream_name,
    stream_category: :stream_category,
    stream_id: :stream_id,
    uuid: :uuid
  }
  readers_by_attribute.each do |attribute, reader|
    public_send("#{attribute}=", message.public_send(reader))
  end
end

#readonly? ⇒ Boolean

You should never update a message after creating it

Returns:

  • (Boolean)


66
67
68
# File 'lib/messaging/adapters/postgres/serialized_message.rb', line 66

# A message must never be updated after it has been created, so every record
# that is not new is reported as read-only.
#
# @return [Boolean] false for a new record, true otherwise (always a real
#   boolean, never nil)
def readonly?
  !new_record?
end

#stream ⇒ Object



70
71
72
# File 'lib/messaging/adapters/postgres/serialized_message.rb', line 70

# Wraps the raw +stream+ attribute in a +Stream+ object, building it on first
# use and returning the cached instance afterwards.
#
# @return [Stream] the memoized stream wrapper
def stream
  return @stream if @stream

  @stream = Stream.new(attributes['stream'])
end

#to_message ⇒ Object



61
62
63
# File 'lib/messaging/adapters/postgres/serialized_message.rb', line 61

def to_message
  message_type.constantize.new(data.merge(stream_position: stream_position))
end